move implementations for ASIO-library to net/asio subdirectory
This commit is contained in:
parent
9818b5f2b8
commit
a7626bfe5e
@ -84,39 +84,40 @@ template <typename SRVT,
|
||||
typename RMSGT = std::string, // receiving message type
|
||||
typename DURT = adc_common_duration_t // time duration type
|
||||
>
|
||||
concept adc_netservice_c = traits::adc_input_char_range<SMSGT> && traits::adc_output_char_range<RMSGT> && traits::adc_time_duration_c<DURT> &&
|
||||
requires(SRVT srv, const SRVT srv_const) {
|
||||
typename SRVT::netservice_ident_t;
|
||||
concept adc_netservice_c =
|
||||
traits::adc_input_char_range<SMSGT> && traits::adc_output_char_range<RMSGT> && traits::adc_time_duration_c<DURT> &&
|
||||
requires(SRVT srv, const SRVT srv_const) {
|
||||
typename SRVT::netservice_ident_t;
|
||||
|
||||
// netservice_ident_t ident() const
|
||||
{ srv_const.ident() } -> std::same_as<typename SRVT::netservice_ident_t>;
|
||||
// netservice_ident_t ident() const
|
||||
{ srv_const.ident() } -> std::same_as<typename SRVT::netservice_ident_t>;
|
||||
|
||||
typename SRVT::async_ctx_t;
|
||||
typename SRVT::endpoint_t;
|
||||
typename SRVT::async_ctx_t;
|
||||
typename SRVT::endpoint_t;
|
||||
|
||||
// asynchronous (non-blocking) operations
|
||||
srv.asyncAccept(std::declval<const typename SRVT::endpoint_t&>(),
|
||||
std::declval<typename SRVT::async_ctx_t&>(), std::declval<const DURT&>());
|
||||
// asynchronous (non-blocking) operations
|
||||
srv.asyncAccept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename SRVT::async_ctx_t&>(),
|
||||
std::declval<const DURT&>());
|
||||
|
||||
srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(),
|
||||
std::declval<typename SRVT::async_ctx_t&>(), std::declval<const DURT&>());
|
||||
srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename SRVT::async_ctx_t&>(),
|
||||
std::declval<const DURT&>());
|
||||
|
||||
srv.asyncSend(std::declval<const SMSGT&>(), std::declval<typename SRVT::async_ctx_t&>(),
|
||||
std::declval<const DURT&>());
|
||||
srv.asyncSend(std::declval<const SMSGT&>(), std::declval<typename SRVT::async_ctx_t&>(),
|
||||
std::declval<const DURT&>());
|
||||
|
||||
srv.asyncReceive(std::declval<typename SRVT::async_ctx_t&>(), std::declval<const DURT&>());
|
||||
srv.asyncReceive(std::declval<typename SRVT::async_ctx_t&>(), std::declval<const DURT&>());
|
||||
|
||||
// synchronous (blocking) operations
|
||||
srv.accept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const DURT&>());
|
||||
// synchronous (blocking) operations
|
||||
srv.accept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const DURT&>());
|
||||
|
||||
srv.connect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const DURT&>());
|
||||
srv.connect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const DURT&>());
|
||||
|
||||
srv.send(std::declval<const SMSGT&>(), std::declval<const DURT&>());
|
||||
srv.send(std::declval<const SMSGT&>(), std::declval<const DURT&>());
|
||||
|
||||
{ srv.receive(std::declval<const DURT &>()) } -> std::same_as<RMSGT>;
|
||||
{ srv.receive(std::declval<const DURT&>()) } -> std::same_as<RMSGT>;
|
||||
|
||||
srv.shutdown();
|
||||
};
|
||||
srv.close();
|
||||
};
|
||||
|
||||
|
||||
/* NETWORK SESSION */
|
||||
@ -137,35 +138,34 @@ concept adc_netsession_c =
|
||||
|
||||
/* NETWORK SESSION-LEVEL PROTOCOL */
|
||||
|
||||
template <typename SESS_PROTOT,
|
||||
typename BUFFT = std::string_view>
|
||||
concept adc_netsession_proto_c = traits::adc_input_char_range<BUFFT> && requires(SESS_PROTOT proto, const SESS_PROTOT proto_const) {
|
||||
typename SESS_PROTOT::proto_ident_t;
|
||||
template <typename SESS_PROTOT, typename BUFFT = std::string_view>
|
||||
concept adc_netsession_proto_c =
|
||||
traits::adc_input_char_range<BUFFT> && requires(SESS_PROTOT proto, const SESS_PROTOT proto_const) {
|
||||
typename SESS_PROTOT::proto_ident_t;
|
||||
|
||||
// proto_ident_t ident() const (const method)
|
||||
{ proto_const.ident() } -> std::same_as<typename SESS_PROTOT::proto_ident_t>;
|
||||
// proto_ident_t ident() const (const method)
|
||||
{ proto_const.ident() } -> std::same_as<typename SESS_PROTOT::proto_ident_t>;
|
||||
|
||||
// typename SESS_PROTOT::search_result_t;
|
||||
// typename SESS_PROTOT::search_result_t;
|
||||
|
||||
// search for the first occurence of valid protocol sequence in input user byte sequence
|
||||
// the method must return std::tuple<begin, end, flag>:
|
||||
// start - input range iterator of the sequence first byte
|
||||
// stop - input range iterator of the sequence end ("after-the-last" byte!!!)
|
||||
// flag - true if valid sequence was found, false - otherwise
|
||||
{
|
||||
proto.search(std::declval<const BUFFT&>())
|
||||
} -> std::same_as<std::tuple<std::ranges::iterator_t<BUFFT>, std::ranges::iterator_t<BUFFT>, bool>>;
|
||||
// search for the first occurence of valid protocol sequence in input user byte sequence
|
||||
// the method must return std::tuple<begin, end, flag>:
|
||||
// start - input range iterator of the sequence first byte
|
||||
// stop - input range iterator of the sequence end ("after-the-last" byte!!!)
|
||||
// flag - true if valid sequence was found, false - otherwise
|
||||
{
|
||||
proto.search(std::declval<const BUFFT&>())
|
||||
} -> std::same_as<std::tuple<std::ranges::iterator_t<BUFFT>, std::ranges::iterator_t<BUFFT>, bool>>;
|
||||
|
||||
|
||||
// construct netsession protocol representation of input user byte sequence
|
||||
// the method must return a range of char range views or output char range
|
||||
{ proto.toProto(std::declval<const BUFFT &>()) } -> traits::adc_range_of_view_or_output_char_range;
|
||||
// construct netsession protocol representation of input user byte sequence
|
||||
// the method must return a range of char range views or output char range
|
||||
{ proto.toProto(std::declval<const BUFFT&>()) } -> traits::adc_range_of_view_or_output_char_range;
|
||||
|
||||
// return user byte sequence from input netsession protocol representation
|
||||
// the method must return a view of char range or output char range
|
||||
{ proto.fromProto(std::declval<const BUFFT &>()) } -> traits::adc_view_or_output_char_range;
|
||||
|
||||
};
|
||||
// return user byte sequence from input netsession protocol representation
|
||||
// the method must return a view of char range or output char range
|
||||
{ proto.fromProto(std::declval<const BUFFT&>()) } -> traits::adc_view_or_output_char_range;
|
||||
};
|
||||
|
||||
|
||||
} // namespace adc::interfaces
|
||||
|
||||
@ -40,7 +40,6 @@
|
||||
|
||||
#include "adc_netmsg.h"
|
||||
|
||||
#include "adc_net_concepts.h"
|
||||
|
||||
|
||||
namespace adc::traits
|
||||
@ -72,215 +71,6 @@ concept adc_asio_inet_stream_proto_c = requires(T t, asio_streambuff_iter_t begi
|
||||
namespace adc::impl
|
||||
{
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::ip::udp> ||
|
||||
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_stream_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
|
||||
|
||||
|
||||
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT>
|
||||
class AdcNetServiceASIOBase : public TRANSPORT_PROTOT, public SESSION_PROTOT
|
||||
{
|
||||
public:
|
||||
typedef std::string netservice_ident_t;
|
||||
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||
|
||||
struct asio_async_ctx_t {
|
||||
bool use_future = false;
|
||||
std::function<void(std::error_code)> accept_comp_token;
|
||||
std::function<void(std::error_code)> connect_comp_token;
|
||||
std::function<void(std::error_code)> send_comp_token;
|
||||
|
||||
template <traits::adc_output_char_range R>
|
||||
static std::unordered_map<const asio_async_ctx_t*, std::function<void(std::error_code, const R&)>>
|
||||
receive_comp_token;
|
||||
};
|
||||
|
||||
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::years::max();
|
||||
static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
|
||||
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
|
||||
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
|
||||
|
||||
|
||||
netservice_ident_t ident() const
|
||||
{
|
||||
return _ident;
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||
auto asyncConnect(const endpoint_t& endpoint,
|
||||
asio_async_ctx_t& ctx,
|
||||
const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||
{
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
if (ctx.use_future) {
|
||||
return _socket.async_connect(
|
||||
endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
|
||||
} else {
|
||||
return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
|
||||
timer->cancel();
|
||||
ctx.connect_comp_token(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto asyncAccept(const endpoint_t& endpoint,
|
||||
asio_async_ctx_t& ctx,
|
||||
const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
return; // there is no acceptor for UDP protocol
|
||||
}
|
||||
|
||||
typename TRANSPORT_PROTOT::acceptor acceptor;
|
||||
try {
|
||||
acceptor = typename TRANSPORT_PROTOT::acceptor(_ioContext, endpoint);
|
||||
} catch (std::system_error err) {
|
||||
if (ctx.use_future) { // emulation of asio::use_future behaivior?!
|
||||
throw;
|
||||
}
|
||||
ctx.accept_comp_token(err.code());
|
||||
return;
|
||||
}
|
||||
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
if (ctx.use_future) {
|
||||
return _socket.async_accept(
|
||||
endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
|
||||
} else {
|
||||
return _socket.async_accept(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
|
||||
timer->cancel();
|
||||
ctx.accept_comp_token(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
template <traits::adc_output_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
auto s_res = std::make_shared<std::invoke_result_t<decltype(SESSION_PROTOT::search), R>>();
|
||||
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_read_until(
|
||||
_socket, _streamBuffer,
|
||||
[s_res, this]<typename IT>(IT begin, IT end) {
|
||||
*s_res = this->search(std::span(begin, end));
|
||||
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
||||
},
|
||||
[&ctx, s_res, timer = std::move(timer), this](std::error_code ec, size_t) {
|
||||
timer->cancel();
|
||||
R msg;
|
||||
|
||||
if (!ec) {
|
||||
std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)};
|
||||
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
|
||||
while (_streamBuffer.size()) { // search for possible additional session protocol packets
|
||||
auto begin_it = (const char*)traits::asio_streambuff_iter_t::begin(_streamBuffer.data());
|
||||
auto end_it = (const char*)traits::asio_streambuff_iter_t::end(_streamBuffer.data());
|
||||
// static_cast<std::ranges::iterator_t<std::string_view>>(_streamBuffer.data().data());
|
||||
// auto end_it = begin_it + _streamBuffer.data().size();
|
||||
|
||||
|
||||
*s_res = this->search(std::span(begin_it, end_it));
|
||||
if (std::get<2>(*s_res)) {
|
||||
net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)};
|
||||
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
// TODO: insert to queue
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ctx.accept_comp_token(ec, std::move(msg));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT>
|
||||
auto accept(const endpoint_t& endpoint, const TimeoutT& timeout)
|
||||
{
|
||||
asio_async_ctx_t ctx = {.use_future = true};
|
||||
std::future<void> ftr = asyncAcept(endpoint, ctx, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT>
|
||||
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout)
|
||||
{
|
||||
asio_async_ctx_t ctx = {.use_future = true};
|
||||
std::future<void> ftr = asyncConnect(endpoint, ctx, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT>
|
||||
auto send(const R& msg, const TimeoutT& timeout)
|
||||
{
|
||||
std::future<void> ftr = asyncSend(msg, timeout, asio::use_future);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT>
|
||||
auto receive(const TimeoutT& timeout)
|
||||
{
|
||||
std::future<R> ftr = asyncReceive(timeout, asio::use_future);
|
||||
return ftr.get();
|
||||
}
|
||||
|
||||
protected:
|
||||
netservice_ident_t _ident;
|
||||
|
||||
asio::io_context& _ioContext;
|
||||
|
||||
socket_t _socket;
|
||||
|
||||
// acceptor_t _acceptor;
|
||||
|
||||
asio::streambuf _streamBuffer;
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT>
|
||||
std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
|
||||
{
|
||||
std::unique_ptr<asio::steady_timer> timer(_socket.get_executor());
|
||||
|
||||
if (arm) {
|
||||
timer->expires_after(timeout);
|
||||
|
||||
timer->async_wait([this](const std::error_code& ec) {
|
||||
if (!ec) {
|
||||
_socket.cancel(std::make_error_code(std::errc::timed_out));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return timer;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
template <traits::adc_asio_inet_proto_c InetProtoT>
|
||||
class AdcNetServiceASIO : public InetProtoT
|
||||
{
|
||||
|
||||
415
net/asio/adc_netservice_asio.h
Normal file
415
net/asio/adc_netservice_asio.h
Normal file
@ -0,0 +1,415 @@
|
||||
#pragma once
|
||||
|
||||
#include <asio/basic_datagram_socket.hpp>
|
||||
#include <asio/basic_seq_packet_socket.hpp>
|
||||
#include <asio/basic_stream_socket.hpp>
|
||||
#include <asio/bind_executor.hpp>
|
||||
#include <asio/compose.hpp>
|
||||
#include <asio/ip/tcp.hpp>
|
||||
#include <asio/ip/udp.hpp>
|
||||
#include <asio/local/seq_packet_protocol.hpp>
|
||||
#include <asio/local/stream_protocol.hpp>
|
||||
#include <asio/read_until.hpp>
|
||||
#include <asio/steady_timer.hpp>
|
||||
#include <asio/strand.hpp>
|
||||
#include <asio/streambuf.hpp>
|
||||
#include <asio/use_future.hpp>
|
||||
#include <asio/write.hpp>
|
||||
|
||||
#include <queue>
|
||||
|
||||
#ifdef USE_OPENSSL_WITH_ASIO
|
||||
|
||||
#include <asio/ssl.hpp>
|
||||
#include <asio/ssl/stream.hpp>
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
#include "../../common/adc_traits.h"
|
||||
#include "../adc_net_concepts.h"
|
||||
|
||||
|
||||
namespace adc::impl
|
||||
{
|
||||
|
||||
// typedef for ASIO streambuf iterators
|
||||
using asio_streambuff_iter_t = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::ip::udp> ||
|
||||
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_stream_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
|
||||
|
||||
|
||||
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
|
||||
traits::adc_output_char_range RMSGT = std::vector<char>>
|
||||
class AdcNetServiceASIOBase : public TRANSPORT_PROTOT, public SESSION_PROTOT
|
||||
{
|
||||
public:
|
||||
typedef std::string netservice_ident_t;
|
||||
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||
|
||||
struct asio_async_ctx_t {
|
||||
bool use_future = false;
|
||||
std::function<void(std::error_code)> accept_comp_token;
|
||||
std::function<void(std::error_code)> connect_comp_token;
|
||||
std::function<void(std::error_code)> send_comp_token;
|
||||
std::function<void(std::error_code, RMSGT)> receive_comp_token;
|
||||
};
|
||||
|
||||
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::years::max();
|
||||
static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
|
||||
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
|
||||
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
|
||||
|
||||
AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx)
|
||||
: TRANSPORT_PROTOT(),
|
||||
SESSION_PROTOT(),
|
||||
_ident(ident),
|
||||
_ioContext(ctx),
|
||||
_receiveStrand(_ioContext),
|
||||
_receiveQueue(),
|
||||
_socket(_ioContext)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
virtual ~AdcNetServiceASIOBase() {}
|
||||
|
||||
|
||||
netservice_ident_t ident() const
|
||||
{
|
||||
return _ident;
|
||||
}
|
||||
|
||||
|
||||
void clear()
|
||||
{
|
||||
// clear receiving messages queue
|
||||
// NOTE: there is no racing condition here since using asio::strand!
|
||||
asio::post(_receiveStrand, [this]() { _receiveQueue = {}; });
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||
auto asyncConnect(const endpoint_t& endpoint,
|
||||
asio_async_ctx_t& ctx,
|
||||
const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||
{
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
if (ctx.use_future) {
|
||||
return _socket.async_connect(
|
||||
endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
|
||||
} else {
|
||||
return _socket.async_connect(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
|
||||
timer->cancel();
|
||||
ctx.connect_comp_token(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto asyncAccept(const endpoint_t& endpoint,
|
||||
asio_async_ctx_t& ctx,
|
||||
const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
return; // there is no acceptor for UDP protocol
|
||||
}
|
||||
|
||||
typename TRANSPORT_PROTOT::acceptor acceptor;
|
||||
try {
|
||||
acceptor = typename TRANSPORT_PROTOT::acceptor(_ioContext, endpoint);
|
||||
} catch (std::system_error err) {
|
||||
if (ctx.use_future) { // emulation of asio::use_future behaivior?!
|
||||
throw;
|
||||
}
|
||||
ctx.accept_comp_token(err.code());
|
||||
return;
|
||||
}
|
||||
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
if (ctx.use_future) {
|
||||
return _socket.async_accept(
|
||||
endpoint, asio::use_future([&ctx, timer = std::move(timer)](std::error_code) { timer->cancel(); }));
|
||||
} else {
|
||||
return _socket.async_accept(endpoint, [&ctx, timer = std::move(timer)](std::error_code ec) {
|
||||
timer->cancel();
|
||||
ctx.accept_comp_token(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range SMSGT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
||||
auto asyncSend(const SMSGT& msg, asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
||||
{
|
||||
// create buffer sequence of sending session protocol representation of the input message
|
||||
std::vector<asio::const_buffer> buff_seq;
|
||||
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
||||
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
auto comp_token = [&ctx, timer = std::move(timer)](std::error_code ec, size_t) {
|
||||
timer->cancel();
|
||||
|
||||
if (!ctx.use_future) {
|
||||
ctx.send_comp_token(ec);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
if (ctx.use_future) {
|
||||
comp_token = asio::use_future(comp_token);
|
||||
}
|
||||
|
||||
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_write(_socket, buff_seq, comp_token);
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, comp_token);
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, comp_token);
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto asyncReceive(asio_async_ctx_t& ctx, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
if (_receiveQueue.size()) { // return message from queue
|
||||
//
|
||||
// !!!!!!!!!!! see documentation for composed operation and async_initiate
|
||||
//
|
||||
asio::post(_receiveStrand, [&ctx, this]() {
|
||||
RMSGT msg = _receiveQueue.front();
|
||||
_receiveQueue.pop();
|
||||
if (ctx.use_future) {
|
||||
return msg;
|
||||
} else {
|
||||
ctx.receive_comp_token(std::error_code(), std::move(msg));
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto out_flags = std::make_unique<asio::socket_base::message_flags>();
|
||||
|
||||
auto timer = getDeadlineTimer(timeout);
|
||||
|
||||
auto s_res = std::make_shared<std::invoke_result_t<decltype(SESSION_PROTOT::search), RMSGT>>();
|
||||
|
||||
// NOTE: this competion token is safe (_streamBuffer access) in multithread context since all the instances will
|
||||
// be executed in serialized execution manner (see asio::strand)
|
||||
auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this](
|
||||
std::error_code ec, size_t nbytes) {
|
||||
timer->cancel();
|
||||
RMSGT msg;
|
||||
|
||||
if (!ec && nbytes) {
|
||||
// here, the iterators were computed in MatchCondition called by asio::async_read_until function!!!
|
||||
std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)};
|
||||
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
|
||||
while (_streamBuffer.size()) { // search for possible additional session protocol packets
|
||||
auto begin_it = (const char*)asio_streambuff_iter_t::begin(_streamBuffer.data());
|
||||
auto end_it = (const char*)asio_streambuff_iter_t::end(_streamBuffer.data());
|
||||
// static_cast<std::ranges::iterator_t<std::string_view>>(_streamBuffer.data().data());
|
||||
// auto end_it = begin_it + _streamBuffer.data().size();
|
||||
|
||||
|
||||
*s_res = this->search(std::span(begin_it, end_it));
|
||||
if (std::get<2>(*s_res)) {
|
||||
net_pack = std::string_view{std::get<0>(*s_res), std::get<1>(*s_res)};
|
||||
|
||||
_receiveQueue.emplace();
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx.use_future) {
|
||||
return msg;
|
||||
} else {
|
||||
ctx.receive_comp_token(ec, std::move(msg));
|
||||
}
|
||||
};
|
||||
|
||||
if (ctx.use_future) {
|
||||
comp_token = asio::use_future(comp_token);
|
||||
}
|
||||
|
||||
comp_token = asio::bind_executor(_receiveStrand, comp_token);
|
||||
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
// adapt to ASIO's MatchCondition
|
||||
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
|
||||
*s_res = this->search(std::span(begin, end));
|
||||
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
||||
};
|
||||
|
||||
return asio::async_read_until(_socket, _streamBuffer, match_func, comp_token);
|
||||
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.receive(_streamBuffer, comp_token);
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.receive(_streamBuffer, *out_flags, comp_token);
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
|
||||
/*
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
// adapt to ASIO's MatchCondition
|
||||
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
|
||||
*s_res = this->search(std::span(begin, end));
|
||||
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
||||
};
|
||||
|
||||
if (ctx.use_future) {
|
||||
return asio::async_read_until(_socket, _streamBuffer, match_func,
|
||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
||||
} else {
|
||||
return asio::async_read_until(_socket, _streamBuffer, match_func,
|
||||
asio::bind_executor(_receiveStrand, comp_token));
|
||||
}
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
if (ctx.use_future) {
|
||||
return _socket.receive(_streamBuffer,
|
||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
||||
} else {
|
||||
return _socket.receive(_streamBuffer, asio::bind_executor(_receiveStrand, comp_token));
|
||||
}
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
if (ctx.use_future) {
|
||||
return _socket.receive(_streamBuffer, *out_flags,
|
||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
||||
} else {
|
||||
return _socket.receive(_streamBuffer, *out_flags, asio::bind_executor(_receiveStrand, comp_token));
|
||||
}
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
asio_async_ctx_t ctx = {.use_future = true};
|
||||
std::future<void> ftr = asyncAcept(endpoint, ctx, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||
{
|
||||
asio_async_ctx_t ctx = {.use_future = true};
|
||||
std::future<void> ftr = asyncConnect(endpoint, ctx, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
||||
auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
||||
{
|
||||
std::future<void> ftr = asyncSend(msg, timeout, asio::use_future);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
std::future<RMSGT> ftr = asyncReceive(timeout, asio::use_future);
|
||||
return ftr.get();
|
||||
}
|
||||
|
||||
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
|
||||
{
|
||||
_shutdownType = shutdown_type;
|
||||
}
|
||||
|
||||
asio::socket_base::shutdown_type getShutdownType() const
|
||||
{
|
||||
return _shutdownType;
|
||||
}
|
||||
|
||||
std::error_code close()
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
_socket.shutdown(_shutdownType, ec);
|
||||
if (!ec) {
|
||||
_socket.close(ec);
|
||||
}
|
||||
|
||||
return ec;
|
||||
}
|
||||
|
||||
protected:
|
||||
netservice_ident_t _ident;
|
||||
|
||||
asio::io_context& _ioContext;
|
||||
asio::io_context::strand _receiveStrand;
|
||||
asio::io_context::strand _sendStrand;
|
||||
|
||||
socket_t _socket;
|
||||
|
||||
// acceptor_t _acceptor;
|
||||
|
||||
asio::streambuf _streamBuffer;
|
||||
|
||||
std::queue<std::vector<char>> _receiveQueue;
|
||||
|
||||
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT>
|
||||
std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
|
||||
{
|
||||
std::unique_ptr<asio::steady_timer> timer(_socket.get_executor());
|
||||
|
||||
if (arm) {
|
||||
timer->expires_after(timeout);
|
||||
|
||||
timer->async_wait([this](const std::error_code& ec) {
|
||||
if (!ec) {
|
||||
_socket.cancel(std::make_error_code(std::errc::timed_out));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return timer;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace adc::impl
|
||||
Loading…
x
Reference in New Issue
Block a user