#pragma once

/*
 * ABSTRACT DEVICE COMPONENTS LIBRARY
 *
 * ASIO-library implementation of network service
 */

// NOTE(review): the header names below are derived from the identifiers this
// file uses. Re-add any project headers that declare the InetProtoT /
// NetMessageT contracts if they are expected to be pulled in from here.
#include <memory>

#ifdef USE_ASIO_LIBRARY

#include <asio/async_result.hpp>
#include <asio/basic_datagram_socket.hpp>
#include <asio/basic_seq_packet_socket.hpp>
#include <asio/basic_stream_socket.hpp>
#include <asio/buffer.hpp>
#include <asio/buffers_iterator.hpp>
#include <asio/compose.hpp>
#include <asio/error.hpp>
#include <asio/read_until.hpp>
#include <asio/socket_base.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/use_future.hpp>
#include <asio/write.hpp>

#include <algorithm>
#include <concepts>
#include <cstddef>
#include <future>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

namespace adc::impl
{

/// Network service built on top of an Asio socket.
///
/// The class derives from @p InetProtoT, which must provide:
///   - nested types `socket` and `endpoint`;
///   - `matchCondition(begin, end)` returning a pair {iterator, bool};
///   - `fromLowLevel(begin, end)` returning a pair of iterators.
///
/// The socket is held by reference and every asynchronous operation captures
/// `this`, so both the socket and this object must outlive any operation that
/// is still in flight.
template <typename InetProtoT>
class AdcNetServiceASIO : public InetProtoT
{
public:
    using socket_t = typename InetProtoT::socket;
    using endpoint_t = typename InetProtoT::endpoint;

    using streambuff_iter_t = asio::buffers_iterator<asio::streambuf::const_buffers_type>;

    /// Binds the service to @p sock and forwards the remaining arguments to
    /// the InetProtoT constructor.
    template <typename... CtorArgTs>
    AdcNetServiceASIO(socket_t& sock, CtorArgTs&&... ctor_args)
        : InetProtoT(std::forward<CtorArgTs>(ctor_args)...), _socket(sock)
    {
    }

    virtual ~AdcNetServiceASIO() = default;


    /// Asynchronously connects the socket to @p endpoint.
    ///
    /// A deadline timer armed with @p timeout cancels the socket if the
    /// connection is not established in time; in that case the operation
    /// completes with std::errc::timed_out.
    ///
    /// Completion signature: void(std::error_code).
    template <typename TimeoutT, asio::completion_token_for<void(std::error_code)> CompletionTokenT>
    auto asyncConnect(const endpoint_t& endpoint, const TimeoutT& timeout, CompletionTokenT&& token)
    {
        auto timer = getDeadlineTimer(timeout);

        enum { starting, cancel_timer };

        // The endpoint is copied into the operation: with lazily-initiated
        // completion tokens the caller's object may be gone by the time the
        // connect is actually started.
        return asio::async_compose<CompletionTokenT, void(std::error_code)>(
            [timer = std::move(timer), state = starting, endpoint = endpoint, this](
                auto& self, const std::error_code& ec = {}) mutable {
                if (!ec) {
                    switch (state) {
                        case starting:
                            state = cancel_timer;
                            return _socket.async_connect(endpoint, std::move(self));
                        case cancel_timer:
                            timer->cancel();  // connected in time
                            break;
                        default:
                            break;
                    }
                }

                self.complete(resolveTimeout(ec, *timer));
            },
            token, _socket);
    }


