From 1c327f8fd34d46ee3fc5174e4cd4ee7951cbbe0a Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Thu, 26 Sep 2024 18:12:21 +0300 Subject: [PATCH] ... --- net/adc_netserver.h | 211 ++------------------------------- net/asio/adc_netservice_asio.h | 32 +++-- net/asio/adc_netsession_asio.h | 114 ++++++++++++++++++ tests/adc_netservice_test.cpp | 37 +++--- 4 files changed, 166 insertions(+), 228 deletions(-) create mode 100644 net/asio/adc_netsession_asio.h diff --git a/net/adc_netserver.h b/net/adc_netserver.h index cd58c21..f335bd8 100644 --- a/net/adc_netserver.h +++ b/net/adc_netserver.h @@ -18,203 +18,19 @@ ABSTRACT DEVICE COMPONENTS LIBRARY #include #endif -#include +#include "adc_net_concepts.h" namespace adc { -namespace traits -{ -// network server session implementation concept -template -concept adc_netserver_session_impl_c = requires(T t, const T t_const) { - typename T::session_ident_t; - - { t_const.sessionIdent() } -> std::same_as; - { t.start() } -> std::same_as; - { t.stop() } -> std::same_as; -}; - -} // namespace traits - - - -/* Server session */ - -class AdcNetServerGenericSession -{ -public: - struct Opt { - std::function startSess; - std::function stopSess; - std::function run; - }; - - AdcNetServerGenericSession(Opt&& opts) : _opts(std::move(opts)) {} - - virtual void start() - { - _opts.startSess(); - } - - virtual void stop() - { - _opts.stopSess(); - } - -protected: - Opt _opts; - - virtual void run() - { - _opts.run(); - } -}; - -template -class AdcNetServerSession : std::enable_shared_from_this> -{ -protected: - ImplT _impl; - -public: - typedef ImplT session_impl_t; - typedef std::shared_ptr shared_ptr_t; - typedef std::weak_ptr weak_ptr_t; - - using typename ImplT::session_ident_t; - - template - AdcNetServerSession(ImplCtorArgTs&&... ctor_args) : _impl(std::forward(ctor_args)...) - { - } - - - virtual ~AdcNetServerSession() = default; - - - virtual session_ident_t sessionIdent() const - { - // - return _impl._sessionIdent(); - } - - - virtual void start() - { - // - _impl.start(); - } - - virtual void stop() - { - // - _impl.stop(); - } -}; - - - -namespace traits -{ - -// network server session concept -template -concept adc_netserver_session_c = requires { - typename T::session_impl_t; - - std::derived_from>; -}; - -} // namespace traits - - -/* network server */ - -namespace traits -{ - -template -concept adc_generic_netserver_impl_c = requires(T t, const T t_const) { - typename T::server_ident_t; - - { t_const.serverIdent() } -> std::same_as; - { t.start() } -> std::same_as; - { t.stop() } -> std::same_as; -}; - -template -concept adc_netserver_impl_c = requires(T t, const T t_const) { - typename T::server_ident_t; - - { t_const.serverIdent() } -> std::same_as; - { t.start() } -> std::same_as; - { t.stop() } -> std::same_as; - { t.daemonize() } -> std::same_as; - { t.daemonizePrepare() } -> std::same_as; - { t.daemonizeFinalize() } -> std::same_as; -}; - -} // namespace traits - - -/* VERY GENERIC NETWORK SERVER INTERFACE */ - -template -class AdcGenericNetServer -{ -protected: - ImplT _impl; - -public: - typedef ImplT server_impl_t; - using typename ImplT::server_ident_t; - - template - AdcGenericNetServer(ImplCtorArgTs&&... ctor_args) : _impl(std::make_unique(ctor_args)...) - { - } - - - virtual ~AdcGenericNetServer() = default; - - - virtual server_ident_t serverIdent() const - { - // - return _impl.serverIdent(); - } - - virtual void start() - { - // - _impl.start(); - }; - - virtual void stop() - { - // - _impl.stop(); - }; -}; - -template class AdcNetServer { protected: - ImplT _impl; - public: - typedef ImplT server_impl_t; - using typename ImplT::server_ident_t; - - template - AdcNetServer(ImplCtorArgTs&&... ctor_args) : _impl(std::make_unique(ctor_args)...) - { - } + typedef std::string server_ident_t; virtual ~AdcNetServer() = default; @@ -222,21 +38,13 @@ public: virtual server_ident_t serverIdent() const { - // - return _impl.serverIdent(); + return _serverIdent; } - virtual void start() - { - // - _impl.start(); - }; + template + void start(SRVT&& netservice, const typename SRVT::endpoint_t& endpoint) {}; - virtual void stop() - { - // - _impl.stop(); - }; + virtual void stop() {}; // run server as daemon (still only on POSIX OSes) @@ -290,12 +98,15 @@ public: protected: + server_ident_t _serverIdent; + + // started sessions weak pointers - template + template static std::unordered_map> _serverSessions; std::vector> _stopSessionFunc; - template + template void startSession(const typename SessionT::shared_ptr_t& sess_ptr) { auto res = _serverSessions[this].emplace(sess_ptr); diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index 160eedf..7a97a67 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -69,7 +69,7 @@ concept adc_asio_is_awaitable = requires { template -concept adc_asio_special_comp_token = +concept adc_asio_special_comp_token_c = adc_asio_is_future || adc_asio_is_awaitable || std::same_as, asio::deferred_t>; @@ -198,6 +198,18 @@ public: } + AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) + : _ioContext(other._ioContext), + _receiveStrand(std::move(other._receiveStrand)), + _receiveQueue(std::move(_receiveQueue)), + _acceptor(std::move(other._acceptor)), + _socket(std::move(other._socket)), + _streamBuffer() + { + auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data()); + _streamBuffer.commit(bytes); + }; + AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor! virtual ~AdcNetServiceASIOBase() {} @@ -336,7 +348,7 @@ public: constexpr auto is_async_ctx_t = std::same_as, async_call_ctx_t>; // check completion token signature and deduce message type - if constexpr (!adc_asio_special_comp_token && !is_async_ctx_t) { + if constexpr (!adc_asio_special_comp_token_c && !is_async_ctx_t) { static_assert(traits::adc_func_traits::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!"); static_assert(std::is_same_v>, std::error_code>, "INVALID COMPLETION TOKEN SIGNATURE!"); @@ -346,7 +358,7 @@ public: } using msg_t = std::conditional_t< - adc_asio_special_comp_token || is_async_ctx_t, RMSGT, + adc_asio_special_comp_token_c || is_async_ctx_t, RMSGT, std::remove_cvref_t::args_t>>>; // auto s_res = std::make_sharedtemplate search), RMSGT>>(); @@ -595,15 +607,19 @@ protected: const TimeoutT& timeout, bool arm = true) { - // TODO: now()+timeout overflow!!! auto timer = std::make_unique(obj.get_executor()); - if (timeout == std::chrono::duration::max()) { - return timer; // do not arm the timer if MAX duration are given - } + // if (timeout == std::chrono::duration::max()) { + // return timer; // do not arm the timer if MAX duration are given + // } if (arm) { - timer->expires_after(timeout); + std::chrono::seconds max_d = std::chrono::duration_cast( + std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - + std::chrono::seconds(1)); + + timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! + // timer->expires_after(timeout); timer->async_wait([&obj](const std::error_code& ec) mutable { if (!ec) { diff --git a/net/asio/adc_netsession_asio.h b/net/asio/adc_netsession_asio.h new file mode 100644 index 0000000..4438f14 --- /dev/null +++ b/net/asio/adc_netsession_asio.h @@ -0,0 +1,114 @@ +#pragma once + +#include "adc_netservice_asio.h" + +namespace adc::impl +{ + +class AdcNetSessionASIO +{ +public: + typedef std::string netsession_ident_t; + + template SESSION_PROTOT, + traits::adc_output_char_range RMSGT = std::vector, + traits::adc_is_callable RECV_MSG_TOKENT> + AdcNetSessionASIO(const R& id, + std::shared_ptr> netservice, + RECV_MSG_TOKENT&& recv_msg_token) + : _ident(id.begin(), id.end()) + { + // check receive message completion token signature and deduce message type + if constexpr (!adc_asio_special_comp_token_c) { + static_assert(traits::adc_func_traits::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!"); + static_assert( + std::is_same_v>, std::error_code>, + "INVALID COMPLETION TOKEN SIGNATURE!"); + static_assert(traits::adc_output_char_range< + std::tuple_element_t<1, typename traits::adc_func_traits::args_t>>, + "INVALID COMPLETION TOKEN SIGNATURE!"); + } + + using msg_t = std::conditional_t< + adc_asio_special_comp_token_c, RMSGT, + std::remove_cvref_t::args_t>>>; + + + _startFunc = [netservice, wrapper = traits::adc_pf_wrapper(std::forward(recv_msg_token)), + this]() { + // + netservice->asyncReceive(std::get<0>(wrapper), _recvTimeout); + }; + + _stopFunc = [netservice]() { + // stop + netservice->close(); + }; + } + + template SESSION_PROTOT, + traits::adc_output_char_range RMSGT = std::vector, + traits::adc_is_callable RECV_MSG_TOKENT> + AdcNetSessionASIO(std::shared_ptr> netservice, + RECV_MSG_TOKENT&& recv_msg_token) + : AdcNetSessionASIO(std::derived_from ? "TCP SESSION" + : std::derived_from ? "UDP SESSION" + : std::derived_from + ? "UNIX SEQPACKET SESSION" + : std::derived_from ? "UNIX STREAM SESSION" + : "UNKNOWN", + std::move(netservice), + std::forward(recv_msg_token)) + { + } + + + virtual ~AdcNetSessionASIO() + { + stop(); + } + + + netsession_ident_t ident() const + { + return _ident; + } + + + void start() + { + _startFunc(); + } + + void stop() + { + _stopFunc(); + } + + + template + AdcNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout) + { + _sendTimeout = send_timeout; + _recvTimeout = recv_timeout; + + return *this; + } + +protected: + netsession_ident_t _ident; + + std::function _startFunc; + std::function _stopFunc; + + std::chrono::duration _recvTimeout = + std::chrono::seconds::max(); + + std::chrono::duration _sendTimeout = + std::chrono::seconds(5); +}; + +} // namespace adc::impl diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp index a0ce04e..5050fca 100644 --- a/tests/adc_netservice_test.cpp +++ b/tests/adc_netservice_test.cpp @@ -47,27 +47,24 @@ int main() // srv.asyncConnect(ept_c, s_ctx); // auto res = srv.asyncConnect(ept_c, asio::use_awaitable); - srv.asyncAccept( - ept_c, - [&srv](std::error_code ec) { - if (!ec) { - std::cout << "New connection\n"; + srv.asyncAccept(ept_c, [&srv](std::error_code ec) { + if (!ec) { + std::cout << "New connection\n"; - // srv.asyncReceive( - // [](std::error_code ec, std::string msg) { - // if (!ec) { - // std::cout << "Received: [" << msg << "]\n"; - // } else { - // std::cout << "Received: " << ec.message() << "\n"; - // } - // }, - // std::chrono::minutes(1)); - receive(srv); - } else { - std::cout << "ACCEPT ERR: " << ec.message() << "\n"; - } - }, - std::chrono::minutes(3)); + // srv.asyncReceive( + // [](std::error_code ec, std::string msg) { + // if (!ec) { + // std::cout << "Received: [" << msg << "]\n"; + // } else { + // std::cout << "Received: " << ec.message() << "\n"; + // } + // }, + // std::chrono::minutes(1)); + receive(srv); + } else { + std::cout << "ACCEPT ERR: " << ec.message() << "\n"; + } + }); ctx.run();