ADC/net/asio/adc_netservice_asio.h
2024-09-15 01:25:58 +03:00

426 lines
16 KiB
C++

#pragma once
#include <chrono>
#include <future>
#include <memory>
#include <queue>
#include <system_error>
#include <utility>

#include <asio/basic_datagram_socket.hpp>
#include <asio/basic_seq_packet_socket.hpp>
#include <asio/basic_stream_socket.hpp>
#include <asio/bind_executor.hpp>
#include <asio/compose.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ip/udp.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read_until.hpp>
#include <asio/steady_timer.hpp>
#include <asio/strand.hpp>
#include <asio/streambuf.hpp>
#include <asio/use_future.hpp>
#include <asio/write.hpp>
#ifdef USE_OPENSSL_WITH_ASIO
#include <asio/ssl.hpp>
#include <asio/ssl/stream.hpp>
#endif

#include "../../common/adc_traits.h"
#include "../adc_net_concepts.h"
namespace adc::impl
{
// Iterator type used to walk the bytes exposed by asio::streambuf::data()
// (i.e. an asio::buffers_iterator over the streambuf's const_buffers_type).
using asio_streambuff_iter_t = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
// Satisfied by any ASIO transport protocol class this service can be built on:
// the stream-oriented ones (TCP, local stream) and the message-oriented ones
// (UDP, local seq-packet), or classes derived from them.
template <typename T>
concept adc_asio_transport_proto_c =
    std::derived_from<T, asio::ip::tcp> ||
    std::derived_from<T, asio::local::stream_protocol> ||
    std::derived_from<T, asio::ip::udp> ||
    std::derived_from<T, asio::local::seq_packet_protocol>;
// Satisfied only by the stream-oriented ASIO protocols (local stream sockets and TCP),
// or classes derived from them.
template <typename T>
concept adc_asio_stream_transport_proto_c =
    std::derived_from<T, asio::local::stream_protocol> ||
    std::derived_from<T, asio::ip::tcp>;
// ASIO-based network service.
//
// Template parameters:
//   TRANSPORT_PROTOT - ASIO protocol class (see adc_asio_transport_proto_c); supplies the socket,
//                      endpoint and (where it exists) acceptor types
//   SESSION_PROTOT   - session-level protocol; the service derives from it and calls its
//                      toProto()/fromProto()/search() members to frame and un-frame messages
//   RMSGT            - container type of a received message (std::vector<char> by default)
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
traits::adc_output_char_range RMSGT = std::vector<char>>
class AdcNetServiceASIOBase : public SESSION_PROTOT
{
public:
// human-readable identifier of a service instance (returned by ident())
typedef std::string netservice_ident_t;
// socket and endpoint types are taken from the transport protocol
using socket_t = typename TRANSPORT_PROTOT::socket;
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
// Per-call context of the async* methods: chooses how completion is reported and
// carries the user callbacks for the callback mode.
struct asio_async_ctx_t {
// true: the async* methods are asked to deliver the result through a std::future
// (this is what the blocking wrappers set); false: the matching *_comp_token is called
bool use_future = false;
// callbacks invoked with the resulting error code of the corresponding operation
std::function<void(std::error_code)> accept_comp_token;
std::function<void(std::error_code)> connect_comp_token;
std::function<void(std::error_code)> send_comp_token;
// receive callback additionally gets the decoded message
std::function<void(std::error_code, RMSGT)> receive_comp_token;
};
// Default deadlines of the network operations; their types also serve as the default
// TimeoutT of the corresponding methods (via decltype).
// Accepting waits "forever" by default, the other operations are bounded.
static constexpr auto DEFAULT_ACCEPT_TIMEOUT = std::chrono::years::max();
static constexpr auto DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
static constexpr auto DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
static constexpr auto DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
// Creates a service bound to the given io_context.
//   ident - identifier later returned by ident()
//   ctx   - io_context that drives the socket and the receive strand; only a reference
//           is stored, so it must outlive this object
// Members are listed in their declaration order, which is the order they are initialized in.
AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx)
    : SESSION_PROTOT(),
      _ident(ident),
      _ioContext(ctx),
      _receiveStrand(ctx),
      _socket(ctx),
      _receiveQueue()
{
}
// Virtual so that derived services can be destroyed through a base pointer.
virtual ~AdcNetServiceASIOBase() = default;
// Returns a copy of the identifier given at construction.
netservice_ident_t ident() const
{
    return _ident;
}
// Discards all messages that were received but not yet handed out.
// The reset is not done inline: it is posted to _receiveStrand, so it is serialized
// with the other handlers running on that strand and takes effect asynchronously.
void clear()
{
    auto drop_pending = [this]() {
        decltype(_receiveQueue) fresh;
        _receiveQueue.swap(fresh);
    };
    asio::post(_receiveStrand, drop_pending);
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
auto asyncConnect(const endpoint_t& endpoint,
asio_async_ctx_t& ctx,
const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
{
auto timer = getDeadlineTimer(timeout);
auto comp_token = [&ctx, timer = std::move(timer), this](std::error_code ec) {
if (isTimeout(timer, ec)) {
ec = std::make_error_code(std::errc::timed_out);
} else {
timer->cancel();
}
if (!ctx.use_future) {
ctx.connect_comp_token(ec);
}
};
if (ctx.use_future) {
comp_token = asio::use_future(comp_token);
}
return _socket.async_connect(endpoint, comp_token);
// if (ctx.use_future) {
// return _socket.async_connect(
// endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
// } else {
// return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
// timer->cancel();
// ctx.connect_comp_token(ec);
// });
// }
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(const endpoint_t& endpoint,
asio_async_ctx_t& ctx,
const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
return; // there is no acceptor for UDP protocol
}
typename TRANSPORT_PROTOT::acceptor acceptor;
try {
acceptor = typename TRANSPORT_PROTOT::acceptor(_ioContext, endpoint);
} catch (std::system_error err) {
if (ctx.use_future) { // emulation of asio::use_future behaivior?!
throw;
}
ctx.accept_comp_token(err.code());
return;
}
auto timer = getDeadlineTimer(timeout);
if (ctx.use_future) {
return _socket.async_accept(
endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
} else {
return _socket.async_accept(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
timer->cancel();
ctx.accept_comp_token(ec);
});
}
}
template <traits::adc_input_char_range SMSGT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
auto asyncSend(const SMSGT& msg, asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
{
// create buffer sequence of sending session protocol representation of the input message
std::vector<asio::const_buffer> buff_seq;
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
auto timer = getDeadlineTimer(timeout);
auto comp_token = [&ctx, timer = std::move(timer)](std::error_code ec, size_t) {
timer->cancel();
if (!ctx.use_future) {
ctx.send_comp_token(ec);
}
};
if (ctx.use_future) {
comp_token = asio::use_future(comp_token);
}
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_write(_socket, buff_seq, comp_token);
} else if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, comp_token);
} else if constexpr (std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, comp_token);
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
// Asynchronously obtains the next session-protocol message.
//
// If _receiveQueue already holds messages, the oldest one is handed to the completion token from a
// handler posted to _receiveStrand. Otherwise a read on _socket into _streamBuffer is started; on
// success the first packet found by search() is decoded with fromProto() and delivered, and every
// further complete packet already present in _streamBuffer is decoded and pushed to _receiveQueue
// for subsequent calls.
//
//   ctx     - completion mode (ctx.use_future) and the callback ctx.receive_comp_token
//   timeout - deadline of the read, enforced through getDeadlineTimer()
//
// NOTE(review): this function looks unfinished and is unlikely to compile/behave as intended as
// written -- please confirm each point before relying on it:
//   * `comp_token = asio::use_future(comp_token)` and `comp_token = asio::bind_executor(...)` assign
//     objects of a different type to a closure variable;
//   * the completion lambda returns `msg` on one path and nothing on the other;
//   * the queued-message path posts a lambda holding `compl_hndl` by reference, which is gone once
//     async_init returns, and passes a CompletionToken type that does not match the use_future token;
//   * `_receiveQueue.size()` is read outside _receiveStrand, unlike every other access to the queue;
//   * the datagram/seq-packet branches call the blocking `receive()` with a streambuf and a handler;
//   * streambuf iterators are C-style-cast to `const char*` and used to build a std::string_view;
//   * `decltype(SESSION_PROTOT::search)` names a (presumably non-static) member function;
//   * `static_assert(false, ...)` is not dependent on a template parameter.
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
if (_receiveQueue.size()) { // return message from queue
// initiation function: pops the oldest queued message on the strand and completes with it
auto async_init = [this](auto&& compl_hndl) {
asio::post(_receiveStrand, [&compl_hndl, this]() {
RMSGT msg = _receiveQueue.front();
_receiveQueue.pop();
compl_hndl(std::error_code(), std::move(msg));
});
};
if (ctx.use_future) {
return asio::async_initiate<decltype(asio_async_ctx_t::receive_comp_token),
void(std::error_code, RMSGT)>(async_init,
asio::use_future(ctx.receive_comp_token));
} else {
return asio::async_initiate<decltype(asio_async_ctx_t::receive_comp_token),
void(std::error_code, RMSGT)>(async_init, ctx.receive_comp_token);
}
}
// storage for the flags reported by the seq-packet receive; owned by the completion handler
auto out_flags = std::make_unique<asio::socket_base::message_flags>();
auto timer = getDeadlineTimer(timeout);
// result of the last search() call, shared between the match condition and the completion handler
auto s_res = std::make_shared<std::invoke_result_t<decltype(SESSION_PROTOT::search), RMSGT>>();
// NOTE: this completion token is safe (_streamBuffer access) in multithread context since all the instances
// will be executed in serialized execution manner (see asio::strand)
auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this](
std::error_code ec, size_t nbytes) {
timer->cancel();
RMSGT msg;
if (!ec && nbytes) {
// here, the iterators were computed in MatchCondition called by asio::async_read_until function!!!
// elements 0 and 1 of the search result delimit the packet, element 2 flags a complete match
std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)};
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
_streamBuffer.consume(net_pack.size());
while (_streamBuffer.size()) { // search for possible additional session protocol packets
auto begin_it = (const char*)asio_streambuff_iter_t::begin(_streamBuffer.data());
auto end_it = (const char*)asio_streambuff_iter_t::end(_streamBuffer.data());
// static_cast<std::ranges::iterator_t<std::string_view>>(_streamBuffer.data().data());
// auto end_it = begin_it + _streamBuffer.data().size();
*s_res = this->search(std::span(begin_it, end_it));
if (std::get<2>(*s_res)) {
// another complete packet: decode it into a new queue entry and drop it from the buffer
net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)};
_receiveQueue.emplace();
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
_streamBuffer.consume(net_pack.size());
} else {
// only a partial packet is left; keep it in the buffer for the next read
break;
}
}
}
if (ctx.use_future) {
return msg;
} else {
ctx.receive_comp_token(ec, std::move(msg));
}
};
if (ctx.use_future) {
comp_token = asio::use_future(comp_token);
}
// run the completion handler on the receive strand
comp_token = asio::bind_executor(_receiveStrand, comp_token);
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
// adapt to ASIO's MatchCondition
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
*s_res = this->search(std::span(begin, end));
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
};
return asio::async_read_until(_socket, _streamBuffer, match_func, comp_token);
} else if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, comp_token);
} else if constexpr (std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
return _socket.receive(_streamBuffer, *out_flags, comp_token);
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
// Earlier variant kept for reference: it wraps the token as
// bind_executor(strand, use_future(token)) at each call site instead of reassigning comp_token.
/*
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
// adapt to ASIO's MatchCondition
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
*s_res = this->search(std::span(begin, end));
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
};
if (ctx.use_future) {
return asio::async_read_until(_socket, _streamBuffer, match_func,
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
} else {
return asio::async_read_until(_socket, _streamBuffer, match_func,
asio::bind_executor(_receiveStrand, comp_token));
}
} else if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
if (ctx.use_future) {
return _socket.receive(_streamBuffer,
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
} else {
return _socket.receive(_streamBuffer, asio::bind_executor(_receiveStrand, comp_token));
}
} else if constexpr (std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
// datagram, so it should be received at once
if (ctx.use_future) {
return _socket.receive(_streamBuffer, *out_flags,
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
} else {
return _socket.receive(_streamBuffer, *out_flags, asio::bind_executor(_receiveStrand, comp_token));
}
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
*/
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
asio_async_ctx_t ctx = {.use_future = true};
std::future<void> ftr = asyncAcept(endpoint, ctx, timeout);
ftr.get();
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
{
asio_async_ctx_t ctx = {.use_future = true};
std::future<void> ftr = asyncConnect(endpoint, ctx, timeout);
ftr.get();
}
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
{
std::future<void> ftr = asyncSend(msg, timeout, asio::use_future);
ftr.get();
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
std::future<RMSGT> ftr = asyncReceive(timeout, asio::use_future);
return ftr.get();
}
// Selects which direction(s) of the connection close() shuts down.
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
{
    _shutdownType = shutdown_type;
}
// Returns the shutdown mode currently used by close().
asio::socket_base::shutdown_type getShutdownType() const
{
    return _shutdownType;
}
std::error_code close()
{
std::error_code ec;
_socket.shutdown(_shutdownType, ec);
if (!ec) {
_socket.close(ec);
}
return ec;
}
protected:
netservice_ident_t _ident;
asio::io_context& _ioContext;
asio::io_context::strand _receiveStrand;
socket_t _socket;
// acceptor_t _acceptor;
asio::streambuf _streamBuffer;
std::queue<std::vector<char>> _receiveQueue;
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
template <traits::adc_time_duration_c TimeoutT>
std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
{
auto timer = std::make_unique<asio::steady_timer>(_socket.get_executor());
if (arm) {
timer->expires_after(timeout);
timer->async_wait([this](const std::error_code& ec) {
if (!ec) {
_socket.cancel();
}
});
}
return timer;
}
template <typename TimerT>
bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
{
auto exp_time = timer->expiry();
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
}
};
} // namespace adc::impl