rewrite SESSION_PROTO.search

rewrite ASIO NETSERVICE.asyncReceive
rewrite ASIO NETSESSION
This commit is contained in:
Timur A. Fatkhullin
2024-09-29 00:40:38 +03:00
parent 242a0571e0
commit 8aef1a7c25
6 changed files with 285 additions and 176 deletions

View File

@@ -8,8 +8,10 @@
#include <asio/deferred.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ip/udp.hpp>
#include <asio/local/datagram_protocol.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read.hpp>
#include <asio/read_until.hpp>
#include <asio/steady_timer.hpp>
#include <asio/strand.hpp>
@@ -46,7 +48,8 @@ using asio_matchcond_result_t = std::pair<asio_streambuff_iter_t, bool>;
template <typename T>
concept adc_asio_transport_proto_c =
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::ip::udp> ||
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol>;
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol> ||
std::derived_from<T, asio::local::datagram_protocol>;
template <typename T>
@@ -90,23 +93,33 @@ concept adc_asio_special_comp_token_c =
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT = std::vector<char>>
traits::adc_output_char_range RMSGT =
std::vector<char>> // used only for inner storing of message byte sequence
class AdcNetServiceASIOBase : public SESSION_PROTOT
{
public:
// typedefs to satisfy 'adc_netservice_c' concept
typedef std::string_view netservice_ident_t;
using socket_t = typename TRANSPORT_PROTOT::socket;
typedef std::vector<char> send_msg_t; // in general, only one of several possible
typedef RMSGT recv_msg_t; // in general, only one of several possible (see class template arguments declaration)
typedef traits::adc_common_duration_t timeout_t;
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
// typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept)
typedef std::function<void(std::error_code)> async_accept_callback_t;
typedef std::function<void(std::error_code)> async_connect_callback_t;
typedef std::function<void(std::error_code)> async_send_callback_t;
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
// typedefs from transport protocol
using socket_t = typename TRANSPORT_PROTOT::socket;
using acceptor_t =
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
std::nullptr_t, // there is no acceptor
typename TRANSPORT_PROTOT::acceptor>;
typedef std::function<void(std::error_code)> async_accept_callback_t;
typedef std::function<void(std::error_code)> async_connect_callback_t;
typedef std::function<void(std::error_code)> async_send_callback_t;
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
// to satisfy 'adc_netservice_c' concept
@@ -361,59 +374,35 @@ public:
adc_asio_special_comp_token_c<TokenT> || is_async_ctx_t, RMSGT,
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
// auto s_res = std::make_shared<std::invoke_result_t<decltype(this->template search<RMSGT>), RMSGT>>();
// auto tp = this->search(std::span<const char>());
// auto s_res = std::make_shared<decltype(tp)>();
auto s_res = std::make_shared<std::tuple<asio_streambuff_iter_t, asio_streambuff_iter_t, bool>>();
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(asio::error_code, msg_t)>(
[s_res, out_flags, start = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {},
size_t = 0) mutable {
RMSGT msg;
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {},
size_t nbytes = 0) mutable {
// RMSGT msg;
msg_t msg;
if (!ec) {
if (start) {
start = false;
if (do_read) {
do_read = false;
if (_receiveQueue.size()) { // return message from queue
timer->cancel();
msg = _receiveQueue.front();
auto imsg = _receiveQueue.front();
_receiveQueue.pop();
if constexpr (std::is_same_v<msg_t, RMSGT>) {
self.complete(std::error_code(), std::move(msg));
self.complete(std::error_code(), std::move(imsg));
} else {
// msg_t user_msg{msg.begin(), msg.end()};
self.complete(std::error_code(), {msg.begin(), msg.end()});
self.complete(std::error_code(), {imsg.begin(), imsg.end()});
}
return;
}
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
// adapt to ASIO's MatchCondition
auto match_func =
std::function<asio_matchcond_result_t(asio_streambuff_iter_t, asio_streambuff_iter_t)>(
[s_res, this](asio_streambuff_iter_t begin, asio_streambuff_iter_t end) {
// *s_res = this->search(std::span(&*begin, &*end));
*s_res = this->search(begin, end);
auto N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res));
asio_matchcond_result_t res{begin + N, std::get<2>(*s_res)};
return res;
});
return asio::async_read_until(_socket, _streamBuffer, std::move(match_func),
std::move(self));
// return asio::async_read_until(
// _socket, _streamBuffer,
// std::bind(&AdcNetServiceASIOBase::template MatchCondition<decltype(s_res)>, this,
// std::placeholders::_1, std::placeholders::_2, s_res),
// std::move(self));
return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1),
std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
@@ -427,54 +416,49 @@ public:
}
}
// if (!nbytes) {
// do_read = true;
// asio::post(std::move(self)); // initiate consequence socket's read operation
// return;
// }
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>> ||
std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
auto begin_it = asio::buffers_begin(_streamBuffer.data());
auto end_it = asio::buffers_end(_streamBuffer.data());
*s_res = this->search(begin_it, end_it);
if (!std::get<2>(*s_res)) { // partial or too big packet?!!
// For these types of sockets it is an error!
// ec = std::make_error_code(std::errc::protocol_error);
// self.complete(ec, msg_t());
start = true;
asio::post(std::move(self)); // initiate consequence socket's read operation
return;
}
} // here, the iterators were computed in MatchCondition called by asio::async_read_until
// function!!!
// auto sr = this->search(std::span(start_ptr, _streamBuffer.size()));
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
// if (!std::get<2>(sr)) {
if (net_pack.empty()) {
do_read = true;
asio::post(std::move(self)); // initiate consequence socket's read operation
return;
}
timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer
size_t N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res));
std::span net_pack{&*std::get<0>(*s_res), N};
// here one has at least a single message
// size_t N = std::distance(std::get<0>(sr), std::get<1>(sr));
// auto net_pack = std::span{start_ptr, N};
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
_streamBuffer.consume(net_pack.size());
while (_streamBuffer.size()) { // search for possible additional session protocol packets
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
auto begin_it = asio::buffers_begin(_streamBuffer.data());
auto end_it = asio::buffers_end(_streamBuffer.data());
// sr = this->search(std::span(start_ptr, _streamBuffer.size()));
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
// *s_res = this->search(std::span(begin_it, end_it));
*s_res = this->search(begin_it, end_it);
if (std::get<2>(*s_res)) {
// net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)};
N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res));
net_pack = std::span{&*std::get<0>(*s_res), N};
if (!net_pack.empty()) {
// if (std::get<2>(sr)) {
// N = std::distance(std::get<0>(sr), std::get<1>(sr));
// net_pack = std::span{start_ptr, N};
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
} else {
break;
break; // exit and hold remaining bytes in stream buffer
}
}
}
@@ -579,29 +563,6 @@ protected:
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
template <typename T>
auto MatchCondition(asio_streambuff_iter_t begin, asio_streambuff_iter_t end, T& s_res)
{
// if (begin == end) {
// *s_res = this->search(std::span<const char>());
// } else {
// *s_res = this->search(std::span(&*begin, &*end));
// }
*s_res = this->search(begin, end);
// return std::make_pair(std::get<1>(*s_res), std::get<2>(*s_res));
std::pair<asio_streambuff_iter_t, bool> res{end, false};
typename std::iterator_traits<asio_streambuff_iter_t>::difference_type N = 0;
if (std::get<2>(*s_res)) {
N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res));
res = std::make_pair(begin + N, true);
}
return res;
};
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,

