From d33c101d7063aa8fc77fa1b56adbde256dd9db9b Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Sun, 15 Sep 2024 01:25:58 +0300 Subject: [PATCH] ... --- CMakeLists.txt | 4 ++ common/adc_traits.h | 4 +- net/adc_netproto.h | 9 ++- net/asio/adc_netservice_asio.h | 100 ++++++++++++++++++--------------- tests/adc_netservice_test.cpp | 26 +++++++++ 5 files changed, 95 insertions(+), 48 deletions(-) create mode 100644 tests/adc_netservice_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9664f2e..41ab249 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,6 +85,10 @@ if (BUILD_TESTS) set(NETMSG_TEST_APP adc_netmsg_test) add_executable(${NETMSG_TEST_APP} tests/adc_netmsg_test.cpp) + set(NETSERVICE_TEST_APP adc_netservice_test) + add_executable(${NETSERVICE_TEST_APP} tests/adc_netservice_test.cpp) + target_link_libraries(${NETSERVICE_TEST_APP} OpenSSL::SSL OpenSSL::Crypto) + if (NOT doctest_FOUND) include(FetchContent) FetchContent_Declare( diff --git a/common/adc_traits.h b/common/adc_traits.h index 3d76ed1..77c1eaa 100644 --- a/common/adc_traits.h +++ b/common/adc_traits.h @@ -205,7 +205,9 @@ concept adc_tuple_like = adc_is_tuple_v == true; template // from https://stackoverflow.com/questions/74383254/concept-that-models-only-the-stdchrono-duration-types concept adc_time_duration_c = requires { - [](std::type_identity>) {}(std::type_identity()); + [](std::type_identity>) { + + }(std::type_identity>()); }; diff --git a/net/adc_netproto.h b/net/adc_netproto.h index 746dcc9..bb01e24 100644 --- a/net/adc_netproto.h +++ b/net/adc_netproto.h @@ -34,6 +34,10 @@ struct AdcStopSeqSessionProto { static_assert(STOP_SEQ_SIZE, "STOP BYTE SEQUENCE MUST NOT BE AN EMPTY ONE!!!"); + typedef std::string proto_ident_t; + + proto_ident_t ident() const { return "STOP SEQUENCE PROTO"; } + template auto search(const R& r) { @@ -41,7 +45,7 @@ struct AdcStopSeqSessionProto { std::get<1>(res) = std::search(r.begin(), r.end(), STOP_SEQ.begin(), STOP_SEQ.end()); if (std::get<1>(res) != r.end()) { // move iterator to the one-past-the-end position - std::get<1>(res) = std::advance(std::get<1>(res), STOP_SEQ_SIZE); + std::advance(std::get<1>(res), STOP_SEQ_SIZE); std::get<2>(res) = true; } @@ -73,7 +77,8 @@ struct AdcStopSeqSessionProto { auto N = std::distance(r.begin(), r.end()); if (N < STOP_SEQ_SIZE) { // one must ensure for input range size correctness - return std::span>(); + // return std::span>(); + return std::ranges::subrange(r.begin(), r.begin()); } if constexpr (std::ranges::viewable_range) { diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index d639212..a57ca31 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -48,11 +48,10 @@ concept adc_asio_stream_transport_proto_c = std::derived_from || std::derived_from; - template SESSION_PROTOT, traits::adc_output_char_range RMSGT = std::vector> -class AdcNetServiceASIOBase : public TRANSPORT_PROTOT, public SESSION_PROTOT +class AdcNetServiceASIOBase : public SESSION_PROTOT { public: typedef std::string netservice_ident_t; @@ -74,13 +73,7 @@ public: static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx) - : TRANSPORT_PROTOT(), - SESSION_PROTOT(), - _ident(ident), - _ioContext(ctx), - _receiveStrand(_ioContext), - _receiveQueue(), - _socket(_ioContext) + : SESSION_PROTOT(), _ident(ident), _ioContext(ctx), _receiveStrand(ctx), _receiveQueue(), _socket(ctx) { } @@ -88,10 +81,7 @@ public: virtual ~AdcNetServiceASIOBase() {} - netservice_ident_t ident() const - { - return _ident; - } + netservice_ident_t ident() const { return _ident; } void clear() @@ -109,15 +99,32 @@ public: { auto timer = getDeadlineTimer(timeout); - if (ctx.use_future) { - return _socket.async_connect( - endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); - } else { - return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { + auto comp_token = [&ctx, timer = std::move(timer), this](std::error_code ec) { + if (isTimeout(timer, ec)) { + ec = std::make_error_code(std::errc::timed_out); + } else { timer->cancel(); + } + if (!ctx.use_future) { ctx.connect_comp_token(ec); - }); + } + }; + + if (ctx.use_future) { + comp_token = asio::use_future(comp_token); } + + return _socket.async_connect(endpoint, comp_token); + + // if (ctx.use_future) { + // return _socket.async_connect( + // endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); + // } else { + // return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { + // timer->cancel(); + // ctx.connect_comp_token(ec); + // }); + // } } @@ -195,19 +202,22 @@ public: auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) { if (_receiveQueue.size()) { // return message from queue - // - // !!!!!!!!!!! see documentation for composed operation and async_initiate - // - asio::post(_receiveStrand, [&ctx, this]() { - RMSGT msg = _receiveQueue.front(); - _receiveQueue.pop(); - if (ctx.use_future) { - return msg; - } else { - ctx.receive_comp_token(std::error_code(), std::move(msg)); - return; - } - }); + auto async_init = [this](auto&& compl_hndl) { + asio::post(_receiveStrand, [&compl_hndl, this]() { + RMSGT msg = _receiveQueue.front(); + _receiveQueue.pop(); + compl_hndl(std::error_code(), std::move(msg)); + }); + }; + + if (ctx.use_future) { + return asio::async_initiate(async_init, + asio::use_future(ctx.receive_comp_token)); + } else { + return asio::async_initiate(async_init, ctx.receive_comp_token); + } } auto out_flags = std::make_unique(); @@ -216,8 +226,8 @@ public: auto s_res = std::make_shared>(); - // NOTE: this competion token is safe (_streamBuffer access) in multithread context since all the instances will - // be executed in serialized execution manner (see asio::strand) + // NOTE: this completion token is safe (_streamBuffer access) in multithread context since all the instances + // will be executed in serialized execution manner (see asio::strand) auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this]( std::error_code ec, size_t nbytes) { timer->cancel(); @@ -354,15 +364,9 @@ public: return ftr.get(); } - void setShutdownType(asio::socket_base::shutdown_type shutdown_type) - { - _shutdownType = shutdown_type; - } + void setShutdownType(asio::socket_base::shutdown_type shutdown_type) { _shutdownType = shutdown_type; } - asio::socket_base::shutdown_type getShutdownType() const - { - return _shutdownType; - } + asio::socket_base::shutdown_type getShutdownType() const { return _shutdownType; } std::error_code close() { @@ -381,7 +385,6 @@ protected: asio::io_context& _ioContext; asio::io_context::strand _receiveStrand; - asio::io_context::strand _sendStrand; socket_t _socket; @@ -396,20 +399,27 @@ protected: template std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) { - std::unique_ptr timer(_socket.get_executor()); + auto timer = std::make_unique(_socket.get_executor()); if (arm) { timer->expires_after(timeout); timer->async_wait([this](const std::error_code& ec) { if (!ec) { - _socket.cancel(std::make_error_code(std::errc::timed_out)); + _socket.cancel(); } }); } return timer; } + + template + bool isTimeout(const std::unique_ptr& timer, const std::error_code& ec) + { + auto exp_time = timer->expiry(); + return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted); + } }; } // namespace adc::impl diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp new file mode 100644 index 0000000..d4d513b --- /dev/null +++ b/tests/adc_netservice_test.cpp @@ -0,0 +1,26 @@ +#include + +#include "../net/adc_netproto.h" +#include "../net/asio/adc_netservice_asio.h" + +int main() +{ + asio::ip::tcp::endpoint ept_s(asio::ip::tcp::v4(), 9999); + asio::ip::tcp::endpoint ept_c(asio::ip::make_address_v4("127.0.0.1"), 9999); + + asio::io_context ctx; + + adc::impl::AdcNetServiceASIOBase> srv("TCP NETSERVICE", ctx); + + adc::impl::AdcNetServiceASIOBase>::asio_async_ctx_t srv_ctx; + srv_ctx.accept_comp_token = [](std::error_code ec) { + + }; + + // srv.asyncAccept(ept_s, srv_ctx, std::chrono::seconds(120)); + srv.asyncConnect(ept_c, srv_ctx); + + ctx.run(); + + return 0; +}