    /// Asynchronously sends @p msg through the socket.
    ///
    /// Stream sockets use asio::async_write (the whole buffer sequence is
    /// written); datagram and sequenced-packet sockets use a single
    /// async_send. The operation completes with std::errc::timed_out if it
    /// does not finish within @p timeout.
    ///
    /// The buffers are views into @p msg, so @p msg must stay alive until the
    /// completion handler is invoked.
    ///
    /// Completion signature: void(std::error_code).
    template <typename NetMessageT,
              typename TimeoutT,
              asio::completion_token_for<void(std::error_code)> CompletionTokenT>
    auto asyncSend(const NetMessageT& msg, const TimeoutT& timeout, CompletionTokenT&& token)
    {
        enum { starting, cancel_timer };

        // create buffer sequence
        // NOTE(review): the container type requested from bytesView() is an
        // assumption (a sequence of std::string_view) - confirm against the
        // NetMessageT contract.
        std::vector<asio::const_buffer> buff;
        std::ranges::for_each(msg.template bytesView<std::vector<std::string_view>>(),
                              [&buff](const auto& el) { buff.push_back(asio::buffer(el)); });

        auto timer = getDeadlineTimer(timeout);

        return asio::async_compose<CompletionTokenT, void(std::error_code)>(
            [buff = std::move(buff), timer = std::move(timer), state = starting, this](
                auto& self, const std::error_code& ec = {}, std::size_t /*bytes_sent*/ = 0) mutable {
                if (!ec) {
                    switch (state) {
                        case starting:
                            // Without this transition the completion of the
                            // write would start the write again.
                            state = cancel_timer;
                            if constexpr (kIsStreamSocket) {
                                return asio::async_write(_socket, buff, std::move(self));
                            } else if constexpr (kIsDatagramSocket || kIsSeqPacketSocket) {
                                return _socket.async_send(buff, std::move(self));
                            } else {
                                static_assert(kAlwaysFalse<socket_t>, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
                            }
                            break;
                        case cancel_timer:
                            timer->cancel();  // sent in time
                            break;
                        default:
                            break;
                    }
                }

                self.complete(resolveTimeout(ec, *timer));
            },
            token, _socket);
    }


    /// Backward-compatible alias for the historically misspelled name.
    template <typename NetMessageT,
              typename TimeoutT,
              asio::completion_token_for<void(std::error_code)> CompletionTokenT>
    [[deprecated("use asyncSend")]] auto asynSend(const NetMessageT& msg,
                                                  const TimeoutT& timeout,
                                                  CompletionTokenT&& token)
    {
        return asyncSend(msg, timeout, std::forward<CompletionTokenT>(token));
    }


    /// Asynchronously receives one protocol message.
    ///
    /// Stream sockets read into the internal stream buffer until
    /// matchCondition() reports a complete message. Datagram and
    /// sequenced-packet sockets perform one async_receive of up to
    /// kMaxDatagramSize bytes.
    ///
    /// On success the bytes of the message are removed from the internal
    /// buffer and the decoded message is passed to the handler. The handler
    /// receives an empty NetMessageT together with:
    ///   - the I/O error (std::errc::timed_out when @p timeout elapsed), or
    ///   - std::errc::protocol_error when matchCondition() rejects the data.
    ///
    /// Completion signature: void(std::error_code, NetMessageT).
    template <typename NetMessageT,
              typename TimeoutT,
              asio::completion_token_for<void(std::error_code, NetMessageT)> CompletionTokenT>
    auto asyncReceive(const TimeoutT& timeout, CompletionTokenT&& token)
    {
        enum { starting, cancel_timer };

        // Output flags for sequenced-packet sockets. Heap-allocated so that
        // the address handed to async_receive survives moves of the closure.
        auto out_flags = std::make_unique<asio::socket_base::message_flags>(0);

        auto timer = getDeadlineTimer(timeout);  // armed timer

        return asio::async_compose<CompletionTokenT, void(std::error_code, NetMessageT)>(
            [timer = std::move(timer), out_flags = std::move(out_flags), state = starting, this](
                auto& self, const std::error_code& ec = {}, std::size_t sz = 0) mutable {
                if (ec) {
                    self.complete(resolveTimeout(ec, *timer), NetMessageT());  // return an empty message
                    return;
                }

                if (state == starting) {
                    state = cancel_timer;
                    if constexpr (kIsStreamSocket) {
                        return asio::async_read_until(_socket, _streamBuffer, MatchConditionAdapter{this},
                                                      std::move(self));
                    } else if constexpr (kIsDatagramSocket) {
                        // datagram, so it should be received at once
                        return _socket.async_receive(_streamBuffer.prepare(kMaxDatagramSize), std::move(self));
                    } else if constexpr (kIsSeqPacketSocket) {
                        // datagram, so it should be received at once
                        return _socket.async_receive(_streamBuffer.prepare(kMaxDatagramSize), *out_flags,
                                                     std::move(self));
                    } else {
                        static_assert(kAlwaysFalse<socket_t>, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
                    }
                }

                // The I/O finished in time.
                timer->cancel();

                if constexpr (!kIsStreamSocket) {
                    // async_receive wrote into prepare()d space; publish it.
                    // (async_read_until commits by itself.)
                    _streamBuffer.commit(sz);
                }

                auto begin_it = streambuff_iter_t::begin(_streamBuffer.data());
                auto end_it = begin_it + _streamBuffer.data().size();

                // check that the byte sequence is valid and find its limits
                // (the stream buffer may hold more bytes than the protocol requires)
                auto res = this->matchCondition(begin_it, end_it);
                if (!res.second) {
                    self.complete(std::make_error_code(std::errc::protocol_error),
                                  NetMessageT());  // return an empty message
                    return;
                }

                auto nbytes = std::distance(begin_it, res.first);

                NetMessageT msg;
                auto msg_it = this->fromLowLevel(begin_it, res.first);
                msg.setFromBytes(msg_it.first, msg_it.second);

                // Drop only the consumed message; trailing bytes stay for the next call.
                _streamBuffer.consume(nbytes);

                self.complete(ec, msg);
            },
            token, _socket);
    }


    /// Blocking connect. Waits on the future produced by asyncConnect();
    /// with asio::use_future a failure is reported by future::get() throwing.
    template <typename TimeoutT>
    auto connect(const endpoint_t& endpoint, const TimeoutT& timeout)
    {
        std::future<void> ftr = asyncConnect(endpoint, timeout, asio::use_future);
        ftr.get();
    }

    /// Blocking send. Waits on the future produced by asyncSend();
    /// with asio::use_future a failure is reported by future::get() throwing.
    template <typename NetMessageT, typename TimeoutT>
    auto send(const NetMessageT& msg, const TimeoutT& timeout)
    {
        std::future<void> ftr = asyncSend(msg, timeout, asio::use_future);
        ftr.get();
    }

    /// Blocking receive. Returns the message produced by asyncReceive();
    /// with asio::use_future a failure is reported by future::get() throwing.
    template <typename NetMessageT, typename TimeoutT>
    auto receive(const TimeoutT& timeout)
    {
        std::future<NetMessageT> ftr = asyncReceive<NetMessageT>(timeout, asio::use_future);
        return ftr.get();
    }

protected:
    socket_t& _socket;

    // Accumulates received bytes between calls to asyncReceive().
    asio::streambuf _streamBuffer;

    // Space reserved for a single datagram / sequenced packet.
    // NOTE(review): 64 KiB is an assumed upper bound - adjust if the
    // transport allows larger packets.
    static constexpr std::size_t kMaxDatagramSize = 65536;

    using protocol_t = typename socket_t::protocol_type;
    using executor_t = typename socket_t::executor_type;

    static constexpr bool kIsStreamSocket =
        std::derived_from<socket_t, asio::basic_stream_socket<protocol_t, executor_t>>;
    static constexpr bool kIsDatagramSocket =
        std::derived_from<socket_t, asio::basic_datagram_socket<protocol_t, executor_t>>;
    static constexpr bool kIsSeqPacketSocket =
        std::derived_from<socket_t, asio::basic_seq_packet_socket<protocol_t, executor_t>>;

    // Always false, but dependent on T, so a static_assert that uses it only
    // fires when the enclosing `if constexpr` branch is actually instantiated.
    template <typename T>
    static constexpr bool kAlwaysFalse = false;

    // Adapts InetProtoT::matchCondition() for asio::async_read_until().
    // Asio accepts a function object as a match condition only if it exposes
    // a nested result_type (or is_match_condition is specialised for it); a
    // plain lambda does not qualify.
    struct MatchConditionAdapter {
        using result_type = std::pair<streambuff_iter_t, bool>;

        AdcNetServiceASIO* service;

        auto operator()(streambuff_iter_t begin, streambuff_iter_t end) const
        {
            return service->matchCondition(begin, end);
        }
    };

    /// Maps the error of an operation that was cancelled by the deadline
    /// timer to std::errc::timed_out.
    ///
    /// The timer handler can only cancel the socket, which makes the pending
    /// operation finish with operation_aborted. If the deadline of @p timer
    /// has already passed at that moment the abort is attributed to the
    /// timeout; any other error is returned unchanged.
    static std::error_code resolveTimeout(const std::error_code& ec, const asio::steady_timer& timer)
    {
        const bool deadline_passed = timer.expiry() <= asio::steady_timer::clock_type::now();
        if (ec == asio::error::operation_aborted && deadline_passed) {
            return std::make_error_code(std::errc::timed_out);
        }
        return ec;
    }

    /// Creates a timer on the socket's executor.
    ///
    /// When @p arm is true the timer is set to expire after @p timeout and,
    /// on expiry, cancels all outstanding operations on the socket.
    /// Destroying or cancelling the timer first makes the wait complete with
    /// an error, so the socket is left untouched.
    template <typename TimeoutT>
    std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
    {
        auto timer = std::make_unique<asio::steady_timer>(_socket.get_executor());

        if (arm) {
            timer->expires_after(timeout);

            timer->async_wait([this](const std::error_code& ec) {
                if (!ec) {
                    // cancel(ec) reports its own failure through the argument,
                    // so it needs an lvalue; the result is deliberately ignored.
                    std::error_code ignored;
                    _socket.cancel(ignored);
                }
            });
        }

        return timer;
    }
};

}  // namespace adc::impl

#endif