diff --git a/net/adc_netservice.h b/net/adc_netservice.h index 56c0870..c794137 100644 --- a/net/adc_netservice.h +++ b/net/adc_netservice.h @@ -23,6 +23,8 @@ protected: ImplT _impl; public: + using impl_t = ImplT; + using typename ImplT::endpoint_t; using timeout_clock_t = std::chrono::steady_clock; @@ -45,9 +47,12 @@ public: /* asynchronuos operations */ // open connection - auto asyncConnect(const endpoint_t& end_point, const timeout_drtn_t& timeout = defaultConnectTimeout) + template + auto asyncConnect(const endpoint_t& end_point, + const timeout_drtn_t& timeout = defaultConnectTimeout, + ArgTs&&... args) { - return _impl.asyncConnect(end_point, timeout); + return _impl.asyncConnect(end_point, timeout, std::forward(args)...); } @@ -94,4 +99,15 @@ public: }; +namespace traits +{ + +// template +// concept adc_netservice_c = requires { +// typename T::impl_t; +// std::derived_from>; +// }; + +} // namespace traits + } // namespace adc diff --git a/net/adc_netservice_asio.h b/net/adc_netservice_asio.h index 1948dd7..6314e97 100644 --- a/net/adc_netservice_asio.h +++ b/net/adc_netservice_asio.h @@ -9,6 +9,7 @@ */ +#include #ifdef USE_ASIO_LIBRARY #include @@ -18,8 +19,9 @@ #include #include #include -#include > +#include #include +#include #include #include @@ -33,6 +35,7 @@ class AdcNetServiceASIO : public InetProtoT { public: using socket_t = typename InetProtoT::socket; + using endpoint_t = typename InetProtoT::endpoint; using streambuff_iter_t = asio::buffers_iterator; @@ -45,9 +48,41 @@ public: virtual ~AdcNetServiceASIO() = default; - template CompletionTokenT> + template CompletionTokenT> + auto asyncConnect(const endpoint_t& endpoint, const TimeoutT& timeout, CompletionTokenT&& token) + { + auto timer = getDeadlineTimer(timeout); + + enum { starting, cancel_timer }; + + // wrapper + return asio::async_compose( + [timer = std::move(timer), state = starting, &endpoint, this](auto& self, + const std::error_code& ec = {}) mutable { + if (!ec) { + switch (state) { + case starting: + state = cancel_timer; + return _socket.async_connect(endpoint, std::move(self)); + break; + case cancel_timer: + timer->cancel(); + break; + default: + break; + } + } + + self.complete(ec); + }, + token, _socket); + } + + template CompletionTokenT> auto asynSend(const NetMessageT& msg, const TimeoutT& timeout, CompletionTokenT&& token) { + enum { starting, cancel_timer }; + // create buffer sequence std::vector buff; std::ranges::for_each(msg.template bytesView>(), @@ -57,25 +92,32 @@ public: // wrapper return asio::async_compose( - [buff = std::move(buff), timer = std::move(timer), this](auto& self, const std::error_code& ec = {}, - size_t sz = 0) { + [buff = std::move(buff), timer = std::move(timer), state = starting, this]( + auto& self, const std::error_code& ec = {}, size_t sz = 0) mutable { if (!ec) { - if constexpr (std::derived_from>) { - return asio::async_write(_socket, buff, std::move(self)); - } else if constexpr (std::derived_from< - socket_t, asio::basic_datagram_socket>) { - return _socket.async_send(buff, std::move(self)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff, std::move(self)); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + switch (state) { + case starting: + if constexpr (std::derived_from< + socket_t, asio::basic_stream_socket>) { + return asio::async_write(_socket, buff, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff, std::move(self)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + break; + case cancel_timer: + timer->cancel(); + break; + default: + break; } } - timer->cancel(); - self.complete(ec); }, token, _socket); @@ -85,36 +127,43 @@ public: template auto asyncReceive(const TimeoutT& timeout, CompletionTokenT&& token) { - std::shared_ptr out_flags; + enum { starting, cancel_timer }; + + std::unique_ptr out_flags; auto timer = getDeadlineTimer(timeout); // armed timer return asio::async_compose( - [timer = std::move(timer), out_flags, this](auto& self, const std::error_code& ec = {}, size_t sz = 0) { + [timer = std::move(timer), out_flags = std::move(out_flags), state = starting, this]( + auto& self, const std::error_code& ec = {}, size_t sz = 0) mutable { if (!ec) { - if constexpr (std::derived_from>) { - return asio::async_read_until( - _socket, _streamBuffer, [this](auto begin, auto end) { this->matchCondition(begin, end); }, - std::move(self)); - } else if constexpr (std::derived_from< - socket_t, asio::basic_datagram_socket>) { - return _socket.receive(_streamBuffer, - std::move(self)); // datagram, so it should be received at once - } else if constexpr (std::derived_from>) { - return _socket.receive(_streamBuffer, *out_flags, - std::move(self)); // datagram, so it should be received at once - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + switch (state) { + case starting: + if constexpr (std::derived_from< + socket_t, asio::basic_stream_socket>) { + return asio::async_read_until( + _socket, _streamBuffer, + [this](auto begin, auto end) { this->matchCondition(begin, end); }, + std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.receive(_streamBuffer, + std::move(self)); // datagram, so it should be received at once + } else if constexpr (std::derived_from>) { + return _socket.receive(_streamBuffer, *out_flags, + std::move(self)); // datagram, so it should be received at once + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + break; + case cancel_timer: + timer->cancel(); + break; + default: + break; } - } - timer->cancel(); - - if (ec) { - self.complete(ec, NetMessageT()); // return an empty message - } else { auto begin_it = streambuff_iter_t::begin(_streamBuffer.data()); auto end_it = begin_it + _streamBuffer.data().size(); @@ -136,11 +185,34 @@ public: self.complete(ec, msg); } + } else { + self.complete(ec, NetMessageT()); // return an empty message + return; } }, token, _socket); } + template + auto connect(const endpoint_t& endpoint, const TimeoutT& timeout) + { + std::future ftr = asyncConnect(endpoint, timeout, asio::use_future); + ftr.get(); + } + + template + auto send(const NetMessageT& msg, const TimeoutT& timeout) + { + std::future ftr = asyncSend(msg, timeout, asio::use_future); + ftr.get(); + } + + template + auto receive(const TimeoutT& timeout) + { + std::future ftr = asyncReceive(timeout, asio::use_future); + return ftr.get(); + } protected: socket_t& _socket; @@ -164,53 +236,6 @@ protected: return timer; } - - std::vector createConstBufferSequence(const NetMessageT& msg) - { - std::vector buff; - for (const auto& el : msg.template bytesView>()) { - buff.emplace_back(asio::const_buffer(el)); - } - - return buff; - } - - asio::awaitable asyncSendImpl(const NetMessageT& msg) - { - // for (const auto& buff : msg.bytesView()) { - if constexpr (std::derived_from>) { - // asio::async_write(_socket, buff, asio::use_awaitable); - co_await asio::async_write(_socket, msg.bytesView(), asio::use_awaitable); - } else if constexpr (std::derived_from>) { - co_await _socket.async_send(msg.bytesView(), asio::use_awaitable); - } else if constexpr (std::derived_from>) { - co_await _socket.async_send(msg.bytesView(), asio::use_awaitable); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - // } - } - - template - asio::awaitable watchdog(TimepointT& deadline, std::error_code& ec) - { - ec.clear(); - typename TimepointT::clock timer(co_await asio::this_coro::executor); - - auto now = TimepointT::clock::template now(); - - while (deadline > now) { - timer.expires_at(deadline); - co_await timer.async_wait(asio::use_awaitable); - now = TimepointT::clock::template now(); - } - - ec = std::make_error_code(std::errc::timed_out); - - throw std::system_error(ec); - } };