diff --git a/net/adc_net_concepts.h b/net/adc_net_concepts.h index 8153471..4ff1da9 100644 --- a/net/adc_net_concepts.h +++ b/net/adc_net_concepts.h @@ -84,39 +84,40 @@ template -concept adc_netservice_c = traits::adc_input_char_range && traits::adc_output_char_range && traits::adc_time_duration_c && - requires(SRVT srv, const SRVT srv_const) { - typename SRVT::netservice_ident_t; +concept adc_netservice_c = + traits::adc_input_char_range && traits::adc_output_char_range && traits::adc_time_duration_c && + requires(SRVT srv, const SRVT srv_const) { + typename SRVT::netservice_ident_t; - // netservice_ident_t ident() const - { srv_const.ident() } -> std::same_as; + // netservice_ident_t ident() const + { srv_const.ident() } -> std::same_as; - typename SRVT::async_ctx_t; - typename SRVT::endpoint_t; + typename SRVT::async_ctx_t; + typename SRVT::endpoint_t; - // asynchronous (non-blocking) operations - srv.asyncAccept(std::declval(), - std::declval(), std::declval()); + // asynchronous (non-blocking) operations + srv.asyncAccept(std::declval(), std::declval(), + std::declval()); - srv.asyncConnect(std::declval(), - std::declval(), std::declval()); + srv.asyncConnect(std::declval(), std::declval(), + std::declval()); - srv.asyncSend(std::declval(), std::declval(), - std::declval()); + srv.asyncSend(std::declval(), std::declval(), + std::declval()); - srv.asyncReceive(std::declval(), std::declval()); + srv.asyncReceive(std::declval(), std::declval()); - // synchronous (blocking) operations - srv.accept(std::declval(), std::declval()); + // synchronous (blocking) operations + srv.accept(std::declval(), std::declval()); - srv.connect(std::declval(), std::declval()); + srv.connect(std::declval(), std::declval()); - srv.send(std::declval(), std::declval()); + srv.send(std::declval(), std::declval()); - { srv.receive(std::declval()) } -> std::same_as; + { srv.receive(std::declval()) } -> std::same_as; - srv.shutdown(); - }; + srv.close(); + }; /* NETWORK SESSION */ @@ -137,35 +138,34 @@ concept adc_netsession_c = /* NETWORK SESSION-LEVEL PROTOCOL */ -template -concept adc_netsession_proto_c = traits::adc_input_char_range && requires(SESS_PROTOT proto, const SESS_PROTOT proto_const) { - typename SESS_PROTOT::proto_ident_t; +template +concept adc_netsession_proto_c = + traits::adc_input_char_range && requires(SESS_PROTOT proto, const SESS_PROTOT proto_const) { + typename SESS_PROTOT::proto_ident_t; - // proto_ident_t ident() const (const method) - { proto_const.ident() } -> std::same_as; + // proto_ident_t ident() const (const method) + { proto_const.ident() } -> std::same_as; - // typename SESS_PROTOT::search_result_t; + // typename SESS_PROTOT::search_result_t; - // search for the first occurence of valid protocol sequence in input user byte sequence - // the method must return std::tuple: - // start - input range iterator of the sequence first byte - // stop - input range iterator of the sequence end ("after-the-last" byte!!!) - // flag - true if valid sequence was found, false - otherwise - { - proto.search(std::declval()) - } -> std::same_as, std::ranges::iterator_t, bool>>; + // search for the first occurence of valid protocol sequence in input user byte sequence + // the method must return std::tuple: + // start - input range iterator of the sequence first byte + // stop - input range iterator of the sequence end ("after-the-last" byte!!!) + // flag - true if valid sequence was found, false - otherwise + { + proto.search(std::declval()) + } -> std::same_as, std::ranges::iterator_t, bool>>; - // construct netsession protocol representation of input user byte sequence - // the method must return a range of char range views or output char range - { proto.toProto(std::declval()) } -> traits::adc_range_of_view_or_output_char_range; + // construct netsession protocol representation of input user byte sequence + // the method must return a range of char range views or output char range + { proto.toProto(std::declval()) } -> traits::adc_range_of_view_or_output_char_range; - // return user byte sequence from input netsession protocol representation - // the method must return a view of char range or output char range - { proto.fromProto(std::declval()) } -> traits::adc_view_or_output_char_range; - -}; + // return user byte sequence from input netsession protocol representation + // the method must return a view of char range or output char range + { proto.fromProto(std::declval()) } -> traits::adc_view_or_output_char_range; + }; } // namespace adc::interfaces diff --git a/net/adc_netservice_asio.h b/net/adc_netservice_asio.h index 9afde8c..f70792c 100644 --- a/net/adc_netservice_asio.h +++ b/net/adc_netservice_asio.h @@ -40,7 +40,6 @@ #include "adc_netmsg.h" -#include "adc_net_concepts.h" namespace adc::traits @@ -72,215 +71,6 @@ concept adc_asio_inet_stream_proto_c = requires(T t, asio_streambuff_iter_t begi namespace adc::impl { -template -concept adc_asio_transport_proto_c = - std::derived_from || std::derived_from || - std::derived_from || std::derived_from; - - -template -concept adc_asio_stream_transport_proto_c = - std::derived_from || std::derived_from; - - - -template SESSION_PROTOT> -class AdcNetServiceASIOBase : public TRANSPORT_PROTOT, public SESSION_PROTOT -{ -public: - typedef std::string netservice_ident_t; - - using socket_t = typename TRANSPORT_PROTOT::socket; - using endpoint_t = typename TRANSPORT_PROTOT::endpoint; - - struct asio_async_ctx_t { - bool use_future = false; - std::function accept_comp_token; - std::function connect_comp_token; - std::function send_comp_token; - - template - static std::unordered_map> - receive_comp_token; - }; - - static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::years::max(); - static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10); - static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5); - static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); - - - netservice_ident_t ident() const - { - return _ident; - } - - - template - auto asyncConnect(const endpoint_t& endpoint, - asio_async_ctx_t& ctx, - const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) - { - auto timer = getDeadlineTimer(timeout); - - if (ctx.use_future) { - return _socket.async_connect( - endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); - } else { - return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { - timer->cancel(); - ctx.connect_comp_token(ec); - }); - } - } - - - template - auto asyncAccept(const endpoint_t& endpoint, - asio_async_ctx_t& ctx, - const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) - { - if constexpr (std::derived_from>) { - return; // there is no acceptor for UDP protocol - } - - typename TRANSPORT_PROTOT::acceptor acceptor; - try { - acceptor = typename TRANSPORT_PROTOT::acceptor(_ioContext, endpoint); - } catch (std::system_error err) { - if (ctx.use_future) { // emulation of asio::use_future behaivior?! - throw; - } - ctx.accept_comp_token(err.code()); - return; - } - - auto timer = getDeadlineTimer(timeout); - - if (ctx.use_future) { - return _socket.async_accept( - endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); - } else { - return _socket.async_accept(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { - timer->cancel(); - ctx.accept_comp_token(ec); - }); - } - } - - template - auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) - { - auto timer = getDeadlineTimer(timeout); - - auto s_res = std::make_shared>(); - - if constexpr (std::derived_from>) { - return asio::async_read_until( - _socket, _streamBuffer, - [s_res, this](IT begin, IT end) { - *s_res = this->search(std::span(begin, end)); - return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); - }, - [&ctx, s_res, timer = std::move(timer), this](std::error_code ec, size_t) { - timer->cancel(); - R msg; - - if (!ec) { - std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)}; - - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); - _streamBuffer.consume(net_pack.size()); - - while (_streamBuffer.size()) { // search for possible additional session protocol packets - auto begin_it = (const char*)traits::asio_streambuff_iter_t::begin(_streamBuffer.data()); - auto end_it = (const char*)traits::asio_streambuff_iter_t::end(_streamBuffer.data()); - // static_cast>(_streamBuffer.data().data()); - // auto end_it = begin_it + _streamBuffer.data().size(); - - - *s_res = this->search(std::span(begin_it, end_it)); - if (std::get<2>(*s_res)) { - net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)}; - - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); - _streamBuffer.consume(net_pack.size()); - // TODO: insert to queue - } else { - break; - } - } - } - - - ctx.accept_comp_token(ec, std::move(msg)); - }); - } - } - - - template - auto accept(const endpoint_t& endpoint, const TimeoutT& timeout) - { - asio_async_ctx_t ctx = {.use_future = true}; - std::future ftr = asyncAcept(endpoint, ctx, timeout); - ftr.get(); - } - - template - auto connect(const endpoint_t& endpoint, const TimeoutT& timeout) - { - asio_async_ctx_t ctx = {.use_future = true}; - std::future ftr = asyncConnect(endpoint, ctx, timeout); - ftr.get(); - } - - template - auto send(const R& msg, const TimeoutT& timeout) - { - std::future ftr = asyncSend(msg, timeout, asio::use_future); - ftr.get(); - } - - template - auto receive(const TimeoutT& timeout) - { - std::future ftr = asyncReceive(timeout, asio::use_future); - return ftr.get(); - } - -protected: - netservice_ident_t _ident; - - asio::io_context& _ioContext; - - socket_t _socket; - - // acceptor_t _acceptor; - - asio::streambuf _streamBuffer; - - template - std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) - { - std::unique_ptr timer(_socket.get_executor()); - - if (arm) { - timer->expires_after(timeout); - - timer->async_wait([this](const std::error_code& ec) { - if (!ec) { - _socket.cancel(std::make_error_code(std::errc::timed_out)); - } - }); - } - - return timer; - } -}; - - template class AdcNetServiceASIO : public InetProtoT { diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h new file mode 100644 index 0000000..d639212 --- /dev/null +++ b/net/asio/adc_netservice_asio.h @@ -0,0 +1,415 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#ifdef USE_OPENSSL_WITH_ASIO + +#include +#include + +#endif + + +#include "../../common/adc_traits.h" +#include "../adc_net_concepts.h" + + +namespace adc::impl +{ + +// typedef for ASIO streambuf iterators +using asio_streambuff_iter_t = asio::buffers_iterator; + + +template +concept adc_asio_transport_proto_c = + std::derived_from || std::derived_from || + std::derived_from || std::derived_from; + + +template +concept adc_asio_stream_transport_proto_c = + std::derived_from || std::derived_from; + + + +template SESSION_PROTOT, + traits::adc_output_char_range RMSGT = std::vector> +class AdcNetServiceASIOBase : public TRANSPORT_PROTOT, public SESSION_PROTOT +{ +public: + typedef std::string netservice_ident_t; + + using socket_t = typename TRANSPORT_PROTOT::socket; + using endpoint_t = typename TRANSPORT_PROTOT::endpoint; + + struct asio_async_ctx_t { + bool use_future = false; + std::function accept_comp_token; + std::function connect_comp_token; + std::function send_comp_token; + std::function receive_comp_token; + }; + + static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::years::max(); + static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10); + static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5); + static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); + + AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx) + : TRANSPORT_PROTOT(), + SESSION_PROTOT(), + _ident(ident), + _ioContext(ctx), + _receiveStrand(_ioContext), + _receiveQueue(), + _socket(_ioContext) + { + } + + + virtual ~AdcNetServiceASIOBase() {} + + + netservice_ident_t ident() const + { + return _ident; + } + + + void clear() + { + // clear receiving messages queue + // NOTE: there is no racing condition here since using asio::strand! + asio::post(_receiveStrand, [this]() { _receiveQueue = {}; }); + } + + + template + auto asyncConnect(const endpoint_t& endpoint, + asio_async_ctx_t& ctx, + const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) + { + auto timer = getDeadlineTimer(timeout); + + if (ctx.use_future) { + return _socket.async_connect( + endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); + } else { + return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { + timer->cancel(); + ctx.connect_comp_token(ec); + }); + } + } + + + template + auto asyncAccept(const endpoint_t& endpoint, + asio_async_ctx_t& ctx, + const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) + { + if constexpr (std::derived_from>) { + return; // there is no acceptor for UDP protocol + } + + typename TRANSPORT_PROTOT::acceptor acceptor; + try { + acceptor = typename TRANSPORT_PROTOT::acceptor(_ioContext, endpoint); + } catch (std::system_error err) { + if (ctx.use_future) { // emulation of asio::use_future behaivior?! + throw; + } + ctx.accept_comp_token(err.code()); + return; + } + + auto timer = getDeadlineTimer(timeout); + + if (ctx.use_future) { + return _socket.async_accept( + endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); })); + } else { + return _socket.async_accept(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) { + timer->cancel(); + ctx.accept_comp_token(ec); + }); + } + } + + template + auto asyncSend(const SMSGT& msg, asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT) + { + // create buffer sequence of sending session protocol representation of the input message + std::vector buff_seq; + std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); }); + + auto timer = getDeadlineTimer(timeout); + + auto comp_token = [&ctx, timer = std::move(timer)](std::error_code ec, size_t) { + timer->cancel(); + + if (!ctx.use_future) { + ctx.send_comp_token(ec); + } + }; + + + if (ctx.use_future) { + comp_token = asio::use_future(comp_token); + } + + + if constexpr (std::derived_from>) { + return asio::async_write(_socket, buff_seq, comp_token); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, comp_token); + } else if constexpr (std::derived_from>) { + return _socket.async_send(buff_seq, comp_token); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + } + + + template + auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) + { + if (_receiveQueue.size()) { // return message from queue + // + // !!!!!!!!!!! see documentation for composed operation and async_initiate + // + asio::post(_receiveStrand, [&ctx, this]() { + RMSGT msg = _receiveQueue.front(); + _receiveQueue.pop(); + if (ctx.use_future) { + return msg; + } else { + ctx.receive_comp_token(std::error_code(), std::move(msg)); + return; + } + }); + } + + auto out_flags = std::make_unique(); + + auto timer = getDeadlineTimer(timeout); + + auto s_res = std::make_shared>(); + + // NOTE: this competion token is safe (_streamBuffer access) in multithread context since all the instances will + // be executed in serialized execution manner (see asio::strand) + auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this]( + std::error_code ec, size_t nbytes) { + timer->cancel(); + RMSGT msg; + + if (!ec && nbytes) { + // here, the iterators were computed in MatchCondition called by asio::async_read_until function!!! + std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)}; + + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); + _streamBuffer.consume(net_pack.size()); + + while (_streamBuffer.size()) { // search for possible additional session protocol packets + auto begin_it = (const char*)asio_streambuff_iter_t::begin(_streamBuffer.data()); + auto end_it = (const char*)asio_streambuff_iter_t::end(_streamBuffer.data()); + // static_cast>(_streamBuffer.data().data()); + // auto end_it = begin_it + _streamBuffer.data().size(); + + + *s_res = this->search(std::span(begin_it, end_it)); + if (std::get<2>(*s_res)) { + net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)}; + + _receiveQueue.emplace(); + std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); + _streamBuffer.consume(net_pack.size()); + } else { + break; + } + } + } + + if (ctx.use_future) { + return msg; + } else { + ctx.receive_comp_token(ec, std::move(msg)); + } + }; + + if (ctx.use_future) { + comp_token = asio::use_future(comp_token); + } + + comp_token = asio::bind_executor(_receiveStrand, comp_token); + + if constexpr (std::derived_from>) { + // adapt to ASIO's MatchCondition + auto match_func = [s_res, this](IT begin, IT end) { + *s_res = this->search(std::span(begin, end)); + return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); + }; + + return asio::async_read_until(_socket, _streamBuffer, match_func, comp_token); + + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, comp_token); + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + return _socket.receive(_streamBuffer, *out_flags, comp_token); + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + + /* + if constexpr (std::derived_from>) { + // adapt to ASIO's MatchCondition + auto match_func = [s_res, this](IT begin, IT end) { + *s_res = this->search(std::span(begin, end)); + return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res)); + }; + + if (ctx.use_future) { + return asio::async_read_until(_socket, _streamBuffer, match_func, + asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); + } else { + return asio::async_read_until(_socket, _streamBuffer, match_func, + asio::bind_executor(_receiveStrand, comp_token)); + } + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + if (ctx.use_future) { + return _socket.receive(_streamBuffer, + asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); + } else { + return _socket.receive(_streamBuffer, asio::bind_executor(_receiveStrand, comp_token)); + } + } else if constexpr (std::derived_from>) { + // datagram, so it should be received at once + if (ctx.use_future) { + return _socket.receive(_streamBuffer, *out_flags, + asio::bind_executor(_receiveStrand, asio::use_future(comp_token))); + } else { + return _socket.receive(_streamBuffer, *out_flags, asio::bind_executor(_receiveStrand, comp_token)); + } + } else { + static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); + } + */ + } + + + template + auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT) + { + asio_async_ctx_t ctx = {.use_future = true}; + std::future ftr = asyncAcept(endpoint, ctx, timeout); + ftr.get(); + } + + template + auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT) + { + asio_async_ctx_t ctx = {.use_future = true}; + std::future ftr = asyncConnect(endpoint, ctx, timeout); + ftr.get(); + } + + template + auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT) + { + std::future ftr = asyncSend(msg, timeout, asio::use_future); + ftr.get(); + } + + template + auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) + { + std::future ftr = asyncReceive(timeout, asio::use_future); + return ftr.get(); + } + + void setShutdownType(asio::socket_base::shutdown_type shutdown_type) + { + _shutdownType = shutdown_type; + } + + asio::socket_base::shutdown_type getShutdownType() const + { + return _shutdownType; + } + + std::error_code close() + { + std::error_code ec; + + _socket.shutdown(_shutdownType, ec); + if (!ec) { + _socket.close(ec); + } + + return ec; + } + +protected: + netservice_ident_t _ident; + + asio::io_context& _ioContext; + asio::io_context::strand _receiveStrand; + asio::io_context::strand _sendStrand; + + socket_t _socket; + + // acceptor_t _acceptor; + + asio::streambuf _streamBuffer; + + std::queue> _receiveQueue; + + asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both; + + template + std::unique_ptr getDeadlineTimer(const TimeoutT& timeout, bool arm = true) + { + std::unique_ptr timer(_socket.get_executor()); + + if (arm) { + timer->expires_after(timeout); + + timer->async_wait([this](const std::error_code& ec) { + if (!ec) { + _socket.cancel(std::make_error_code(std::errc::timed_out)); + } + }); + } + + return timer; + } +}; + +} // namespace adc::impl