AdcNetServiceASIOBase: add acceptor_t inner class

This commit is contained in:
Timur A. Fatkhullin 2024-10-01 23:33:53 +03:00
parent d2b2620d13
commit f329bcecec
4 changed files with 173 additions and 181 deletions

View File

@ -104,26 +104,47 @@ concept adc_netservice_c = std::movable<SRVT> && requires(SRVT srv, const SRVT s
typename SRVT::endpoint_t; // a type representing endpoint of the network service
// underlying protocol
// callback callables for asynchronous operations
requires adc_async_callback_t<typename SRVT::async_connect_callback_t>;
requires adc_async_callback_t<typename SRVT::async_send_callback_t>;
requires adc_async_callback_t<typename SRVT::async_receive_callback_t>;
// acceptor type
requires std::is_class_v<typename SRVT::acceptor_t>;
requires adc_async_callback_t<typename SRVT::acceptor_t::async_accept_callback_t>;
requires requires(typename SRVT::acceptor_t acc) {
acc.asyncAccept(std::declval<typename SRVT::acceptor_t::async_accept_callback_t>(),
std::declval<const typename SRVT::timeout_t&>());
{ acc.accept(std::declval<const typename SRVT::timeout_t&>()) } -> std::same_as<SRVT>;
};
// netservice_ident_t ident() const
{ srv_const.ident() } -> std::same_as<typename SRVT::netservice_ident_t>;
typename SRVT::async_call_ctx_t;
// typename SRVT::async_call_ctx_t;
// asynchronous (non-blocking) operations
srv.asyncAccept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename SRVT::async_call_ctx_t&>(),
std::declval<const typename SRVT::timeout_t&>());
// srv.asyncAccept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename
// SRVT::async_call_ctx_t&>(),
// std::declval<const typename SRVT::timeout_t&>());
srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename SRVT::async_call_ctx_t&>(),
srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(),
std::declval<typename SRVT::async_connect_callback_t>(),
std::declval<const typename SRVT::timeout_t&>());
srv.asyncSend(std::declval<const typename SRVT::send_msg_t&>(), std::declval<typename SRVT::async_call_ctx_t&>(),
srv.asyncSend(std::declval<const typename SRVT::send_msg_t&>(),
std::declval<typename SRVT::async_send_callback_t>(),
std::declval<const typename SRVT::timeout_t&>());
srv.asyncReceive(std::declval<typename SRVT::async_call_ctx_t&>(), std::declval<const typename SRVT::timeout_t&>());
srv.asyncReceive(std::declval<typename SRVT::async_receive_callback_t>(),
std::declval<const typename SRVT::timeout_t&>());
// synchronous (blocking) operations
srv.accept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>());
// srv.accept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>());
srv.connect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>());
@ -141,8 +162,12 @@ template <typename SESST>
concept adc_netsession_c =
std::derived_from<SESST, std::enable_shared_from_this<SESST>> && requires(SESST sess, const SESST sess_const) {
typename SESST::netsession_ident_t;
requires adc_netservice_c<typename SESST::netservice_t>;
typename SESST::netsession_ctx_t;
requires std::constructible_from<SESST, const typename SESST::netsession_ident_t, typename SESST::netservice_t,
typename SESST::netsession_ctx_t>;
// netsession_ident_t ident() const
{ sess_const.ident() } -> std::same_as<typename SESST::netsession_ident_t>;

View File

@ -182,24 +182,26 @@ public:
return _serverIdent;
}
template <interfaces::adc_netsession_c SessionT, typename... NetsrvCtorArgTs>
void start(const typename SessionT::netservice_t::endpoint_t& endpoint,
const typename SessionT::netsession_ident_t& id,
typename SessionT::netsession_ctx_t&& sess_ctx,
NetsrvCtorArgTs&&... ctor_args)
// start accepting remote connections, create and start given network session
// It must be assumed that this is asynchronous operation!!!
template <interfaces::adc_netsession_c SessionT, typename... AccCtorArgTs>
void start(const typename SessionT::netsession_ident_t& id,
const typename SessionT::netsession_ctx_t& sess_ctx,
AccCtorArgTs&&... ctor_args)
{
typename SessionT::netservice_t netservice(std::forward<NetsrvCtorArgTs>(ctor_args)...);
auto acceptor =
std::make_shared<typename SessionT::netservice_t::acceptor_t>(std::forward<AccCtorArgTs>(ctor_args)...);
netservice.asyncAccept(endpoint, [&endpoint, &id, sess_ctx, this](auto ec, auto...) {
if (!ec) {
auto sess = std::make_shared<SessionT>(id, std::forward<typename SessionT::netsession_ctx_t>(sess_ctx));
startSession(sess);
start(endpoint, id, sess_ctx);
}
});
doAccept<SessionT>(acceptor, id, sess_ctx);
};
template <interfaces::adc_netsession_c SessionT>
bool isListening() const
{
return _isListening<SessionT>[this];
}
virtual void start() = 0;
virtual void stop()
@ -210,7 +212,26 @@ public:
protected:
template <interfaces::adc_netsession_c SessionT>
inline static std::unordered_map<const AdcGenericNetServer*, bool> _isListening{};
server_ident_t _serverIdent;
template <typename SessionT, typename AT, typename IDT, typename CTXT>
void doAccept(std::shared_ptr<AT> acceptor, const IDT& id, CTXT& sess_ctx)
{
acceptor.asyncAccept([acceptor, &id, &sess_ctx, this](auto ec, typename SessionT::netservice_t srv) mutable {
if (!ec) {
auto sess = std::make_shared<SessionT>(id, std::move(srv), sess_ctx);
startSession(sess);
_isListening<SessionT>[this] = true;
doAccept(acceptor, id, sess_ctx);
} else {
_isListening<SessionT>[this] = false;
}
});
}
};