View File

@@ -5,7 +5,90 @@
namespace adc::impl
{
class AdcNetSessionASIO
template <typename SessionContextT,
adc_asio_transport_proto_c TRANSPORT_PROTOT,
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT = std::vector<char>>
class AdcGenericNetSessionASIO : public std::enable_shared_from_this<
AdcGenericNetSessionASIO<SessionContextT, TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>>
{
public:
typedef std::string netsession_ident_t;
typedef SessionContextT netsession_ctx_t;
typedef AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT> netservice_t;
template <traits::adc_input_char_range R, traits::adc_is_callable RECV_MSG_TOKENT>
AdcGenericNetSessionASIO(const R& id, netservice_t netservice, netsession_ctx_t&& context)
: _ident(), _netservice(std::move(netservice)), _sessionContext(std::forward<netsession_ctx_t>(context))
{
if constexpr (std::is_array_v<R>) {
_ident = id;
} else {
_ident = std::string(id.begin(), id.end());
}
}
AdcGenericNetSessionASIO(netservice_t netservice, netsession_ctx_t&& context)
: AdcGenericNetSessionASIO(
std::derived_from<TRANSPORT_PROTOT, asio::ip::tcp> ? "ASIO TCP SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::ip::udp> ? "ASIO UDP SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::local::seq_packet_protocol> ? "ASIO UNIX SEQPACKET SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::local::stream_protocol> ? "ASIO UNIX STREAM SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::local::datagram_protocol> ? "ASIO UNIX DATAGRAM SESSION"
: "ASIO UNKNOWN",
std::move(netservice),
std::forward<netsession_ctx_t>(context))
{
}
virtual ~AdcGenericNetSessionASIO()
{
stop();
}
netsession_ident_t ident() const
{
return _ident;
}
virtual void start() = 0;
virtual void stop()
{
_netservice->close();
}
template <traits::adc_time_duration_c TimeoutT>
AdcGenericNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout)
{
_sendTimeout = send_timeout;
_recvTimeout = recv_timeout;
return *this;
}
protected:
netsession_ident_t _ident;
std::shared_ptr<netservice_t> _netservice;
netsession_ctx_t _sessionContext;
std::chrono::duration<std::chrono::seconds::rep, std::chrono::seconds::period> _recvTimeout =
std::chrono::seconds::max();
std::chrono::duration<std::chrono::seconds::rep, std::chrono::seconds::period> _sendTimeout =
std::chrono::seconds(5);
};
/*
class AdcGenericNetSessionASIO
{
public:
typedef std::string netsession_ident_t;
@@ -15,7 +98,7 @@ public:
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT = std::vector<char>,
traits::adc_is_callable RECV_MSG_TOKENT>
AdcNetSessionASIO(const R& id,
AdcGenericNetSessionASIO(const R& id,
std::shared_ptr<AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>> netservice,
RECV_MSG_TOKENT&& recv_msg_token)
: _ident(id.begin(), id.end())
@@ -52,21 +135,19 @@ public:
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT = std::vector<char>,
traits::adc_is_callable RECV_MSG_TOKENT>
AdcNetSessionASIO(std::shared_ptr<AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>> netservice,
AdcGenericNetSessionASIO(std::shared_ptr<AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>> netservice,
RECV_MSG_TOKENT&& recv_msg_token)
: AdcNetSessionASIO(std::derived_from<TRANSPORT_PROTOT, asio::ip::tcp> ? "TCP SESSION"
: AdcGenericNetSessionASIO(std::derived_from<TRANSPORT_PROTOT, asio::ip::tcp> ? "TCP SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::ip::udp> ? "UDP SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::local::seq_packet_protocol>
? "UNIX SEQPACKET SESSION"
: std::derived_from<TRANSPORT_PROTOT, asio::local::stream_protocol> ? "UNIX STREAM SESSION"
: "UNKNOWN",
std::move(netservice),
std::forward<RECV_MSG_TOKENT>(recv_msg_token))
: std::derived_from<TRANSPORT_PROTOT, asio::local::stream_protocol> ? "UNIX STREAM
SESSION" : "UNKNOWN", std::move(netservice), std::forward<RECV_MSG_TOKENT>(recv_msg_token))
{
}
virtual ~AdcNetSessionASIO()
virtual ~AdcGenericNetSessionASIO()
{
stop();
}
@@ -90,7 +171,7 @@ public:
template <traits::adc_time_duration_c TimeoutT>
AdcNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout)
AdcGenericNetSessionASIO& setDefaultTimeouts(const TimeoutT& send_timeout, const TimeoutT& recv_timeout)
{
_sendTimeout = send_timeout;
_recvTimeout = recv_timeout;
@@ -111,4 +192,6 @@ protected:
std::chrono::seconds(5);
};
*/
} // namespace adc::impl