From d2b2620d1325c207344e297116f7d7eb6e16e82d Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Tue, 1 Oct 2024 18:02:01 +0300 Subject: [PATCH] ... --- net/asio/adc_netservice_asio.h | 333 ++++++++++++++++++++++++++++++++- 1 file changed, 332 insertions(+), 1 deletion(-) diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index f161368..8eb51fa 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -22,6 +22,7 @@ #include #include +#include #ifdef USE_OPENSSL_WITH_ASIO @@ -502,7 +503,7 @@ public: self.complete(ec, {msg.begin(), msg.end()}); } }, - token, _socket); + token, _receiveStrand); } /* blocking methods */ @@ -633,4 +634,334 @@ protected: static_assert(adc::interfaces::adc_netsession_proto_c>, ""); + + +/* EVENT-BASED SERVICE */ +template SESSION_PROTOT> +class AsioNetService : public SESSION_PROTOT +{ +public: + // typedefs from transport protocol + using endpoint_t = typename TRANSPORT_PROTOT::endpoint; + using socket_t = typename TRANSPORT_PROTOT::socket; + using acceptor_t = + std::conditional_t>, + std::nullptr_t, // there is no acceptor + typename TRANSPORT_PROTOT::acceptor>; + + typedef std::function connect_event_hndl_t; + typedef std::function send_event_hndl_t; + typedef std::function close_event_hndl_t; + typedef std::function)> message_event_hndl_t; + + struct Events { + bool listening = false; // true - server role, false - client role + connect_event_hndl_t onConnect = [](auto...) {}; + send_event_hndl_t onSend = [](auto...) {}; + close_event_hndl_t onClose = [](auto...) {}; + message_event_hndl_t onMessage = [](auto...) {}; + }; + + AsioNetService(asio::io_context& ctx, Events events) + : SESSION_PROTOT(), + _ioContext(ctx), + _receiveStrand(_ioContext), + _receiveQueue(), + _acceptor(_ioContext), + _socket(_ioContext), + _events(events), + _stopReceiving(false), + _waitTimer(_ioContext) + { + } + + + template + static void start(AsioNetService& srv, const endpoint_t& endpoint, const DT& timeout) + { + // no acceptor for UDP-sockets + if constexpr (std::is_null_pointer_v) { + srv.startReceiving(); + srv._events.onConnect(&srv, std::error_code{}, endpoint_t()); + return; + } + + auto token = [&endpoint, &timeout, &srv](std::error_code ec) { + if (!ec) { + srv.startReceiving(); + } + + // post event + srv._events.onConnect(&srv, ec, srv._socket.remote_endpoint()); + }; + + if (srv._events.listening) { // server role (accept connections to given endpoint) + + auto timer = getDeadlineTimer(srv._acceptor, timeout); + + asio::async_compose( + [timer = std::move(timer), start = true, &endpoint, &srv](auto& self, std::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + try { + if (!srv._acceptor.is_open() || (srv._acceptor.local_endpoint() != endpoint)) { + srv._acceptor = acceptor_t(srv._ioContext, endpoint); + } + } catch (std::system_error err) { + timer->cancel(); + self.complete(err.code()); + return; + } + + // return acc.async_accept(_socket, std::move(self)); + return srv._acceptor.async_accept(srv._socket, std::move(self)); + } + } + + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_connect + timer->cancel(); + } + + self.complete(ec); + }, + std::move(token), srv._ioContext); + + } else { // client role (connect to remote host) + + auto timer = getDeadlineTimer(srv._socket, timeout); + + asio::async_compose( + [start = true, endpoint, timer = std::move(timer), &srv](auto& self, asio::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + return srv._socket.async_connect(endpoint, std::move(self)); + } + } + + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_connect + timer->cancel(); + } + + self.complete(ec); + }, + std::move(token), srv._socket); + } + } + + + void stop() + { + std::error_code ec; + + _stopReceiving = true; + + _socket.shutdown(_shutdownType, ec); + if (!ec) { + _socket.close(ec); + } + + _events.onClose(ec); + } + + template + auto send(const R& msg, const DT& timeout) + { + auto token = [this](std::error_code ec, size_t) { + // + _events.onSend(this, ec); + }; + + // create buffer sequence of sending session protocol representation of the input message + std::vector buff_seq; + std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); }); + + auto timer = getDeadlineTimer(_socket, timeout); + + return asio::async_compose( + [start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this]( + auto& self, asio::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + if constexpr (std::derived_from>) { + return asio::async_write(_socket, buff_seq, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, std::move(self)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + } + + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_write/async_send + timer->cancel(); + } + + self.complete(ec); + }, + std::move(token), _socket); + } + + template + bool wait(const DT& timeout) + { + if (_receiveQueue.size()) { + return true; + } + + std::error_code ec; + + std::chrono::seconds max_d = std::chrono::duration_cast( + std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::seconds(1)); + + auto res = _waitTimers.emplace(_ioContext); + if (res.second) { + (*(res.first))->expires_after(timeout < max_d ? timeout : max_d); + + // _waitTimer.expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! + + // auto f = _waitTimer.async_wait(asio::use_future); + + auto f = (*(res.first))->async_wait(asio::use_future); + try { + f.get(); + _waitTimers.erase(res.first); + } catch (std::system_error& ex) { + if (ex.code() == asio::error::operation_aborted) { // canceled in startReceiving (message was received) + return true; + } else { + return false; + } + } + } + } + +protected: + asio::io_context& _ioContext; + asio::io_context::strand _receiveStrand; + + socket_t _socket; + + acceptor_t _acceptor; + + asio::streambuf _streamBuffer; + + std::queue> _receiveQueue; + + asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both; + + std::atomic_bool _stopReceiving; + + Events _events; + + asio::steady_timer _waitTimer; + std::set> _waitTimers; + + void startReceiving() + { + auto get_msg = [this](std::error_code ec, size_t) { + if (!ec) { + auto start_ptr = static_cast(_streamBuffer.data().data()); + + auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); + if (!net_pack.empty()) { + _receiveQueue.emplace(); + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); + _streamBuffer.consume(net_pack.size()); + + + while (_streamBuffer.size()) { // search for possible additional session protocol packets + start_ptr = static_cast(_streamBuffer.data().data()); + + net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); + + if (!net_pack.empty()) { + _receiveQueue.emplace(); + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); + _streamBuffer.consume(net_pack.size()); + } else { + break; // exit and hold remaining bytes in stream buffer + } + } + + auto msg = _receiveQueue.front(); + _events.onMessage(this, ec, std::span(msg.begin(), msg.end())); + + _receiveQueue.pop(); + } + + if (!_stopReceiving) { + startReceiving(); // initiate consequence socket's read operation + } + } else { + _events.onMessage(this, ec, std::span()); + } + }; + + + auto out_flags = std::make_shared(); + + if constexpr (std::derived_from>) { + return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), std::move(get_msg)); + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, std::move(get_msg)); + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, *out_flags, std::move(get_msg)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + + + template + static std::unique_ptr getDeadlineTimer(CancelableT& obj, + const TimeoutT& timeout, + bool arm = true) + { + auto timer = std::make_unique(obj.get_executor()); + + if (arm) { + std::chrono::seconds max_d = std::chrono::duration_cast( + std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - + std::chrono::seconds(1)); + + timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! + // timer->expires_after(timeout); + + timer->async_wait([&obj](const std::error_code& ec) mutable { + if (!ec) { + obj.cancel(); + } + }); + } + + return timer; + } + + template + static bool isTimeout(const std::unique_ptr& timer, const std::error_code& ec) + { + auto exp_time = timer->expiry(); + return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted); + } +}; + } // namespace adc::impl