#pragma once /* ABSTRACT DEVICE COMPONENTS LIBRARY ASIO-library implementation of network service */ #ifdef USE_ASIO_LIBRARY #include #include "adc_netservice.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef USE_OPENSSL_WITH_ASIO #include #include #endif #include #include "adc_netmsg.h" namespace adc::traits { // still only TCP, UDP and UNIX template concept adc_asio_inet_proto_c = requires { requires std::derived_from || std::derived_from || std::derived_from || std::derived_from; }; } // namespace adc::traits namespace adc::impl { template class AdcNetServiceASIO : public InetProtoT { public: using socket_t = typename InetProtoT::socket; using endpoint_t = typename InetProtoT::endpoint; using high_level_socket_t = HighLevelSocketT; #ifdef USE_OPENSSL_WITH_ASIO static_assert(std::is_same_v ? true : std::is_same_v>, "ONLY BASIC SOCKETS AND SSL::STREAM ARE SUPPORTED!!!"); static constexpr bool IsBasicSocket = std::is_same_v; #else static_assert(std::is_same_v, "HighLevelSocketT AND InetProtoT::socket TYPES MUST BE THE SAME!!!"); #endif using streambuff_iter_t = asio::buffers_iterator; AdcNetServiceASIO(high_level_socket_t& sock) : _socket(sock) {} virtual ~AdcNetServiceASIO() = default; template CompletionTokenT> auto asyncConnect(const endpoint_t& endpoint, const TimeoutT& timeout, CompletionTokenT&& token) { auto timer = getDeadlineTimer(timeout); enum { starting, cancel_timer }; // wrapper return asio::async_compose( [timer = std::move(timer), state = starting, &endpoint, this](auto& self, const std::error_code& ec = {}) mutable { if (!ec) { switch (state) { case starting: state = cancel_timer; if constexpr (AdcNetServiceASIO::IsBasicSocket) { return _socket.async_connect(endpoint, std::move(self)); } else { return _socket.lowest_layer().async_connect(endpoint, std::move(self)); } break; case cancel_timer: timer->cancel(); break; default: break; } } self.complete(ec); }, token, _socket); } template CompletionTokenT> auto asynSend(const NetMessageT& msg, const TimeoutT& timeout, CompletionTokenT&& token) { enum { starting, cancel_timer }; // create buffer sequence std::vector buff; std::ranges::for_each(msg.template bytesView>(), [&buff](const auto& el) { buff.emplace_back(el); }); auto timer = getDeadlineTimer(timeout); // wrapper return asio::async_compose( [buff = std::move(buff), timer = std::move(timer), state = starting, this]( auto& self, const std::error_code& ec = {}, size_t = 0) mutable { if (!ec) { switch (state) { case starting: state = cancel_timer; if constexpr (std::derived_from< socket_t, asio::basic_stream_socket>) { return asio::async_write(_socket, buff, std::move(self)); } else if constexpr (std::derived_from>) { return _socket.async_send(buff, std::move(self)); } else if constexpr (std::derived_from>) { return _socket.async_send(buff, std::move(self)); } else { static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); } break; case cancel_timer: timer->cancel(); break; default: break; } } self.complete(ec); }, token, _socket); } template auto asyncReceive(const TimeoutT& timeout, CompletionTokenT&& token) { enum { starting, cancel_timer }; std::unique_ptr out_flags; auto timer = getDeadlineTimer(timeout); // armed timer return asio::async_compose( [timer = std::move(timer), out_flags = std::move(out_flags), state = starting, this]( auto& self, const std::error_code& ec = {}, size_t = 0) mutable { if (!ec) { switch (state) { case starting: state = cancel_timer; if constexpr (std::derived_from< socket_t, asio::basic_stream_socket>) { return asio::async_read_until( _socket, _streamBuffer, [this](auto begin, auto end) { return this->matchCondition(begin, end); }, std::move(self)); } else if constexpr (std::derived_from>) { return _socket.receive(_streamBuffer, std::move(self)); // datagram, so it should be received at once } else if constexpr (std::derived_from>) { return _socket.receive(_streamBuffer, *out_flags, std::move(self)); // datagram, so it should be received at once } else { static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); } break; case cancel_timer: timer->cancel(); break; default: break; } auto begin_it = streambuff_iter_t::begin(_streamBuffer.data()); auto end_it = begin_it + _streamBuffer.data().size(); // check for byte sequence is valid byte sequence and find the limits // (stream buffer may contain number of bytes more than requred by protocol) auto res = this->matchCondition(begin_it, end_it); if (!res.second) { self.complete(std::make_error_code(std::errc::protocol_error), NetMessageT()); // return an empty message } else { auto nbytes = std::distance(begin_it, res.first); NetMessageT msg; auto msg_it = this->fromLowLevel(begin_it, res.first); msg.setFromBytes(msg_it.first, msg_it.second); _streamBuffer.consume(nbytes); self.complete(ec, msg); } } else { self.complete(ec, NetMessageT()); // return an empty message return; } }, token, _socket); } template auto connect(const endpoint_t& endpoint, const TimeoutT& timeout) { std::future ftr = asyncConnect(endpoint, timeout, asio::use_future); ftr.get(); } template auto send(const NetMessageT& msg, const TimeoutT& timeout) { std::future ftr = asyncSend(msg, timeout, asio::use_future); ftr.get(); } template auto receive(const TimeoutT& timeout) { std::future ftr = asyncReceive(timeout, asio::use_future); return ftr.get(); } std::error_code close(asio::socket_base::shutdown_type stype = asio::socket_base::shutdown_both) { std::error_code ec; if constexpr (AdcNetServiceASIO::IsBasicSocket) { _socket.shutdown(stype, ec); if (!ec) { _socket.close(ec); } } else { _socket.shutdown(ec); // shutdown OpenSSL stream if (!ec) { _socket.lowest_layer().shutdown(stype, ec); if (!ec) { _socket.lowest_layer().close(ec); } } } return ec; } protected: high_level_socket_t& _socket; asio::streambuf _streamBuffer; template std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) { std::unique_ptr timer(_socket.get_executor()); if (arm) { timer->expires_after(timeout); timer->async_wait([this](const std::error_code& ec) { if (!ec) { _socket.cancel(std::make_error_code(std::errc::timed_out)); } }); } return timer; } }; typedef AdcNetService> AdcNetServiceAsioTcp; typedef AdcNetService> AdcNetServiceAsioLocalSeqPack; typedef AdcNetService> AdcNetServiceAsioLocalStream; } // namespace adc::impl #endif