View File

@ -108,7 +108,7 @@ public:
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
// typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept)
typedef std::function<void(std::error_code)> async_accept_callback_t;
// typedef std::function<void(std::error_code)> async_accept_callback_t;
typedef std::function<void(std::error_code)> async_connect_callback_t;
typedef std::function<void(std::error_code)> async_send_callback_t;
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
@ -116,98 +116,125 @@ public:
// typedefs from transport protocol
using socket_t = typename TRANSPORT_PROTOT::socket;
using acceptor_t =
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
std::nullptr_t, // there is no acceptor
typename TRANSPORT_PROTOT::acceptor>;
// to satisfy 'adc_netservice_c' concept
class async_call_ctx_t
// acceptor
class acceptor_t
{
std::function<void(std::error_code)> _errc_comp_token;
std::function<void(std::error_code, RMSGT)> _errc_msg_comp_token;
public:
async_call_ctx_t() = default;
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max();
async_call_ctx_t(async_call_ctx_t&) = default;
async_call_ctx_t(async_call_ctx_t&&) = default;
async_call_ctx_t(const async_call_ctx_t&) = default;
template <asio::completion_token_for<void(std::error_code)> TokenT>
async_call_ctx_t(TokenT&& token)
acceptor_t(asio::io_context& io_ctx)
: _ioContext(io_ctx), _endpoint(), _socket(_ioContext), _acceptor(_ioContext)
{
_errc_comp_token = std::forward<TokenT>(token);
}
acceptor_t(asio::io_context& io_ctx, const AdcNetServiceASIOBase::endpoint_t& endpoint) : acceptor_t(io_ctx)
{
if (_endpoint != endpoint) {
_endpoint = endpoint;
_acceptor = _acceptor_t(_ioContext, _endpoint);
}
}
template <asio::completion_token_for<void(std::error_code, RMSGT)> TokenT>
async_call_ctx_t(TokenT&& token)
typedef std::function<void(std::error_code, AdcNetServiceASIOBase)> async_accept_callback_t;
template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
_errc_msg_comp_token = std::forward<TokenT>(token);
// no acceptor for UDP-sockets
if constexpr (std::is_null_pointer_v<_acceptor_t>) {
static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!");
}
_socket = AdcNetServiceASIOBase::socket_t{_ioContext};
auto timer = getDeadlineTimer(_acceptor, timeout);
return asio::async_compose<TokenT, void(std::error_code, AdcNetServiceASIOBase)>(
[timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
try {
if (!_acceptor.is_open() || (_acceptor.local_endpoint() != _endpoint)) {
_acceptor = _acceptor_t(_ioContext, _endpoint);
}
} catch (std::system_error err) {
timer->cancel();
self.complete(err.code(), AdcNetServiceASIOBase(_ioContext));
return;
}
return _acceptor.async_accept(_socket, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_accept
timer->cancel();
}
self.complete(ec, AdcNetServiceASIOBase(std::move(_socket)));
},
token, _ioContext);
}
template <asio::completion_token_for<void(std::error_code)> TokenT>
async_call_ctx_t& operator=(TokenT&& token)
template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(const AdcNetServiceASIOBase::endpoint_t& endpoint,
TokenT&& token,
const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
_errc_comp_token = std::forward<TokenT>(token);
return *this;
_endpoint = endpoint;
return asyncAccept(std::forward<TokenT>(token), timeout);
}
template <asio::completion_token_for<void(std::error_code, RMSGT)> TokenT>
async_call_ctx_t& operator=(TokenT&& token)
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
_errc_msg_comp_token = std::forward<TokenT>(token);
return *this;
auto f = asyncAccept(asio::use_future, timeout);
return f.get();
}
auto operator()(std::error_code ec)
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
return _errc_comp_token(std::move(ec));
return accept(endpoint, timeout);
}
auto operator()(std::error_code ec, RMSGT msg)
{
return _errc_msg_comp_token(std::move(ec), std::move(msg));
}
private:
asio::io_context& _ioContext;
AdcNetServiceASIOBase::endpoint_t _endpoint;
AdcNetServiceASIOBase::socket_t _socket;
template <asio::completion_token_for<void(std::error_code)> TokenT>
operator TokenT() const
{
return _errc_comp_token;
}
using _acceptor_t = std::conditional_t<
std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
std::nullptr_t, // there is no acceptor
typename TRANSPORT_PROTOT::acceptor>;
template <asio::completion_token_for<void(std::error_code, RMSGT)> TokenT>
operator TokenT() const
{
return _errc_msg_comp_token;
}
_acceptor_t _acceptor;
};
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max();
static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
AdcNetServiceASIOBase(asio::io_context& ctx)
: SESSION_PROTOT(),
_ioContext(ctx),
_receiveStrand(_ioContext),
_receiveQueue(),
_acceptor(_ioContext),
_socket(_ioContext)
: SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext)
{
}
AdcNetServiceASIOBase(socket_t socket)
: SESSION_PROTOT(),
_ioContext(socket.get_executor()),
_ioContext(static_cast<asio::io_context&>(socket.get_executor().context())),
_socket(std::move(socket)),
_receiveStrand(_ioContext),
_receiveQueue(),
_socket(std::move(socket))
_receiveQueue()
{
}
@ -216,11 +243,10 @@ public:
: _ioContext(other._ioContext),
_receiveStrand(std::move(other._receiveStrand)),
_receiveQueue(std::move(_receiveQueue)),
_acceptor(std::move(other._acceptor)),
_socket(std::move(other._socket)),
_streamBuffer()
{
if (*this == other)
if (this == &other)
return;
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
@ -236,7 +262,7 @@ public:
AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other)
{
if (*this != other) {
if (this != &other) {
close();
_streamBuffer.consume(_streamBuffer.size());
@ -246,7 +272,6 @@ public:
_ioContext = other._ioContext;
_receiveStrand = std::move(other._receiveStrand), _socket = std::move(other._socket);
_acceptor = std::move(other._acceptor);
_receiveQueue = other._receiveQueue;
}
@ -263,53 +288,6 @@ public:
/* asynchronuos methods */
template <asio::completion_token_for<void(std::error_code)> TokenT,
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
// no acceptor for UDP-sockets
if constexpr (std::is_null_pointer_v<acceptor_t>) {
static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!");
}
// auto acc = acceptor_t(_ioContext);
// auto timer = getDeadlineTimer(acc, timeout);
auto timer = getDeadlineTimer(_acceptor, timeout);
return asio::async_compose<TokenT, void(std::error_code)>(
// [acc = std::move(acc), timer = std::move(timer), start = true, &endpoint, this](
[timer = std::move(timer), start = true, &endpoint, this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
try {
// acc = acceptor_t(_ioContext, endpoint);
if (!_acceptor.is_open() || (_acceptor.local_endpoint() != endpoint)) {
_acceptor = acceptor_t(_ioContext, endpoint);
}
} catch (std::system_error err) {
timer->cancel();
self.complete(err.code());
return;
}
// return acc.async_accept(_socket, std::move(self));
return _acceptor.async_accept(_socket, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_connect
timer->cancel();
}
self.complete(ec);
},
token, _ioContext);
}
template <asio::completion_token_for<void(std::error_code)> TokenT,
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
@ -385,10 +363,11 @@ public:
template <typename TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
constexpr auto is_async_ctx_t = std::same_as<std::remove_cvref_t<TokenT>, async_call_ctx_t>;
// constexpr auto is_async_ctx_t = std::same_as<std::remove_cvref_t<TokenT>, async_call_ctx_t>;
// check completion token signature and deduce message type
if constexpr (!adc_asio_special_comp_token_c<TokenT> && !is_async_ctx_t) {
// if constexpr (!adc_asio_special_comp_token_c<TokenT> && !is_async_ctx_t) {
if constexpr (!adc_asio_special_comp_token_c<TokenT>) {
static_assert(traits::adc_func_traits<TokenT>::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!");
static_assert(std::is_same_v<std::remove_cvref_t<traits::adc_func_arg1_t<TokenT>>, std::error_code>,
"INVALID COMPLETION TOKEN SIGNATURE!");
@ -398,7 +377,8 @@ public:
}
using msg_t = std::conditional_t<
adc_asio_special_comp_token_c<TokenT> || is_async_ctx_t, RMSGT,
// adc_asio_special_comp_token_c<TokenT> || is_async_ctx_t, RMSGT,
adc_asio_special_comp_token_c<TokenT>, RMSGT,
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
@ -408,7 +388,6 @@ public:
return asio::async_compose<TokenT, void(asio::error_code, msg_t)>(
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {},
size_t nbytes = 0) mutable {
// RMSGT msg;
msg_t msg;
if (!ec) {
@ -451,9 +430,7 @@ public:
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
// auto sr = this->search(std::span(start_ptr, _streamBuffer.size()));
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
// if (!std::get<2>(sr)) {
if (net_pack.empty()) {
do_read = true;
asio::post(std::move(self)); // initiate consequence socket's read operation
@ -463,8 +440,6 @@ public:
timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer
// here one has at least a single message
// size_t N = std::distance(std::get<0>(sr), std::get<1>(sr));
// auto net_pack = std::span{start_ptr, N};
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
_streamBuffer.consume(net_pack.size());
@ -473,14 +448,9 @@ public:
while (_streamBuffer.size()) { // search for possible additional session protocol packets
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
// sr = this->search(std::span(start_ptr, _streamBuffer.size()));
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (!net_pack.empty()) {
// if (std::get<2>(sr)) {
// N = std::distance(std::get<0>(sr), std::get<1>(sr));
// net_pack = std::span{start_ptr, N};
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
@ -499,7 +469,6 @@ public:
if constexpr (std::is_same_v<msg_t, RMSGT>) {
self.complete(ec, std::move(msg));
} else {
// msg_t user_msg{msg.begin(), msg.end()};
self.complete(ec, {msg.begin(), msg.end()});
}
},
@ -508,13 +477,6 @@ public:
/* blocking methods */
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
std::future<void> ftr = asyncAccept(endpoint, asio::use_future, timeout);
ftr.get();
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
{
@ -582,8 +544,6 @@ protected:
socket_t _socket;
acceptor_t _acceptor;
asio::streambuf _streamBuffer;
std::queue<std::vector<char>> _receiveQueue;

View File

@ -5,13 +5,13 @@
#include "../net/asio/adc_netservice_asio.h"
template <typename T>
void receive(T& srv)
void receive(T srv)
{
srv.asyncReceive(
[&srv](std::error_code ec, std::string msg) {
if (!ec) {
std::cout << "Received: [" << msg << "]\n";
receive(srv);
receive(std::move(srv));
} else {
std::cout << "Received: " << ec.message() << "\n";
}
@ -30,37 +30,23 @@ int main()
asio::io_context ctx;
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>> srv(ctx);
using srv_t = adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>;
// adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::asio_async_ctx_t srv_ctx;
// srv_ctx.accept_comp_token = [](std::error_code ec) {
typename srv_t::acceptor_t acc(ctx, ept_c);
// };
// srv.asyncAccept(ept_s, srv_ctx, std::chrono::seconds(120));
// srv.asyncConnect(ept_c, [](std::error_code ec) {
// });
// adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::contx_t s_ctx;
// srv.asyncConnect(ept_c, s_ctx);
// auto res = srv.asyncConnect(ept_c, asio::use_awaitable);
srv.asyncAccept(ept_c, [&srv](std::error_code ec) {
acc.asyncAccept([](std::error_code ec, srv_t srv) {
if (!ec) {
std::cout << "New connection\n";
// srv.asyncReceive(
// [](std::error_code ec, std::string msg) {
// if (!ec) {
// std::cout << "Received: [" << msg << "]\n";
// } else {
// std::cout << "Received: " << ec.message() << "\n";
// }
// },
// std::chrono::minutes(1));
receive(srv);
// receive(std::move(srv));
srv.asyncReceive(
[](std::error_code ec, std::string msg) {
if (!ec) {
std::cout << "Received: [" << msg << "]\n";
// receive(std::move(srv));
} else {
std::cout << "Received: " << ec.message() << "\n";
}
},
std::chrono::minutes(1));
} else {
std::cout << "ACCEPT ERR: " << ec.message() << "\n";
}