diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index 6df5a3a..c63c37a 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -52,17 +52,30 @@ concept adc_asio_stream_transport_proto_c = template concept adc_asio_is_future = requires { - [](std::type_identity>) {}(std::type_identity>()); - // [](std::type_identity>) { - // }(std::type_identity>()); + // [](std::type_identity>) {}(std::type_identity>()); + [](std::type_identity>) { + }(std::type_identity>{}); }; template concept adc_asio_is_awaitable = requires { [](std::type_identity>) { - }(std::type_identity>()); + }(std::type_identity>{}); }; + +template +concept adc_asio_special_comp_token = + adc_asio_is_future || adc_asio_is_awaitable || std::same_as, asio::deferred_t>; + + +// template +// static constexpr bool adc_is_asio_special_comp_token = std::is_same_v, asio::use_future_t<>> +// || +// std::is_same_v, asio::deferred_t> || +// std::is_same_v, +// asio::use_awaitable_t<>>; + struct adc_asio_async_call_ctx_t { }; @@ -91,10 +104,14 @@ template >, + std::nullptr_t, // there is no acceptor + typename TRANSPORT_PROTOT::acceptor>; struct asio_async_ctx_t { bool use_future = false; @@ -112,6 +129,10 @@ public: public: contx_t() = default; + contx_t(contx_t&) = default; + contx_t(contx_t&&) = default; + contx_t(const contx_t&) = default; + template TokenT> contx_t(TokenT&& token) { @@ -169,20 +190,14 @@ public: static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5); static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); - AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx) - : SESSION_PROTOT(), - _ident(ident), - _ioContext(ctx), - _receiveStrand(_ioContext), - _receiveQueue(), - _socket(_ioContext) + AdcNetServiceASIOBase(asio::io_context& ctx) + : SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext) { } - AdcNetServiceASIOBase(const netservice_ident_t& ident, socket_t socket) + AdcNetServiceASIOBase(socket_t socket) : SESSION_PROTOT(), - _ident(ident), _ioContext(socket.get_executor()), _receiveStrand(_ioContext), _receiveQueue(), @@ -191,58 +206,73 @@ public: } + AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor! + virtual ~AdcNetServiceASIOBase() {} - netservice_ident_t ident() const + constexpr netservice_ident_t ident() const { return _ident; } - void clear() + /* asynchronuos methods */ + + template TokenT, + traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)> + static auto asyncAccept(asio::io_context io_ctx, + const endpoint_t& endpoint, + TokenT&& token, + const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - // clear receiving messages queue - // NOTE: there is no racing condition here since using asio::strand! - asio::post(_receiveStrand, [this]() { _receiveQueue = {}; }); + // no acceptor for UDP-sockets + if constexpr (!std::is_null_pointer_v) { + static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!"); + } + + auto acc = acceptor_t(io_ctx); + auto sock = socket_t(io_ctx); + + auto timer = getDeadlineTimer(timeout); + + return asio::async_compose( + [acc = std::move(acc), sock = std::move(sock), timer = std::move(timer), start = true, &endpoint, &io_ctx]( + auto& self, std::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + try { + acc = acceptor_t(io_ctx, endpoint); + } catch (std::system_error err) { + timer->cancel(); + self.complete(err.code()); + return; + } + + return acc.async_accept(sock, std::move(self)); + } + } + + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_connect + timer->cancel(); + } + + self.complete(ec, AdcNetServiceASIOBase(std::move(sock))); + }, + token, io_ctx); } - // template TokenT, - // traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)> - // auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) - // { - // static_assert(!std::is_same_v, "'async_call_ctx_t'-TYPE MUST NOT BE USED!"); - - // auto timer = getDeadlineTimer(timeout); - - // auto comp_token = [wrapper = traits::adc_pf_wrapper(std::forward(token)), - // timer = std::move(timer)](std::error_code ec) { - // if (isTimeout(timer, ec)) { - // ec = std::make_error_code(std::errc::timed_out); - // } else { - // timer->cancel(); - // } - - // if constexpr (!adc_asio_is_future) { - // std::get<0>(wrapper)(ec); - // } - // }; - - // if constexpr (!adc_asio_is_future) { - // return _socket.async_connect(endpoint, asio::use_future(std::move(comp_token))); - // } else { - // return _socket.async_connect(endpoint, std::move(comp_token)); - // } - // } - - template CompTokenT, + template TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)> - auto asyncConnect(const endpoint_t& endpoint, CompTokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) + auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) { - auto timer = getDeadlineTimer(timeout); + auto timer = getDeadlineTimer(_socket, timeout); - return asio::async_compose( + return asio::async_compose( [start = true, endpoint, timer = std::move(timer), this](auto& self, asio::error_code ec = {}) mutable { if (!ec) { if (start) { @@ -268,49 +298,42 @@ public: traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)> auto asyncSend(const MessageT& msg, TokenT&& token, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT) { - static_assert(!std::is_same_v, "'async_call_ctx_t'-TYPE MUST NOT BE USED!"); - // create buffer sequence of sending session protocol representation of the input message std::vector buff_seq; std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); }); - auto timer = getDeadlineTimer(timeout); + auto timer = getDeadlineTimer(_socket, timeout); - auto comp_token = [wrapper = traits::adc_pf_wrapper(std::forward(token)), buff_seq, - timer = std::move(timer)](std::error_code ec, size_t) { - timer->cancel(); + return asio::async_compose( + [start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this]( + auto& self, asio::error_code ec = {}) mutable { + if (!ec) { + if (start) { + start = false; + if constexpr (std::derived_from>) { + return asio::async_write(_socket, buff_seq, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, std::move(self)); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, std::move(self)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + } - if constexpr (!adc_asio_is_future) { - std::get<0>(wrapper)(ec); - } - }; + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_write/async_send + timer->cancel(); + } - - if constexpr (adc_asio_is_future) { - if constexpr (std::derived_from>) { - return asio::async_write(_socket, buff_seq, asio::use_future(std::move(comp_token))); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, asio::use_future(std::move(comp_token))); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, asio::use_future(std::move(comp_token))); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - } else { - if constexpr (std::derived_from>) { - return asio::async_write(_socket, buff_seq, std::move(comp_token)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, std::move(comp_token)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, std::move(comp_token)); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - } + self.complete(ec); + }, + token, _socket); } @@ -319,8 +342,8 @@ public: { static_assert(!std::is_same_v, "'async_call_ctx_t'-TYPE MUST NOT BE USED!"); - // check completion token signature - if constexpr (!(adc_asio_is_future || std::is_same_v)) { + // check completion token signature and deduce message type + if constexpr (!adc_asio_special_comp_token) { static_assert(traits::adc_func_traits::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!"); static_assert(std::is_same_v>, std::error_code>, "INVALID COMPLETION TOKEN SIGNATURE!"); @@ -330,153 +353,113 @@ public: } using msg_t = std::conditional_t< - adc_asio_is_future || std::is_same_v, RMSGT, + adc_asio_special_comp_token, RMSGT, std::remove_cvref_t::args_t>>>; - if (_receiveQueue.size()) { // return message from queue - auto async_init = [this](auto&& compl_hndl) { - asio::post(_receiveStrand, [&compl_hndl, this]() { - RMSGT msg = _receiveQueue.front(); - _receiveQueue.pop(); - compl_hndl(std::error_code(), std::move(msg)); - }); - }; - - if (adc_asio_is_future) { - return asio::async_initiate( - async_init, asio::use_future(ctx.receive_comp_token)); - } else { - return asio::async_initiate(async_init, ctx.receive_comp_token); - } - } - - auto out_flags = std::make_unique(); - - auto timer = getDeadlineTimer(timeout); - auto s_res = std::make_shared>(); - // NOTE: this completion token is safe (_streamBuffer access) in multithread context since all the instances - // will be executed in serialized execution manner (see asio::strand) - auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this]( - std::error_code ec, size_t nbytes) { - timer->cancel(); - RMSGT msg; + auto out_flags = std::make_shared(); - if (!ec && nbytes) { - // here, the iterators were computed in MatchCondition called by asio::async_read_until function!!! - std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)}; + auto timer = getDeadlineTimer(_socket, timeout); - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); - _streamBuffer.consume(net_pack.size()); + return asio::async_compose( + [s_res, out_flags, start = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {}, + size_t = 0) mutable { + RMSGT msg; - while (_streamBuffer.size()) { // search for possible additional session protocol packets - auto begin_it = (const char*)asio_streambuff_iter_t::begin(_streamBuffer.data()); - auto end_it = (const char*)asio_streambuff_iter_t::end(_streamBuffer.data()); - // static_cast>(_streamBuffer.data().data()); - // auto end_it = begin_it + _streamBuffer.data().size(); + if (!ec) { + if (start) { + start = false; + if (_receiveQueue.size()) { // return message from queue + msg = _receiveQueue.front(); + _receiveQueue.pop(); + if constexpr (std::is_same_v) { + self.complete(std::error_code(), std::move(msg)); + } else { + // msg_t user_msg{msg.begin(), msg.end()}; + self.complete(std::error_code(), {msg.begin(), msg.end()}); + } + return; + } + + if constexpr (std::derived_from>) { + // adapt to ASIO's MatchCondition + auto match_func = [s_res, this](IT begin, IT end) { + *s_res = this->search(std::span(begin, end)); + return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); + }; + + return asio::async_read_until(_socket, _streamBuffer, match_func, std::move(self)); + + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, std::move(self)); + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, *out_flags, std::move(self)); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + + // here, the iterators were computed in MatchCondition called by asio::async_read_until function!!! + std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)}; + + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); + _streamBuffer.consume(net_pack.size()); + + while (_streamBuffer.size()) { // search for possible additional session protocol packets + auto begin_it = (const char*)asio_streambuff_iter_t::begin(_streamBuffer.data()); + auto end_it = (const char*)asio_streambuff_iter_t::end(_streamBuffer.data()); + // static_cast>(_streamBuffer.data().data()); + // auto end_it = begin_it + _streamBuffer.data().size(); - *s_res = this->search(std::span(begin_it, end_it)); - if (std::get<2>(*s_res)) { - net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)}; + *s_res = this->search(std::span(begin_it, end_it)); + if (std::get<2>(*s_res)) { + net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)}; - _receiveQueue.emplace(); - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); - _streamBuffer.consume(net_pack.size()); - } else { - break; + _receiveQueue.emplace(); + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); + _streamBuffer.consume(net_pack.size()); + } else { + break; + } } } - } - if (ctx.use_future) { - return msg; - } else { - ctx.receive_comp_token(ec, std::move(msg)); - } - }; + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { // an error occured in async_connect + timer->cancel(); + } - if (ctx.use_future) { - comp_token = asio::use_future(comp_token); - } - - comp_token = asio::bind_executor(_receiveStrand, comp_token); - - if constexpr (std::derived_from>) { - // adapt to ASIO's MatchCondition - auto match_func = [s_res, this](IT begin, IT end) { - *s_res = this->search(std::span(begin, end)); - return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); - }; - - return asio::async_read_until(_socket, _streamBuffer, match_func, comp_token); - - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - return _socket.receive(_streamBuffer, comp_token); - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - return _socket.receive(_streamBuffer, *out_flags, comp_token); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - - /* - if constexpr (std::derived_from>) { - // adapt to ASIO's MatchCondition - auto match_func = [s_res, this](IT begin, IT end) { - *s_res = this->search(std::span(begin, end)); - return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); - }; - - if (ctx.use_future) { - return asio::async_read_until(_socket, _streamBuffer, match_func, - asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); - } else { - return asio::async_read_until(_socket, _streamBuffer, match_func, - asio::bind_executor(_receiveStrand, comp_token)); - } - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - if (ctx.use_future) { - return _socket.receive(_streamBuffer, - asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); - } else { - return _socket.receive(_streamBuffer, asio::bind_executor(_receiveStrand, comp_token)); - } - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - if (ctx.use_future) { - return _socket.receive(_streamBuffer, *out_flags, - asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); - } else { - return _socket.receive(_streamBuffer, *out_flags, asio::bind_executor(_receiveStrand, comp_token)); - } - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - */ + if constexpr (std::is_same_v) { + self.complete(ec, std::move(msg)); + } else { + // msg_t user_msg{msg.begin(), msg.end()}; + self.complete(ec, {msg.begin(), msg.end()}); + } + }, + token, _socket); } + /* blocking methods */ template - auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) + static auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) { - asio_async_ctx_t ctx = {.use_future = true}; - std::future ftr = asyncAcept(endpoint, ctx, timeout); + std::future ftr = asyncAccept(endpoint, asio::use_future, timeout); ftr.get(); } template auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) { - asio_async_ctx_t ctx = {.use_future = true}; - std::future ftr = asyncConnect(endpoint, ctx, timeout); + std::future ftr = asyncConnect(endpoint, asio::use_future, timeout); ftr.get(); } @@ -494,16 +477,6 @@ public: return ftr.get(); } - void setShutdownType(asio::socket_base::shutdown_type shutdown_type) - { - _shutdownType = shutdown_type; - } - - asio::socket_base::shutdown_type getShutdownType() const - { - return _shutdownType; - } - std::error_code close() { std::error_code ec; @@ -516,8 +489,34 @@ public: return ec; } + /* additional ASIO-related methods */ + + void clear() + { + // clear receiving messages queue + // NOTE: there is no racing condition here since using asio::strand! + asio::post(_receiveStrand, [this]() { _receiveQueue = {}; }); + } + + void setShutdownType(asio::socket_base::shutdown_type shutdown_type) + { + _shutdownType = shutdown_type; + } + + asio::socket_base::shutdown_type getShutdownType() const + { + return _shutdownType; + } + protected: - netservice_ident_t _ident; + static constexpr netservice_ident_t _ident = + std::derived_from> + ? "STREAM-SOCKET NETWORK SERVICE" + : std::derived_from> + ? "DATAGRAM-SOCKET NETWORK SERVICE" + : std::derived_from> + ? "SEQPACKET-SOCKET NETWORK SERVICE" + : "UNKNOWN"; asio::io_context& _ioContext; asio::io_context::strand _receiveStrand; @@ -532,17 +531,19 @@ protected: asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both; - template - std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) + template + static std::unique_ptr getDeadlineTimer(CancelableT& obj, + const TimeoutT& timeout, + bool arm = true) { - auto timer = std::make_unique(_socket.get_executor()); + auto timer = std::make_unique(obj.get_executor()); if (arm) { timer->expires_after(timeout); - timer->async_wait([this](const std::error_code& ec) { + timer->async_wait([&obj](const std::error_code& ec) mutable { if (!ec) { - _socket.cancel(); + obj.cancel(); } }); } diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp index 38f0e51..a242593 100644 --- a/tests/adc_netservice_test.cpp +++ b/tests/adc_netservice_test.cpp @@ -10,7 +10,7 @@ int main() asio::io_context ctx; - adc::impl::AdcNetServiceASIOBase> srv("TCP NETSERVICE", ctx); + adc::impl::AdcNetServiceASIOBase> srv(ctx); adc::impl::AdcNetServiceASIOBase>::asio_async_ctx_t srv_ctx; srv_ctx.accept_comp_token = [](std::error_code ec) { @@ -25,7 +25,7 @@ int main() adc::impl::AdcNetServiceASIOBase>::contx_t s_ctx; srv.asyncConnect(ept_c, s_ctx); - srv.asyncConnect(ept_c, asio::use_future); + auto res = srv.asyncConnect(ept_c, asio::use_awaitable); ctx.run();