diff --git a/common/adc_traits.h b/common/adc_traits.h index 77c1eaa..7c382b8 100644 --- a/common/adc_traits.h +++ b/common/adc_traits.h @@ -211,5 +211,35 @@ concept adc_time_duration_c = requires { }; +template +struct adc_duration_common_type; + + +template +struct adc_duration_common_type : std::common_type { +}; + + +template +struct adc_duration_common_type : adc_duration_common_type, Ts...> { +}; + + +template +using adc_duration_common_type_t = typename adc_duration_common_type::type; + + +/* all STL helper duration types */ +using adc_common_duration_t = adc_duration_common_type_t; + } // namespace adc::traits diff --git a/net/adc_net_concepts.h b/net/adc_net_concepts.h index 0c9a8f7..87d081d 100644 --- a/net/adc_net_concepts.h +++ b/net/adc_net_concepts.h @@ -93,45 +93,46 @@ concept adc_async_callback_t = traits::adc_is_callable && traits::adc_func_tr } */ -template , // sending message type - typename RMSGT = std::vector, // receiving message type - typename DURT = adc_common_duration_t // time duration type - > -concept adc_netservice_c = - traits::adc_input_char_range && traits::adc_output_char_range && traits::adc_time_duration_c && - requires(SRVT srv, const SRVT srv_const) { - typename SRVT::netservice_ident_t; +template +concept adc_netservice_c = std::movable && requires(SRVT srv, const SRVT srv_const) { + typename SRVT::netservice_ident_t; // service identificator type - // netservice_ident_t ident() const - { srv_const.ident() } -> std::same_as; + typename SRVT::send_msg_t; // sending message type + typename SRVT::recv_msg_t; // receiving message type + typename SRVT::timeout_t; // a type representing timeout (e.g. time duration) - typename SRVT::async_call_ctx_t; - typename SRVT::endpoint_t; + typename SRVT::endpoint_t; // a type representing endpoint of the network service + // underlying protocol - // asynchronous (non-blocking) operations - srv.asyncAccept(std::declval(), - std::declval(), std::declval()); + // netservice_ident_t ident() const + { srv_const.ident() } -> std::same_as; - srv.asyncConnect(std::declval(), - std::declval(), std::declval()); + typename SRVT::async_call_ctx_t; - srv.asyncSend(std::declval(), std::declval(), - std::declval()); - srv.asyncReceive(std::declval(), std::declval()); + // asynchronous (non-blocking) operations + srv.asyncAccept(std::declval(), std::declval(), + std::declval()); - // synchronous (blocking) operations - srv.accept(std::declval(), std::declval()); + srv.asyncConnect(std::declval(), std::declval(), + std::declval()); - srv.connect(std::declval(), std::declval()); + srv.asyncSend(std::declval(), std::declval(), + std::declval()); - srv.send(std::declval(), std::declval()); + srv.asyncReceive(std::declval(), std::declval()); - { srv.receive(std::declval()) } -> std::same_as; + // synchronous (blocking) operations + srv.accept(std::declval(), std::declval()); - srv.close(); - }; + srv.connect(std::declval(), std::declval()); + + srv.send(std::declval(), std::declval()); + + { srv.receive(std::declval()) } -> std::same_as; + + srv.close(); +}; /* NETWORK SESSION */ @@ -140,6 +141,7 @@ template concept adc_netsession_c = std::derived_from> && requires(SESST sess, const SESST sess_const) { typename SESST::netsession_ident_t; + typename SESST::netsession_ctx_t; // netsession_ident_t ident() const { sess_const.ident() } -> std::same_as; @@ -169,7 +171,8 @@ concept adc_netsession_proto_c = // flag - true if valid sequence was found, false - otherwise { proto.search(std::declval()) - } -> std::same_as, std::ranges::iterator_t, bool>>; + // } -> std::same_as, std::ranges::iterator_t, bool>>; + } -> traits::adc_view_or_output_char_range; // construct netsession protocol representation of input user byte sequence diff --git a/net/adc_netproto.h b/net/adc_netproto.h index 7d491f0..3eea15d 100644 --- a/net/adc_netproto.h +++ b/net/adc_netproto.h @@ -41,52 +41,67 @@ struct AdcStopSeqSessionProto { return "STOP SEQUENCE PROTO"; } + // template + // auto search(const R& r) + // { + // std::tuple, std::ranges::iterator_t, bool> res{r.begin(), r.end(), false}; + + // if (!r.size()) { + // return res; + // } + + // // std::get<1>(res) = std::search(r.begin(), r.end(), STOP_SEQ.begin(), STOP_SEQ.end()); + // auto found = std::ranges::search(r, STOP_SEQ); + // // if (std::get<1>(res) != r.end()) { // move iterator to the one-past-the-end position + // if (!found.empty()) { // move iterator to the one-past-the-end position + // // std::advance(std::get<1>(res), STOP_SEQ_SIZE); + // // std::get<1>(res) = found.begin() + STOP_SEQ_SIZE; + // std::get<1>(res) = found.end(); + // std::get<2>(res) = true; + // } + + // return res; + // } + template auto search(const R& r) { - std::tuple, std::ranges::iterator_t, bool> res{r.begin(), r.end(), false}; - - if (!r.size()) { - return res; - } - - // std::get<1>(res) = std::search(r.begin(), r.end(), STOP_SEQ.begin(), STOP_SEQ.end()); auto found = std::ranges::search(r, STOP_SEQ); - // if (std::get<1>(res) != r.end()) { // move iterator to the one-past-the-end position - if (!found.empty()) { // move iterator to the one-past-the-end position - // std::advance(std::get<1>(res), STOP_SEQ_SIZE); - // std::get<1>(res) = found.begin() + STOP_SEQ_SIZE; - std::get<1>(res) = found.end(); - std::get<2>(res) = true; - } - return res; - } - - - template - auto search(IT begin, IT end) - { - std::tuple res{begin, end, false}; - - if (begin == end) { + if constexpr (std::ranges::viewable_range) { + return found.empty() ? std::span(r.begin(), r.begin()) : std::span(r.begin(), found.end()); + } else { + std::vector res; // to guaranty adc_output_char_range + if (!found.empty()) { + std::copy(r.begin(), found.end(), std::back_inserter(res)); + } return res; } - - auto it = std::search(begin, end, STOP_SEQ.begin(), STOP_SEQ.end()); - if (it != end) { - // std::advance(it, STOP_SEQ_SIZE); // move iterator to the one-past-the-end position - std::get<1>(res) = it + STOP_SEQ_SIZE; - std::get<2>(res) = true; - } else { - // may be only a part of valid byte sequence was received, - // so start next matching from previous begin-iterator - std::get<1>(res) = begin; - } - - return res; } + // template + // auto search(IT begin, IT end) + // { + // std::tuple res{begin, end, false}; + + // if (begin == end) { + // return res; + // } + + // auto it = std::search(begin, end, STOP_SEQ.begin(), STOP_SEQ.end()); + // if (it != end) { + // // std::advance(it, STOP_SEQ_SIZE); // move iterator to the one-past-the-end position + // std::get<1>(res) = it + STOP_SEQ_SIZE; + // std::get<2>(res) = true; + // } else { + // // may be only a part of valid byte sequence was received, + // // so start next matching from previous begin-iterator + // std::get<1>(res) = begin; + // } + + // return res; + // } + template auto toProto(const R& r) { diff --git a/net/adc_netserver.h b/net/adc_netserver.h index 67e3a22..717be7b 100644 --- a/net/adc_netserver.h +++ b/net/adc_netserver.h @@ -25,7 +25,6 @@ namespace adc { - class AdcNetServer { protected: @@ -41,10 +40,28 @@ public: return _serverIdent; } - template - void start(SRVT&& netservice, const typename SRVT::endpoint_t& endpoint) {}; + template + void start(const typename SessionT::netservice_t::endpoint_t& endpoint, + const typename SessionT::netsession_ident_t& id, + typename SessionT::netsession_ctx_t&& sess_ctx, + NetsrvCtorArgTs&&... ctor_args) + { + typename SessionT::netservice_t netservice(std::forward(ctor_args)...); - virtual void stop() {}; + netservice.asyncAccept(endpoint, [&endpoint, &id, sess_ctx, this](auto ec, auto...) { + if (!ec) { + auto sess = std::make_shared(id, std::forward(sess_ctx)); + startSession(sess); + + start(endpoint, id, sess_ctx); + } + }); + }; + + virtual void stop() + { + stopAllSessions(); + }; // run server as daemon (still only on POSIX OSes) diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index 89a102f..09b2c1a 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -8,8 +8,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -46,7 +48,8 @@ using asio_matchcond_result_t = std::pair; template concept adc_asio_transport_proto_c = std::derived_from || std::derived_from || - std::derived_from || std::derived_from; + std::derived_from || std::derived_from || + std::derived_from; template @@ -90,23 +93,33 @@ concept adc_asio_special_comp_token_c = template SESSION_PROTOT, - traits::adc_output_char_range RMSGT = std::vector> + traits::adc_output_char_range RMSGT = + std::vector> // used only for inner storing of message byte sequence class AdcNetServiceASIOBase : public SESSION_PROTOT { public: + // typedefs to satisfy 'adc_netservice_c' concept typedef std::string_view netservice_ident_t; - using socket_t = typename TRANSPORT_PROTOT::socket; + typedef std::vector send_msg_t; // in general, only one of several possible + typedef RMSGT recv_msg_t; // in general, only one of several possible (see class template arguments declaration) + typedef traits::adc_common_duration_t timeout_t; using endpoint_t = typename TRANSPORT_PROTOT::endpoint; + + // typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept) + typedef std::function async_accept_callback_t; + typedef std::function async_connect_callback_t; + typedef std::function async_send_callback_t; + typedef std::function async_receive_callback_t; + + + // typedefs from transport protocol + using socket_t = typename TRANSPORT_PROTOT::socket; using acceptor_t = std::conditional_t>, std::nullptr_t, // there is no acceptor typename TRANSPORT_PROTOT::acceptor>; - typedef std::function async_accept_callback_t; - typedef std::function async_connect_callback_t; - typedef std::function async_send_callback_t; - typedef std::function async_receive_callback_t; // to satisfy 'adc_netservice_c' concept @@ -361,59 +374,35 @@ public: adc_asio_special_comp_token_c || is_async_ctx_t, RMSGT, std::remove_cvref_t::args_t>>>; - // auto s_res = std::make_sharedtemplate search), RMSGT>>(); - - // auto tp = this->search(std::span()); - // auto s_res = std::make_shared(); - auto s_res = std::make_shared>(); - auto out_flags = std::make_shared(); auto timer = getDeadlineTimer(_socket, timeout); return asio::async_compose( - [s_res, out_flags, start = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {}, - size_t = 0) mutable { - RMSGT msg; + [out_flags, do_read = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {}, + size_t nbytes = 0) mutable { + // RMSGT msg; + msg_t msg; if (!ec) { - if (start) { - start = false; + if (do_read) { + do_read = false; if (_receiveQueue.size()) { // return message from queue timer->cancel(); - msg = _receiveQueue.front(); + auto imsg = _receiveQueue.front(); _receiveQueue.pop(); if constexpr (std::is_same_v) { - self.complete(std::error_code(), std::move(msg)); + self.complete(std::error_code(), std::move(imsg)); } else { - // msg_t user_msg{msg.begin(), msg.end()}; - self.complete(std::error_code(), {msg.begin(), msg.end()}); + self.complete(std::error_code(), {imsg.begin(), imsg.end()}); } return; } if constexpr (std::derived_from>) { - // adapt to ASIO's MatchCondition - auto match_func = - std::function( - [s_res, this](asio_streambuff_iter_t begin, asio_streambuff_iter_t end) { - // *s_res = this->search(std::span(&*begin, &*end)); - *s_res = this->search(begin, end); - - auto N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - asio_matchcond_result_t res{begin + N, std::get<2>(*s_res)}; - return res; - }); - - return asio::async_read_until(_socket, _streamBuffer, std::move(match_func), - std::move(self)); - // return asio::async_read_until( - // _socket, _streamBuffer, - // std::bind(&AdcNetServiceASIOBase::template MatchCondition, this, - // std::placeholders::_1, std::placeholders::_2, s_res), - // std::move(self)); - + return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), + std::move(self)); } else if constexpr (std::derived_from>) { // datagram, so it should be received at once @@ -427,54 +416,49 @@ public: } } + // if (!nbytes) { + // do_read = true; + // asio::post(std::move(self)); // initiate consequence socket's read operation + // return; + // } + auto start_ptr = static_cast(_streamBuffer.data().data()); - if constexpr (std::derived_from> || - std::derived_from>) { - auto begin_it = asio::buffers_begin(_streamBuffer.data()); - auto end_it = asio::buffers_end(_streamBuffer.data()); - - *s_res = this->search(begin_it, end_it); - - if (!std::get<2>(*s_res)) { // partial or too big packet?!! - // For these types of sockets it is an error! - // ec = std::make_error_code(std::errc::protocol_error); - // self.complete(ec, msg_t()); - start = true; - asio::post(std::move(self)); // initiate consequence socket's read operation - return; - } - } // here, the iterators were computed in MatchCondition called by asio::async_read_until - // function!!! + // auto sr = this->search(std::span(start_ptr, _streamBuffer.size())); + auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); + // if (!std::get<2>(sr)) { + if (net_pack.empty()) { + do_read = true; + asio::post(std::move(self)); // initiate consequence socket's read operation + return; + } timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer - size_t N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - std::span net_pack{&*std::get<0>(*s_res), N}; + // here one has at least a single message + // size_t N = std::distance(std::get<0>(sr), std::get<1>(sr)); + // auto net_pack = std::span{start_ptr, N}; std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg)); _streamBuffer.consume(net_pack.size()); + while (_streamBuffer.size()) { // search for possible additional session protocol packets + start_ptr = static_cast(_streamBuffer.data().data()); - auto begin_it = asio::buffers_begin(_streamBuffer.data()); - auto end_it = asio::buffers_end(_streamBuffer.data()); + // sr = this->search(std::span(start_ptr, _streamBuffer.size())); + net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); - - // *s_res = this->search(std::span(begin_it, end_it)); - *s_res = this->search(begin_it, end_it); - if (std::get<2>(*s_res)) { - // net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)}; - N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - net_pack = std::span{&*std::get<0>(*s_res), N}; + if (!net_pack.empty()) { + // if (std::get<2>(sr)) { + // N = std::distance(std::get<0>(sr), std::get<1>(sr)); + // net_pack = std::span{start_ptr, N}; _receiveQueue.emplace(); std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); _streamBuffer.consume(net_pack.size()); } else { - break; + break; // exit and hold remaining bytes in stream buffer } } } @@ -579,29 +563,6 @@ protected: asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both; - template - auto MatchCondition(asio_streambuff_iter_t begin, asio_streambuff_iter_t end, T& s_res) - { - // if (begin == end) { - // *s_res = this->search(std::span()); - // } else { - // *s_res = this->search(std::span(&*begin, &*end)); - // } - - *s_res = this->search(begin, end); - - // return std::make_pair(std::get<1>(*s_res), std::get<2>(*s_res)); - - std::pair res{end, false}; - - typename std::iterator_traits::difference_type N = 0; - if (std::get<2>(*s_res)) { - N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - res = std::make_pair(begin + N, true); - } - return res; - }; - template static std::unique_ptr getDeadlineTimer(CancelableT& obj, diff --git a/net/asio/adc_netsession_asio.h b/net/asio/adc_netsession_asio.h index 4438f14..a91747a 100644 --- a/net/asio/adc_netsession_asio.h +++ b/net/asio/adc_netsession_asio.h @@ -5,7 +5,90 @@ namespace adc::impl { -class AdcNetSessionASIO +template SESSION_PROTOT, + traits::adc_output_char_range RMSGT = std::vector> +class AdcGenericNetSessionASIO : public std::enable_shared_from_this< + AdcGenericNetSessionASIO> +{ +public: + typedef std::string netsession_ident_t; + + typedef SessionContextT netsession_ctx_t; + + typedef AdcNetServiceASIOBase netservice_t; + + template + AdcGenericNetSessionASIO(const R& id, netservice_t netservice, netsession_ctx_t&& context) + : _ident(), _netservice(std::move(netservice)), _sessionContext(std::forward(context)) + { + if constexpr (std::is_array_v) { + _ident = id; + } else { + _ident = std::string(id.begin(), id.end()); + } + } + + AdcGenericNetSessionASIO(netservice_t netservice, netsession_ctx_t&& context) + : AdcGenericNetSessionASIO( + std::derived_from ? "ASIO TCP SESSION" + : std::derived_from ? "ASIO UDP SESSION" + : std::derived_from ? "ASIO UNIX SEQPACKET SESSION" + : std::derived_from ? "ASIO UNIX STREAM SESSION" + : std::derived_from ? "ASIO UNIX DATAGRAM SESSION" + : "ASIO UNKNOWN", + std::move(netservice), + std::forward(context)) + { + } + + + virtual ~AdcGenericNetSessionASIO() + { + stop(); + } + + + netsession_ident_t ident() const + { + return _ident; + } + + + virtual void start() = 0; + + virtual void stop() + { + _netservice->close(); + } + + + template + AdcGenericNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout) + { + _sendTimeout = send_timeout; + _recvTimeout = recv_timeout; + + return *this; + } + +protected: + netsession_ident_t _ident; + + std::shared_ptr _netservice; + + netsession_ctx_t _sessionContext; + + std::chrono::duration _recvTimeout = + std::chrono::seconds::max(); + + std::chrono::duration _sendTimeout = + std::chrono::seconds(5); +}; + +/* +class AdcGenericNetSessionASIO { public: typedef std::string netsession_ident_t; @@ -15,7 +98,7 @@ public: interfaces::adc_netsession_proto_c SESSION_PROTOT, traits::adc_output_char_range RMSGT = std::vector, traits::adc_is_callable RECV_MSG_TOKENT> - AdcNetSessionASIO(const R& id, + AdcGenericNetSessionASIO(const R& id, std::shared_ptr> netservice, RECV_MSG_TOKENT&& recv_msg_token) : _ident(id.begin(), id.end()) @@ -52,21 +135,19 @@ public: interfaces::adc_netsession_proto_c SESSION_PROTOT, traits::adc_output_char_range RMSGT = std::vector, traits::adc_is_callable RECV_MSG_TOKENT> - AdcNetSessionASIO(std::shared_ptr> netservice, + AdcGenericNetSessionASIO(std::shared_ptr> netservice, RECV_MSG_TOKENT&& recv_msg_token) - : AdcNetSessionASIO(std::derived_from ? "TCP SESSION" + : AdcGenericNetSessionASIO(std::derived_from ? "TCP SESSION" : std::derived_from ? "UDP SESSION" : std::derived_from ? "UNIX SEQPACKET SESSION" - : std::derived_from ? "UNIX STREAM SESSION" - : "UNKNOWN", - std::move(netservice), - std::forward(recv_msg_token)) + : std::derived_from ? "UNIX STREAM +SESSION" : "UNKNOWN", std::move(netservice), std::forward(recv_msg_token)) { } - virtual ~AdcNetSessionASIO() + virtual ~AdcGenericNetSessionASIO() { stop(); } @@ -90,7 +171,7 @@ public: template - AdcNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout) + AdcGenericNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout) { _sendTimeout = send_timeout; _recvTimeout = recv_timeout; @@ -111,4 +192,6 @@ protected: std::chrono::seconds(5); }; +*/ + } // namespace adc::impl