diff --git a/net/adc_netproto.h b/net/adc_netproto.h index a56fe0f..aa781b7 100644 --- a/net/adc_netproto.h +++ b/net/adc_netproto.h @@ -34,7 +34,7 @@ struct AdcNetProtoStopSeq : InetT { static_assert(stopSeqSize, "STOP BYTE SEQUENCE MUST NOT BE AN EMPTY ONE!!!"); - using typename InetT::socket; + using socket_t = typename InetT::socket; template std::pair matchCondition(IT begin, IT end) @@ -50,7 +50,7 @@ struct AdcNetProtoStopSeq : InetT { std::advance(res.first, stopSeqSize); // move iterator to the one-past-the-end position res.second = true; } else { - // may be only a part of message was received, + // may be only a part of valid byte sequence was received, // so start next matching from previous begin-iterator res.first = begin; } diff --git a/net/adc_netservice_asio.h b/net/adc_netservice_asio.h index 898688f..1948dd7 100644 --- a/net/adc_netservice_asio.h +++ b/net/adc_netservice_asio.h @@ -29,58 +29,116 @@ namespace adc::impl template -class AdcNetServiceASIOStream +class AdcNetServiceASIO : public InetProtoT { public: using socket_t = typename InetProtoT::socket; - AdcNetServiceASIOStream(socket_t& sock) : _socket(sock) {} + using streambuff_iter_t = asio::buffers_iterator; - virtual ~AdcNetServiceASIOStream() = default; + template + AdcNetServiceASIO(socket_t& sock, CtorArgTs&&... ctor_args) + : InetProtoT(std::forward(ctor_args)...), _socket(sock) + { + } + + virtual ~AdcNetServiceASIO() = default; template CompletionTokenT> auto asynSend(const NetMessageT& msg, const TimeoutT& timeout, CompletionTokenT&& token) { - // using namespace asio::experimental::awaitable_operators; + // create buffer sequence + std::vector buff; + std::ranges::for_each(msg.template bytesView>(), + [&buff](const auto& el) { buff.emplace_back(el); }); - // auto deadline = std::chrono::steady_clock::now() + timeout; + auto timer = getDeadlineTimer(timeout); - std::unique_ptr timer(_socket.get_executor()); + // wrapper + return asio::async_compose( + [buff = std::move(buff), timer = std::move(timer), this](auto& self, const std::error_code& ec = {}, + size_t sz = 0) { + if (!ec) { + if constexpr (std::derived_from>) { + return asio::async_write(_socket, buff, std::move(self)); + } else if constexpr (std::derived_from< + socket_t, asio::basic_datagram_socket>) { + return _socket.async_send(buff, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff, std::move(self)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } - timer->expires_after(timeout); - timer->async_wait([this, timer = std::move(timer)](const std::error_code& ec) { - if (!ec) { - _socket.cancel(std::make_error_code(std::errc::timed_out)); - } - }); + timer->cancel(); - - if constexpr (std::derived_from>) { - return asio::async_write(_socket, createConstBufferSequence(msg), std::forward(token)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(createConstBufferSequence(msg), std::forward(token)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(createConstBufferSequence(msg), std::forward(token)); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - - // std::error_code ec; - - // co_await (asyncSendImpl(msg) && watchdog(deadline, ec)); - - // std::forward(token)(ec); - // co_return asio::async_compose( - // [ec](auto& self, const std::error_code& = {}) { self.complete(ec); }, token, _socket); + self.complete(ec); + }, + token, _socket); } template auto asyncReceive(const TimeoutT& timeout, CompletionTokenT&& token) { + std::shared_ptr out_flags; + + auto timer = getDeadlineTimer(timeout); // armed timer + + return asio::async_compose( + [timer = std::move(timer), out_flags, this](auto& self, const std::error_code& ec = {}, size_t sz = 0) { + if (!ec) { + if constexpr (std::derived_from>) { + return asio::async_read_until( + _socket, _streamBuffer, [this](auto begin, auto end) { this->matchCondition(begin, end); }, + std::move(self)); + } else if constexpr (std::derived_from< + socket_t, asio::basic_datagram_socket>) { + return _socket.receive(_streamBuffer, + std::move(self)); // datagram, so it should be received at once + } else if constexpr (std::derived_from>) { + return _socket.receive(_streamBuffer, *out_flags, + std::move(self)); // datagram, so it should be received at once + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + + timer->cancel(); + + if (ec) { + self.complete(ec, NetMessageT()); // return an empty message + } else { + auto begin_it = streambuff_iter_t::begin(_streamBuffer.data()); + auto end_it = begin_it + _streamBuffer.data().size(); + + // check for byte sequence is valid byte sequence and find the limits + // (stream buffer may contain number of bytes more than requred by protocol) + auto res = this->matchCondition(begin_it, end_it); + + if (!res.second) { + self.complete(std::make_error_code(std::errc::protocol_error), + NetMessageT()); // return an empty message + } else { + auto nbytes = std::distance(begin_it, res.first); + NetMessageT msg; + + auto msg_it = this->fromLowLevel(begin_it, res.first); + msg.setFromBytes(msg_it.first, msg_it.second); + + _streamBuffer.consume(nbytes); + + self.complete(ec, msg); + } + } + }, + token, _socket); } @@ -89,6 +147,24 @@ protected: asio::streambuf _streamBuffer; + template + std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) + { + std::unique_ptr timer(_socket.get_executor()); + + if (arm) { + timer->expires_after(timeout); + + timer->async_wait([this](const std::error_code& ec) { + if (!ec) { + _socket.cancel(std::make_error_code(std::errc::timed_out)); + } + }); + } + + return timer; + } + std::vector createConstBufferSequence(const NetMessageT& msg) { std::vector buff;