diff --git a/net/adc_net_concepts.h b/net/adc_net_concepts.h index 87d081d..6954863 100644 --- a/net/adc_net_concepts.h +++ b/net/adc_net_concepts.h @@ -104,26 +104,47 @@ concept adc_netservice_c = std::movable && requires(SRVT srv, const SRVT s typename SRVT::endpoint_t; // a type representing endpoint of the network service // underlying protocol + // callback callables for asynchronous operations + requires adc_async_callback_t; + requires adc_async_callback_t; + requires adc_async_callback_t; + + + // acceptor type + requires std::is_class_v; + requires adc_async_callback_t; + requires requires(typename SRVT::acceptor_t acc) { + acc.asyncAccept(std::declval(), + std::declval()); + + { acc.accept(std::declval()) } -> std::same_as; + }; + + // netservice_ident_t ident() const { srv_const.ident() } -> std::same_as; - typename SRVT::async_call_ctx_t; + // typename SRVT::async_call_ctx_t; // asynchronous (non-blocking) operations - srv.asyncAccept(std::declval(), std::declval(), - std::declval()); + // srv.asyncAccept(std::declval(), std::declval(), + // std::declval()); - srv.asyncConnect(std::declval(), std::declval(), + srv.asyncConnect(std::declval(), + std::declval(), std::declval()); - srv.asyncSend(std::declval(), std::declval(), + srv.asyncSend(std::declval(), + std::declval(), std::declval()); - srv.asyncReceive(std::declval(), std::declval()); + srv.asyncReceive(std::declval(), + std::declval()); // synchronous (blocking) operations - srv.accept(std::declval(), std::declval()); + // srv.accept(std::declval(), std::declval()); srv.connect(std::declval(), std::declval()); @@ -141,8 +162,12 @@ template concept adc_netsession_c = std::derived_from> && requires(SESST sess, const SESST sess_const) { typename SESST::netsession_ident_t; + requires adc_netservice_c; typename SESST::netsession_ctx_t; + requires std::constructible_from; + // netsession_ident_t ident() const { sess_const.ident() } -> std::same_as; diff --git a/net/adc_netserver.h b/net/adc_netserver.h index 190cd00..48232f4 100644 --- a/net/adc_netserver.h +++ b/net/adc_netserver.h @@ -182,24 +182,26 @@ public: return _serverIdent; } - template - void start(const typename SessionT::netservice_t::endpoint_t& endpoint, - const typename SessionT::netsession_ident_t& id, - typename SessionT::netsession_ctx_t&& sess_ctx, - NetsrvCtorArgTs&&... ctor_args) + // start accepting remote connections, create and start given network session + // It must be assumed that this is asynchronous operation!!! + template + void start(const typename SessionT::netsession_ident_t& id, + const typename SessionT::netsession_ctx_t& sess_ctx, + AccCtorArgTs&&... ctor_args) { - typename SessionT::netservice_t netservice(std::forward(ctor_args)...); + auto acceptor = + std::make_shared(std::forward(ctor_args)...); - netservice.asyncAccept(endpoint, [&endpoint, &id, sess_ctx, this](auto ec, auto...) { - if (!ec) { - auto sess = std::make_shared(id, std::forward(sess_ctx)); - startSession(sess); - - start(endpoint, id, sess_ctx); - } - }); + doAccept(acceptor, id, sess_ctx); }; + + template + bool isListening() const + { + return _isListening[this]; + } + virtual void start() = 0; virtual void stop() @@ -210,7 +212,26 @@ public: protected: + template + inline static std::unordered_map _isListening{}; + server_ident_t _serverIdent; + + template + void doAccept(std::shared_ptr acceptor, const IDT& id, CTXT& sess_ctx) + { + acceptor.asyncAccept([acceptor, &id, &sess_ctx, this](auto ec, typename SessionT::netservice_t srv) mutable { + if (!ec) { + auto sess = std::make_shared(id, std::move(srv), sess_ctx); + startSession(sess); + + _isListening[this] = true; + doAccept(acceptor, id, sess_ctx); + } else { + _isListening[this] = false; + } + }); + } }; diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index 8eb51fa..8d94e5a 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -108,7 +108,7 @@ public: using endpoint_t = typename TRANSPORT_PROTOT::endpoint; // typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept) - typedef std::function async_accept_callback_t; + // typedef std::function async_accept_callback_t; typedef std::function async_connect_callback_t; typedef std::function async_send_callback_t; typedef std::function async_receive_callback_t; @@ -116,98 +116,125 @@ public: // typedefs from transport protocol using socket_t = typename TRANSPORT_PROTOT::socket; - using acceptor_t = - std::conditional_t>, - std::nullptr_t, // there is no acceptor - typename TRANSPORT_PROTOT::acceptor>; - - // to satisfy 'adc_netservice_c' concept - class async_call_ctx_t + // acceptor + class acceptor_t { - std::function _errc_comp_token; - std::function _errc_msg_comp_token; - public: - async_call_ctx_t() = default; + static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max(); - async_call_ctx_t(async_call_ctx_t&) = default; - async_call_ctx_t(async_call_ctx_t&&) = default; - async_call_ctx_t(const async_call_ctx_t&) = default; - - template TokenT> - async_call_ctx_t(TokenT&& token) + acceptor_t(asio::io_context& io_ctx) + : _ioContext(io_ctx), _endpoint(), _socket(_ioContext), _acceptor(_ioContext) { - _errc_comp_token = std::forward(token); + } + acceptor_t(asio::io_context& io_ctx, const AdcNetServiceASIOBase::endpoint_t& endpoint) : acceptor_t(io_ctx) + { + if (_endpoint != endpoint) { + _endpoint = endpoint; + _acceptor = _acceptor_t(_ioContext, _endpoint); + } } - template TokenT> - async_call_ctx_t(TokenT&& token) + typedef std::function async_accept_callback_t; + + template TokenT, + traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)> + auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - _errc_msg_comp_token = std::forward(token); + // no acceptor for UDP-sockets + if constexpr (std::is_null_pointer_v<_acceptor_t>) { + static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!"); + } + + _socket = AdcNetServiceASIOBase::socket_t{_ioContext}; + auto timer = getDeadlineTimer(_acceptor, timeout); + + return asio::async_compose( + [timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + try { + if (!_acceptor.is_open() || (_acceptor.local_endpoint() != _endpoint)) { + _acceptor = _acceptor_t(_ioContext, _endpoint); + } + } catch (std::system_error err) { + timer->cancel(); + self.complete(err.code(), AdcNetServiceASIOBase(_ioContext)); + return; + } + + return _acceptor.async_accept(_socket, std::move(self)); + } + } + + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_accept + timer->cancel(); + } + + self.complete(ec, AdcNetServiceASIOBase(std::move(_socket))); + }, + token, _ioContext); } - template TokenT> - async_call_ctx_t& operator=(TokenT&& token) + template TokenT, + traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)> + auto asyncAccept(const AdcNetServiceASIOBase::endpoint_t& endpoint, + TokenT&& token, + const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - _errc_comp_token = std::forward(token); - return *this; + _endpoint = endpoint; + return asyncAccept(std::forward(token), timeout); } - template TokenT> - async_call_ctx_t& operator=(TokenT&& token) + template + AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - _errc_msg_comp_token = std::forward(token); - return *this; + auto f = asyncAccept(asio::use_future, timeout); + + return f.get(); } - auto operator()(std::error_code ec) + template + AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - return _errc_comp_token(std::move(ec)); + return accept(endpoint, timeout); } - auto operator()(std::error_code ec, RMSGT msg) - { - return _errc_msg_comp_token(std::move(ec), std::move(msg)); - } + private: + asio::io_context& _ioContext; + AdcNetServiceASIOBase::endpoint_t _endpoint; + AdcNetServiceASIOBase::socket_t _socket; - template TokenT> - operator TokenT() const - { - return _errc_comp_token; - } + using _acceptor_t = std::conditional_t< + std::derived_from>, + std::nullptr_t, // there is no acceptor + typename TRANSPORT_PROTOT::acceptor>; - template TokenT> - operator TokenT() const - { - return _errc_msg_comp_token; - } + _acceptor_t _acceptor; }; - static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max(); + static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10); static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5); static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); AdcNetServiceASIOBase(asio::io_context& ctx) - : SESSION_PROTOT(), - _ioContext(ctx), - _receiveStrand(_ioContext), - _receiveQueue(), - _acceptor(_ioContext), - _socket(_ioContext) + : SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext) { } AdcNetServiceASIOBase(socket_t socket) : SESSION_PROTOT(), - _ioContext(socket.get_executor()), + _ioContext(static_cast(socket.get_executor().context())), + _socket(std::move(socket)), _receiveStrand(_ioContext), - _receiveQueue(), - _socket(std::move(socket)) + _receiveQueue() { } @@ -216,11 +243,10 @@ public: : _ioContext(other._ioContext), _receiveStrand(std::move(other._receiveStrand)), _receiveQueue(std::move(_receiveQueue)), - _acceptor(std::move(other._acceptor)), _socket(std::move(other._socket)), _streamBuffer() { - if (*this == other) + if (this == &other) return; auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data()); @@ -236,7 +262,7 @@ public: AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) { - if (*this != other) { + if (this != &other) { close(); _streamBuffer.consume(_streamBuffer.size()); @@ -246,7 +272,6 @@ public: _ioContext = other._ioContext; _receiveStrand = std::move(other._receiveStrand), _socket = std::move(other._socket); - _acceptor = std::move(other._acceptor); _receiveQueue = other._receiveQueue; } @@ -263,53 +288,6 @@ public: /* asynchronuos methods */ - template TokenT, - traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)> - auto asyncAccept(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) - { - // no acceptor for UDP-sockets - if constexpr (std::is_null_pointer_v) { - static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!"); - } - - // auto acc = acceptor_t(_ioContext); - - // auto timer = getDeadlineTimer(acc, timeout); - auto timer = getDeadlineTimer(_acceptor, timeout); - - return asio::async_compose( - // [acc = std::move(acc), timer = std::move(timer), start = true, &endpoint, this]( - [timer = std::move(timer), start = true, &endpoint, this](auto& self, std::error_code ec = {}) mutable { - if (!ec) { - if (start) { - start = false; - try { - // acc = acceptor_t(_ioContext, endpoint); - if (!_acceptor.is_open() || (_acceptor.local_endpoint() != endpoint)) { - _acceptor = acceptor_t(_ioContext, endpoint); - } - } catch (std::system_error err) { - timer->cancel(); - self.complete(err.code()); - return; - } - - // return acc.async_accept(_socket, std::move(self)); - return _acceptor.async_accept(_socket, std::move(self)); - } - } - - if (isTimeout(timer, ec)) { - ec = std::make_error_code(std::errc::timed_out); - } else { // an error occured in async_connect - timer->cancel(); - } - - self.complete(ec); - }, - token, _ioContext); - } - template TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)> @@ -385,10 +363,11 @@ public: template auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) { - constexpr auto is_async_ctx_t = std::same_as, async_call_ctx_t>; + // constexpr auto is_async_ctx_t = std::same_as, async_call_ctx_t>; // check completion token signature and deduce message type - if constexpr (!adc_asio_special_comp_token_c && !is_async_ctx_t) { + // if constexpr (!adc_asio_special_comp_token_c && !is_async_ctx_t) { + if constexpr (!adc_asio_special_comp_token_c) { static_assert(traits::adc_func_traits::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!"); static_assert(std::is_same_v>, std::error_code>, "INVALID COMPLETION TOKEN SIGNATURE!"); @@ -398,7 +377,8 @@ public: } using msg_t = std::conditional_t< - adc_asio_special_comp_token_c || is_async_ctx_t, RMSGT, + // adc_asio_special_comp_token_c || is_async_ctx_t, RMSGT, + adc_asio_special_comp_token_c, RMSGT, std::remove_cvref_t::args_t>>>; auto out_flags = std::make_shared(); @@ -408,7 +388,6 @@ public: return asio::async_compose( [out_flags, do_read = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {}, size_t nbytes = 0) mutable { - // RMSGT msg; msg_t msg; if (!ec) { @@ -451,9 +430,7 @@ public: auto start_ptr = static_cast(_streamBuffer.data().data()); - // auto sr = this->search(std::span(start_ptr, _streamBuffer.size())); auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); - // if (!std::get<2>(sr)) { if (net_pack.empty()) { do_read = true; asio::post(std::move(self)); // initiate consequence socket's read operation @@ -463,8 +440,6 @@ public: timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer // here one has at least a single message - // size_t N = std::distance(std::get<0>(sr), std::get<1>(sr)); - // auto net_pack = std::span{start_ptr, N}; std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); _streamBuffer.consume(net_pack.size()); @@ -473,14 +448,9 @@ public: while (_streamBuffer.size()) { // search for possible additional session protocol packets start_ptr = static_cast(_streamBuffer.data().data()); - // sr = this->search(std::span(start_ptr, _streamBuffer.size())); net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); if (!net_pack.empty()) { - // if (std::get<2>(sr)) { - // N = std::distance(std::get<0>(sr), std::get<1>(sr)); - // net_pack = std::span{start_ptr, N}; - _receiveQueue.emplace(); std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); _streamBuffer.consume(net_pack.size()); @@ -499,7 +469,6 @@ public: if constexpr (std::is_same_v) { self.complete(ec, std::move(msg)); } else { - // msg_t user_msg{msg.begin(), msg.end()}; self.complete(ec, {msg.begin(), msg.end()}); } }, @@ -508,13 +477,6 @@ public: /* blocking methods */ - template - auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) - { - std::future ftr = asyncAccept(endpoint, asio::use_future, timeout); - ftr.get(); - } - template auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) { @@ -582,8 +544,6 @@ protected: socket_t _socket; - acceptor_t _acceptor; - asio::streambuf _streamBuffer; std::queue> _receiveQueue; diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp index 5050fca..ae77ef0 100644 --- a/tests/adc_netservice_test.cpp +++ b/tests/adc_netservice_test.cpp @@ -5,13 +5,13 @@ #include "../net/asio/adc_netservice_asio.h" template -void receive(T& srv) +void receive(T srv) { srv.asyncReceive( [&srv](std::error_code ec, std::string msg) { if (!ec) { std::cout << "Received: [" << msg << "]\n"; - receive(srv); + receive(std::move(srv)); } else { std::cout << "Received: " << ec.message() << "\n"; } @@ -30,37 +30,23 @@ int main() asio::io_context ctx; - adc::impl::AdcNetServiceASIOBase> srv(ctx); + using srv_t = adc::impl::AdcNetServiceASIOBase>; - // adc::impl::AdcNetServiceASIOBase>::asio_async_ctx_t srv_ctx; - // srv_ctx.accept_comp_token = [](std::error_code ec) { + typename srv_t::acceptor_t acc(ctx, ept_c); - // }; - - // srv.asyncAccept(ept_s, srv_ctx, std::chrono::seconds(120)); - // srv.asyncConnect(ept_c, [](std::error_code ec) { - - // }); - - // adc::impl::AdcNetServiceASIOBase>::contx_t s_ctx; - - // srv.asyncConnect(ept_c, s_ctx); - // auto res = srv.asyncConnect(ept_c, asio::use_awaitable); - - srv.asyncAccept(ept_c, [&srv](std::error_code ec) { + acc.asyncAccept([](std::error_code ec, srv_t srv) { if (!ec) { - std::cout << "New connection\n"; - - // srv.asyncReceive( - // [](std::error_code ec, std::string msg) { - // if (!ec) { - // std::cout << "Received: [" << msg << "]\n"; - // } else { - // std::cout << "Received: " << ec.message() << "\n"; - // } - // }, - // std::chrono::minutes(1)); - receive(srv); + // receive(std::move(srv)); + srv.asyncReceive( + [](std::error_code ec, std::string msg) { + if (!ec) { + std::cout << "Received: [" << msg << "]\n"; + // receive(std::move(srv)); + } else { + std::cout << "Received: " << ec.message() << "\n"; + } + }, + std::chrono::minutes(1)); } else { std::cout << "ACCEPT ERR: " << ec.message() << "\n"; }