ADC/net/asio/adc_netservice_asio.h
2024-10-03 17:17:47 +03:00

979 lines
38 KiB
C++

#pragma once
#include <asio/basic_datagram_socket.hpp>
#include <asio/basic_seq_packet_socket.hpp>
#include <asio/basic_stream_socket.hpp>
#include <asio/bind_executor.hpp>
#include <asio/compose.hpp>
#include <asio/deferred.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ip/udp.hpp>
#include <asio/local/datagram_protocol.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read.hpp>
#include <asio/read_until.hpp>
#include <asio/steady_timer.hpp>
#include <asio/strand.hpp>
#include <asio/streambuf.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/use_future.hpp>
#include <asio/write.hpp>
#include <functional>
#include <queue>
#include <set>
#ifdef USE_OPENSSL_WITH_ASIO
#include <asio/ssl.hpp>
#include <asio/ssl/stream.hpp>
#endif
#include "../../common/adc_traits.h"
#include "../adc_net_concepts.h"
#include "../adc_netproto.h"
namespace adc::traits
{
// special ASIO-related template specializations
template <>
struct adc_func_traits<asio::use_future_t<>> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
template <>
struct adc_func_traits<asio::use_awaitable_t<>> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
template <>
struct adc_func_traits<asio::deferred_t> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
} // namespace adc::traits
namespace adc::impl
{
// typedef for ASIO streambuf iterators
using asio_streambuff_iter_t = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
// ASIO match condition result typedef
using asio_matchcond_result_t = std::pair<asio_streambuff_iter_t, bool>;
template <typename T>
concept adc_asio_transport_proto_c =
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::ip::udp> ||
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol> ||
std::derived_from<T, asio::local::datagram_protocol>;
template <typename T>
concept adc_asio_stream_transport_proto_c =
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol>;
template <typename T>
concept adc_asio_is_future = requires {
// [](std::type_identity<asio::use_future_t<>>) {}(std::type_identity<std::remove_cvref_t<T>>());
[]<typename AllocatorT>(std::type_identity<asio::use_future_t<AllocatorT>>) {
}(std::type_identity<std::remove_cvref_t<T>>{});
};
template <typename T>
concept adc_asio_is_awaitable = requires {
[]<typename ExecutorT>(std::type_identity<asio::use_awaitable_t<ExecutorT>>) {
}(std::type_identity<std::remove_cvref_t<T>>{});
};
template <typename T>
concept adc_asio_special_comp_token_c =
adc_asio_is_future<T> || adc_asio_is_awaitable<T> || std::same_as<std::remove_cvref_t<T>, asio::deferred_t>;
// class adc_asio_async_call_ctx_t
// {
// };
// template <typename T, typename SignatureT = void>
// concept adc_completion_token_c =
// std::same_as<T, adc_asio_async_call_ctx_t> ||
// (traits::adc_is_callable<T> &&
// std::conditional_t<std::same_as<SignatureT, void>,
// std::true_type,
// std::bool_constant<asio::completion_token_for<T, SignatureT>>>::value);
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT =
std::vector<char>> // used only for inner storing of message byte sequence
class AdcNetServiceASIOBase : public SESSION_PROTOT
{
public:
// typedefs to satisfy 'adc_netservice_c' concept
typedef std::string_view netservice_ident_t;
typedef std::vector<char> send_msg_t; // in general, only one of several possible
typedef RMSGT recv_msg_t; // in general, only one of several possible (see class template arguments declaration)
typedef traits::adc_common_duration_t timeout_t;
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
// typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept)
// typedef std::function<void(std::error_code)> async_accept_callback_t;
typedef std::function<void(std::error_code)> async_connect_callback_t;
typedef std::function<void(std::error_code)> async_send_callback_t;
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
// typedefs from transport protocol
using socket_t = typename TRANSPORT_PROTOT::socket;
// acceptor
class acceptor_t
{
public:
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max();
acceptor_t(asio::io_context& io_ctx)
: _ioContext(io_ctx), _endpoint(), _socket(_ioContext), _acceptor(_ioContext)
{
}
acceptor_t(asio::io_context& io_ctx, const AdcNetServiceASIOBase::endpoint_t& endpoint) : acceptor_t(io_ctx)
{
if (_endpoint != endpoint) {
_endpoint = endpoint;
_acceptor = _acceptor_t(_ioContext, _endpoint);
}
}
typedef AdcNetServiceASIOBase netservice_t;
typedef std::shared_ptr<netservice_t> sptr_netservice_t;
// typedef std::function<void(std::error_code, AdcNetServiceASIOBase)> async_accept_callback_t;
typedef std::function<void(std::error_code, sptr_netservice_t)> async_accept_callback_t;
// template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
template <asio::completion_token_for<void(std::error_code, sptr_netservice_t)> TokenT,
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
// no acceptor for UDP-sockets
if constexpr (std::is_null_pointer_v<_acceptor_t>) {
static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!");
}
_socket = AdcNetServiceASIOBase::socket_t{_ioContext};
auto timer = getDeadlineTimer(_acceptor, timeout);
// return asio::async_compose<TokenT, void(std::error_code, AdcNetServiceASIOBase)>(
return asio::async_compose<TokenT, void(std::error_code, sptr_netservice_t)>(
[timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
try {
if (!_acceptor.is_open() || (_acceptor.local_endpoint() != _endpoint)) {
_acceptor = _acceptor_t(_ioContext, _endpoint);
}
} catch (std::system_error err) {
timer->cancel();
// self.complete(err.code(), AdcNetServiceASIOBase(_ioContext));
self.complete(err.code(), std::make_shared<netservice_t>(_ioContext));
return;
}
return _acceptor.async_accept(_socket, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_accept
timer->cancel();
}
// self.complete(ec, AdcNetServiceASIOBase(std::move(_socket)));
self.complete(ec, std::make_shared<netservice_t>(std::move(_socket)));
},
token, _ioContext);
}
template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(const AdcNetServiceASIOBase::endpoint_t& endpoint,
TokenT&& token,
const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
_endpoint = endpoint;
return asyncAccept(std::forward<TokenT>(token), timeout);
}
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
// AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
auto f = asyncAccept(asio::use_future, timeout);
return f.get();
}
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
// AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
return accept(endpoint, timeout);
}
private:
asio::io_context& _ioContext;
AdcNetServiceASIOBase::endpoint_t _endpoint;
AdcNetServiceASIOBase::socket_t _socket;
using _acceptor_t = std::conditional_t<
std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
std::nullptr_t, // there is no acceptor
typename TRANSPORT_PROTOT::acceptor>;
_acceptor_t _acceptor;
};
static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
AdcNetServiceASIOBase(asio::io_context& ctx)
: SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext)
{
}
AdcNetServiceASIOBase(socket_t socket)
: SESSION_PROTOT(),
_ioContext(static_cast<asio::io_context&>(socket.get_executor().context())),
_socket(std::move(socket)),
_receiveStrand(_ioContext),
_receiveQueue()
{
}
// NOTE: CANNOT MOVE asio::streambuf CORRECTLY?!!!
AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = delete;
// AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other)
// : SESSION_PROTOT(std::move(other)),
// _ioContext(other._ioContext),
// _receiveStrand(std::move(other._receiveStrand)),
// _receiveQueue(std::move(_receiveQueue)),
// _socket(std::move(other._socket)),
// _streamBuffer()
// {
// if (this == &other)
// return;
// auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()),
// other._streamBuffer.data()); _streamBuffer.commit(bytes);
// };
AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor!
virtual ~AdcNetServiceASIOBase() {}
AdcNetServiceASIOBase& operator=(const AdcNetServiceASIOBase&) = delete;
AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) = delete;
// {
// if (this != &other) {
// close();
// _streamBuffer.consume(_streamBuffer.size());
// auto bytes =
// asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
// _streamBuffer.commit(bytes);
// _ioContext = other._ioContext;
// _receiveStrand = std::move(other._receiveStrand), _socket = std::move(other._socket);
// _receiveQueue = other._receiveQueue;
// }
// return *this;
// }
constexpr netservice_ident_t ident() const
{
return _ident;
}
/* asynchronuos methods */
template <asio::completion_token_for<void(std::error_code)> TokenT,
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
{
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(std::error_code)>(
[start = true, endpoint, timer = std::move(timer), this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
return _socket.async_connect(endpoint, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_connect
timer->cancel();
}
self.complete(ec);
},
token, _socket);
}
template <traits::adc_input_char_range MessageT,
asio::completion_token_for<void(std::error_code ec)> TokenT,
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
auto asyncSend(const MessageT& msg, TokenT&& token, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
{
// create buffer sequence of sending session protocol representation of the input message
std::vector<asio::const_buffer> buff_seq;
// std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
std::ranges::for_each(this->toProto(msg),
[&buff_seq](const auto& el) { buff_seq.emplace_back(el.data(), el.size()); });
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(std::error_code)>(
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
auto& self, std::error_code ec = {}, size_t = 0) mutable {
if (!ec) {
if (start) {
start = false;
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_write(_socket, buff_seq, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, std::move(self));
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_write/async_send
timer->cancel();
}
self.complete(ec);
},
token, _socket);
}
template <typename TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
// constexpr auto is_async_ctx_t = std::same_as<std::remove_cvref_t<TokenT>, async_call_ctx_t>;
// check completion token signature and deduce message type
// if constexpr (!adc_asio_special_comp_token_c<TokenT> && !is_async_ctx_t) {
if constexpr (!adc_asio_special_comp_token_c<TokenT>) {
static_assert(traits::adc_func_traits<TokenT>::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!");
static_assert(std::is_same_v<std::remove_cvref_t<traits::adc_func_arg1_t<TokenT>>, std::error_code>,
"INVALID COMPLETION TOKEN SIGNATURE!");
static_assert(traits::adc_output_char_range<
std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>,
"INVALID COMPLETION TOKEN SIGNATURE!");
}
using msg_t = std::conditional_t<
// adc_asio_special_comp_token_c<TokenT> || is_async_ctx_t, RMSGT,
adc_asio_special_comp_token_c<TokenT>, RMSGT,
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(std::error_code, msg_t)>(
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, std::error_code ec = {},
size_t nbytes = 0) mutable {
msg_t msg;
if (!ec) {
if (do_read) {
do_read = false;
if (_receiveQueue.size()) { // return message from queue
timer->cancel();
auto imsg = _receiveQueue.front();
_receiveQueue.pop();
if constexpr (std::is_same_v<msg_t, RMSGT>) {
self.complete(std::error_code(), std::move(imsg));
} else {
self.complete(std::error_code(), {imsg.begin(), imsg.end()});
}
return;
}
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1),
std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, *out_flags, std::move(self));
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
// if (!nbytes) {
// do_read = true;
// asio::post(std::move(self)); // initiate consequence socket's read operation
// return;
// }
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (net_pack.empty()) {
do_read = true;
asio::post(std::move(self)); // initiate consequence socket's read operation
return;
}
timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer
// here one has at least a single message
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
_streamBuffer.consume(net_pack.size());
while (_streamBuffer.size()) { // search for possible additional session protocol packets
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (!net_pack.empty()) {
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
} else {
break; // exit and hold remaining bytes in stream buffer
}
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_*
timer->cancel();
}
if constexpr (std::is_same_v<msg_t, RMSGT>) {
self.complete(ec, std::move(msg));
} else {
self.complete(ec, {msg.begin(), msg.end()});
}
// if constexpr (adc_asio_special_comp_token_c<TokenT>) {
// self.complete(ec, std::move(msg));
// } else { // may be of non-RMSGT type
// self.complete(ec, {msg.begin(), msg.end()});
// }
},
token, _receiveStrand);
}
/* blocking methods */
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
{
std::future<void> ftr = asyncConnect(endpoint, asio::use_future, timeout);
ftr.get();
}
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
{
std::future<void> ftr = asyncSend(msg, asio::use_future, timeout);
ftr.get();
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
std::future<RMSGT> ftr = asyncReceive(asio::use_future, timeout);
return ftr.get();
}
std::error_code close()
{
std::error_code ec;
_socket.shutdown(_shutdownType, ec);
if (!ec) {
_socket.close(ec);
}
return ec;
}
/* additional ASIO-related methods */
void clear()
{
// clear receiving messages queue
// NOTE: there is no racing condition here since using asio::strand!
asio::post(_receiveStrand, [this]() { _receiveQueue = {}; });
}
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
{
_shutdownType = shutdown_type;
}
asio::socket_base::shutdown_type getShutdownType() const
{
return _shutdownType;
}
protected:
static constexpr netservice_ident_t _ident =
std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>
? "STREAM-SOCKET NETWORK SERVICE"
: std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>
? "DATAGRAM-SOCKET NETWORK SERVICE"
: std::derived_from<socket_t, asio::basic_seq_packet_socket<typename socket_t::protocol_type>>
? "SEQPACKET-SOCKET NETWORK SERVICE"
: "UNKNOWN";
asio::io_context& _ioContext;
asio::io_context::strand _receiveStrand;
socket_t _socket;
asio::streambuf _streamBuffer;
std::queue<std::vector<char>> _receiveQueue;
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
const TimeoutT& timeout,
bool arm = true)
{
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
// if (timeout == std::chrono::duration<typename TimeoutT::rep, typename TimeoutT::period>::max()) {
// return timer; // do not arm the timer if MAX duration are given
// }
if (arm) {
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
std::chrono::seconds(1));
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
// timer->expires_after(timeout);
timer->async_wait([&obj](const std::error_code& ec) mutable {
if (!ec) {
obj.cancel();
}
});
}
return timer;
}
template <typename TimerT>
static bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
{
auto exp_time = timer->expiry();
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
}
};
static_assert(adc::interfaces::adc_netservice_c<AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>>,
"");
static_assert(adc::interfaces::adc_netsession_proto_c<adc::AdcStopSeqSessionProto<>>, "");
/* EVENT-BASED SERVICE */
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT>
class AsioNetService : public SESSION_PROTOT
{
public:
// typedefs from transport protocol
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
using socket_t = typename TRANSPORT_PROTOT::socket;
using acceptor_t =
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
std::nullptr_t, // there is no acceptor
typename TRANSPORT_PROTOT::acceptor>;
typedef std::function<void(AsioNetService*, std::error_code, endpoint_t)> connect_event_hndl_t;
typedef std::function<void(AsioNetService*, std::error_code)> send_event_hndl_t;
typedef std::function<void(AsioNetService*, std::error_code)> close_event_hndl_t;
typedef std::function<void(AsioNetService*, std::error_code, std::span<const char>)> message_event_hndl_t;
struct Events {
bool listening = false; // true - server role, false - client role
connect_event_hndl_t onConnect = [](auto...) {};
send_event_hndl_t onSend = [](auto...) {};
close_event_hndl_t onClose = [](auto...) {};
message_event_hndl_t onMessage = [](auto...) {};
};
AsioNetService(asio::io_context& ctx, Events events)
: SESSION_PROTOT(),
_ioContext(ctx),
_receiveStrand(_ioContext),
_receiveQueue(),
_acceptor(_ioContext),
_socket(_ioContext),
_events(events),
_stopReceiving(false),
_waitTimer(_ioContext)
{
}
template <traits::adc_time_duration_c DT>
static void start(AsioNetService& srv, const endpoint_t& endpoint, const DT& timeout)
{
// no acceptor for UDP-sockets
if constexpr (std::is_null_pointer_v<acceptor_t>) {
srv.startReceiving();
srv._events.onConnect(&srv, std::error_code{}, endpoint_t());
return;
}
auto token = [&endpoint, &timeout, &srv](std::error_code ec) {
if (!ec) {
srv.startReceiving();
}
// post event
srv._events.onConnect(&srv, ec, srv._socket.remote_endpoint());
};
if (srv._events.listening) { // server role (accept connections to given endpoint)
auto timer = getDeadlineTimer(srv._acceptor, timeout);
asio::async_compose<decltype(token), void(std::error_code)>(
[timer = std::move(timer), start = true, &endpoint, &srv](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
try {
if (!srv._acceptor.is_open() || (srv._acceptor.local_endpoint() != endpoint)) {
srv._acceptor = acceptor_t(srv._ioContext, endpoint);
}
} catch (std::system_error err) {
timer->cancel();
self.complete(err.code());
return;
}
// return acc.async_accept(_socket, std::move(self));
return srv._acceptor.async_accept(srv._socket, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_connect
timer->cancel();
}
self.complete(ec);
},
std::move(token), srv._ioContext);
} else { // client role (connect to remote host)
auto timer = getDeadlineTimer(srv._socket, timeout);
asio::async_compose<decltype(token), void(std::error_code)>(
[start = true, endpoint, timer = std::move(timer), &srv](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
return srv._socket.async_connect(endpoint, std::move(self));
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_connect
timer->cancel();
}
self.complete(ec);
},
std::move(token), srv._socket);
}
}
void stop()
{
std::error_code ec;
_stopReceiving = true;
_socket.shutdown(_shutdownType, ec);
if (!ec) {
_socket.close(ec);
}
_events.onClose(ec);
}
template <traits::adc_input_char_range R, traits::adc_time_duration_c DT>
auto send(const R& msg, const DT& timeout)
{
auto token = [this](std::error_code ec, size_t) {
//
_events.onSend(this, ec);
};
// create buffer sequence of sending session protocol representation of the input message
std::vector<asio::const_buffer> buff_seq;
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<decltype(token), void(std::error_code)>(
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_write(_socket, buff_seq, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, std::move(self));
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
}
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else { // an error occured in async_write/async_send
timer->cancel();
}
self.complete(ec);
},
std::move(token), _socket);
}
template <traits::adc_time_duration_c DT>
bool wait(const DT& timeout)
{
if (_receiveQueue.size()) {
return true;
}
std::error_code ec;
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::seconds(1));
auto res = _waitTimers.emplace(_ioContext);
if (res.second) {
(*(res.first))->expires_after(timeout < max_d ? timeout : max_d);
// _waitTimer.expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
// auto f = _waitTimer.async_wait(asio::use_future);
auto f = (*(res.first))->async_wait(asio::use_future);
try {
f.get();
_waitTimers.erase(res.first);
} catch (std::system_error& ex) {
if (ex.code() == asio::error::operation_aborted) { // canceled in startReceiving (message was received)
return true;
} else {
return false;
}
}
}
}
protected:
asio::io_context& _ioContext;
asio::io_context::strand _receiveStrand;
socket_t _socket;
acceptor_t _acceptor;
asio::streambuf _streamBuffer;
std::queue<std::vector<char>> _receiveQueue;
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
std::atomic_bool _stopReceiving;
Events _events;
asio::steady_timer _waitTimer;
std::set<std::unique_ptr<asio::steady_timer>> _waitTimers;
void startReceiving()
{
auto get_msg = [this](std::error_code ec, size_t) {
if (!ec) {
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (!net_pack.empty()) {
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
while (_streamBuffer.size()) { // search for possible additional session protocol packets
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (!net_pack.empty()) {
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
} else {
break; // exit and hold remaining bytes in stream buffer
}
}
auto msg = _receiveQueue.front();
_events.onMessage(this, ec, std::span<const char>(msg.begin(), msg.end()));
_receiveQueue.pop();
}
if (!_stopReceiving) {
startReceiving(); // initiate consequence socket's read operation
}
} else {
_events.onMessage(this, ec, std::span<const char>());
}
};
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), std::move(get_msg));
} else if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, std::move(get_msg));
} else if constexpr (std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, *out_flags, std::move(get_msg));
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
const TimeoutT& timeout,
bool arm = true)
{
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
if (arm) {
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
std::chrono::seconds(1));
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
// timer->expires_after(timeout);
timer->async_wait([&obj](const std::error_code& ec) mutable {
if (!ec) {
obj.cancel();
}
});
}
return timer;
}
template <typename TimerT>
static bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
{
auto exp_time = timer->expiry();
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
}
};
} // namespace adc::impl