...
This commit is contained in:
parent
da4b958d6b
commit
7265a68fd6
@ -52,17 +52,30 @@ concept adc_asio_stream_transport_proto_c =
|
|||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
concept adc_asio_is_future = requires {
|
concept adc_asio_is_future = requires {
|
||||||
[](std::type_identity<asio::use_future_t<>>) {}(std::type_identity<std::remove_cvref_t<T>>());
|
// [](std::type_identity<asio::use_future_t<>>) {}(std::type_identity<std::remove_cvref_t<T>>());
|
||||||
// []<typename AllocatorT>(std::type_identity<asio::use_future_t<AllocatorT>>) {
|
[]<typename AllocatorT>(std::type_identity<asio::use_future_t<AllocatorT>>) {
|
||||||
// }(std::type_identity<std::remove_cvref_t<T>>());
|
}(std::type_identity<std::remove_cvref_t<T>>{});
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
concept adc_asio_is_awaitable = requires {
|
concept adc_asio_is_awaitable = requires {
|
||||||
[]<typename ExecutorT>(std::type_identity<asio::use_awaitable_t<ExecutorT>>) {
|
[]<typename ExecutorT>(std::type_identity<asio::use_awaitable_t<ExecutorT>>) {
|
||||||
}(std::type_identity<std::remove_cvref_t<T>>());
|
}(std::type_identity<std::remove_cvref_t<T>>{});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
concept adc_asio_special_comp_token =
|
||||||
|
adc_asio_is_future<T> || adc_asio_is_awaitable<T> || std::same_as<std::remove_cvref_t<T>, asio::deferred_t>;
|
||||||
|
|
||||||
|
|
||||||
|
// template <typename T>
|
||||||
|
// static constexpr bool adc_is_asio_special_comp_token = std::is_same_v<std::remove_cvref_t<T>, asio::use_future_t<>>
|
||||||
|
// ||
|
||||||
|
// std::is_same_v<std::remove_cvref_t<T>, asio::deferred_t> ||
|
||||||
|
// std::is_same_v<std::remove_cvref_t<T>,
|
||||||
|
// asio::use_awaitable_t<>>;
|
||||||
|
|
||||||
struct adc_asio_async_call_ctx_t {
|
struct adc_asio_async_call_ctx_t {
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -91,10 +104,14 @@ template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
|||||||
class AdcNetServiceASIOBase : public SESSION_PROTOT
|
class AdcNetServiceASIOBase : public SESSION_PROTOT
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
typedef std::string netservice_ident_t;
|
typedef std::string_view netservice_ident_t;
|
||||||
|
|
||||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||||
|
using acceptor_t =
|
||||||
|
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
|
||||||
|
std::nullptr_t, // there is no acceptor
|
||||||
|
typename TRANSPORT_PROTOT::acceptor>;
|
||||||
|
|
||||||
struct asio_async_ctx_t {
|
struct asio_async_ctx_t {
|
||||||
bool use_future = false;
|
bool use_future = false;
|
||||||
@ -112,6 +129,10 @@ public:
|
|||||||
public:
|
public:
|
||||||
contx_t() = default;
|
contx_t() = default;
|
||||||
|
|
||||||
|
contx_t(contx_t&) = default;
|
||||||
|
contx_t(contx_t&&) = default;
|
||||||
|
contx_t(const contx_t&) = default;
|
||||||
|
|
||||||
template <asio::completion_token_for<void(std::error_code)> TokenT>
|
template <asio::completion_token_for<void(std::error_code)> TokenT>
|
||||||
contx_t(TokenT&& token)
|
contx_t(TokenT&& token)
|
||||||
{
|
{
|
||||||
@ -169,20 +190,14 @@ public:
|
|||||||
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
|
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
|
||||||
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
|
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
|
||||||
|
|
||||||
AdcNetServiceASIOBase(const netservice_ident_t& ident, asio::io_context& ctx)
|
AdcNetServiceASIOBase(asio::io_context& ctx)
|
||||||
: SESSION_PROTOT(),
|
: SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext)
|
||||||
_ident(ident),
|
|
||||||
_ioContext(ctx),
|
|
||||||
_receiveStrand(_ioContext),
|
|
||||||
_receiveQueue(),
|
|
||||||
_socket(_ioContext)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
AdcNetServiceASIOBase(const netservice_ident_t& ident, socket_t socket)
|
AdcNetServiceASIOBase(socket_t socket)
|
||||||
: SESSION_PROTOT(),
|
: SESSION_PROTOT(),
|
||||||
_ident(ident),
|
|
||||||
_ioContext(socket.get_executor()),
|
_ioContext(socket.get_executor()),
|
||||||
_receiveStrand(_ioContext),
|
_receiveStrand(_ioContext),
|
||||||
_receiveQueue(),
|
_receiveQueue(),
|
||||||
@ -191,58 +206,73 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor!
|
||||||
|
|
||||||
virtual ~AdcNetServiceASIOBase() {}
|
virtual ~AdcNetServiceASIOBase() {}
|
||||||
|
|
||||||
|
|
||||||
netservice_ident_t ident() const
|
constexpr netservice_ident_t ident() const
|
||||||
{
|
{
|
||||||
return _ident;
|
return _ident;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void clear()
|
/* asynchronuos methods */
|
||||||
|
|
||||||
|
template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
|
||||||
|
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||||
|
static auto asyncAccept(asio::io_context io_ctx,
|
||||||
|
const endpoint_t& endpoint,
|
||||||
|
TokenT&& token,
|
||||||
|
const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||||
{
|
{
|
||||||
// clear receiving messages queue
|
// no acceptor for UDP-sockets
|
||||||
// NOTE: there is no racing condition here since using asio::strand!
|
if constexpr (!std::is_null_pointer_v<acceptor_t>) {
|
||||||
asio::post(_receiveStrand, [this]() { _receiveQueue = {}; });
|
static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto acc = acceptor_t(io_ctx);
|
||||||
|
auto sock = socket_t(io_ctx);
|
||||||
|
|
||||||
|
auto timer = getDeadlineTimer(timeout);
|
||||||
|
|
||||||
|
return asio::async_compose<TokenT, void(std::error_code)>(
|
||||||
|
[acc = std::move(acc), sock = std::move(sock), timer = std::move(timer), start = true, &endpoint, &io_ctx](
|
||||||
|
auto& self, std::error_code ec = {}) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
try {
|
||||||
|
acc = acceptor_t(io_ctx, endpoint);
|
||||||
|
} catch (std::system_error err) {
|
||||||
|
timer->cancel();
|
||||||
|
self.complete(err.code());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
return acc.async_accept(sock, std::move(self));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimeout(timer, ec)) {
|
||||||
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_connect
|
||||||
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.complete(ec, AdcNetServiceASIOBase(std::move(sock)));
|
||||||
|
},
|
||||||
|
token, io_ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// template <adc_completion_token_c<void(std::error_code)> TokenT,
|
template <asio::completion_token_for<void(std::error_code)> TokenT,
|
||||||
// traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
|
||||||
// auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
|
||||||
// {
|
|
||||||
// static_assert(!std::is_same_v<TokenT, async_call_ctx_t>, "'async_call_ctx_t'-TYPE MUST NOT BE USED!");
|
|
||||||
|
|
||||||
// auto timer = getDeadlineTimer(timeout);
|
|
||||||
|
|
||||||
// auto comp_token = [wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)),
|
|
||||||
// timer = std::move(timer)](std::error_code ec) {
|
|
||||||
// if (isTimeout(timer, ec)) {
|
|
||||||
// ec = std::make_error_code(std::errc::timed_out);
|
|
||||||
// } else {
|
|
||||||
// timer->cancel();
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if constexpr (!adc_asio_is_future<TokenT>) {
|
|
||||||
// std::get<0>(wrapper)(ec);
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
|
|
||||||
// if constexpr (!adc_asio_is_future<TokenT>) {
|
|
||||||
// return _socket.async_connect(endpoint, asio::use_future(std::move(comp_token)));
|
|
||||||
// } else {
|
|
||||||
// return _socket.async_connect(endpoint, std::move(comp_token));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
template <asio::completion_token_for<void(std::error_code)> CompTokenT,
|
|
||||||
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||||
auto asyncConnect(const endpoint_t& endpoint, CompTokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||||
{
|
{
|
||||||
auto timer = getDeadlineTimer(timeout);
|
auto timer = getDeadlineTimer(_socket, timeout);
|
||||||
|
|
||||||
return asio::async_compose<CompTokenT, void(asio::error_code)>(
|
return asio::async_compose<TokenT, void(asio::error_code)>(
|
||||||
[start = true, endpoint, timer = std::move(timer), this](auto& self, asio::error_code ec = {}) mutable {
|
[start = true, endpoint, timer = std::move(timer), this](auto& self, asio::error_code ec = {}) mutable {
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
if (start) {
|
if (start) {
|
||||||
@ -268,49 +298,42 @@ public:
|
|||||||
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
||||||
auto asyncSend(const MessageT& msg, TokenT&& token, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
auto asyncSend(const MessageT& msg, TokenT&& token, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
||||||
{
|
{
|
||||||
static_assert(!std::is_same_v<TokenT, async_call_ctx_t>, "'async_call_ctx_t'-TYPE MUST NOT BE USED!");
|
|
||||||
|
|
||||||
// create buffer sequence of sending session protocol representation of the input message
|
// create buffer sequence of sending session protocol representation of the input message
|
||||||
std::vector<asio::const_buffer> buff_seq;
|
std::vector<asio::const_buffer> buff_seq;
|
||||||
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
||||||
|
|
||||||
auto timer = getDeadlineTimer(timeout);
|
auto timer = getDeadlineTimer(_socket, timeout);
|
||||||
|
|
||||||
auto comp_token = [wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)), buff_seq,
|
return asio::async_compose<TokenT, void(asio::error_code)>(
|
||||||
timer = std::move(timer)](std::error_code ec, size_t) {
|
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
|
||||||
|
auto& self, asio::error_code ec = {}) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
if constexpr (std::derived_from<socket_t,
|
||||||
|
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||||
|
return asio::async_write(_socket, buff_seq, std::move(self));
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
return _socket.async_send(buff_seq, std::move(self));
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
return _socket.async_send(buff_seq, std::move(self));
|
||||||
|
} else {
|
||||||
|
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimeout(timer, ec)) {
|
||||||
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_write/async_send
|
||||||
timer->cancel();
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
if constexpr (!adc_asio_is_future<TokenT>) {
|
self.complete(ec);
|
||||||
std::get<0>(wrapper)(ec);
|
},
|
||||||
}
|
token, _socket);
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
if constexpr (adc_asio_is_future<TokenT>) {
|
|
||||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return asio::async_write(_socket, buff_seq, asio::use_future(std::move(comp_token)));
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return _socket.async_send(buff_seq, asio::use_future(std::move(comp_token)));
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return _socket.async_send(buff_seq, asio::use_future(std::move(comp_token)));
|
|
||||||
} else {
|
|
||||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return asio::async_write(_socket, buff_seq, std::move(comp_token));
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return _socket.async_send(buff_seq, std::move(comp_token));
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
|
||||||
return _socket.async_send(buff_seq, std::move(comp_token));
|
|
||||||
} else {
|
|
||||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -319,8 +342,8 @@ public:
|
|||||||
{
|
{
|
||||||
static_assert(!std::is_same_v<TokenT, async_call_ctx_t>, "'async_call_ctx_t'-TYPE MUST NOT BE USED!");
|
static_assert(!std::is_same_v<TokenT, async_call_ctx_t>, "'async_call_ctx_t'-TYPE MUST NOT BE USED!");
|
||||||
|
|
||||||
// check completion token signature
|
// check completion token signature and deduce message type
|
||||||
if constexpr (!(adc_asio_is_future<TokenT> || std::is_same_v<TokenT, asio::deferred_t>)) {
|
if constexpr (!adc_asio_special_comp_token<TokenT>) {
|
||||||
static_assert(traits::adc_func_traits<TokenT>::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!");
|
static_assert(traits::adc_func_traits<TokenT>::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!");
|
||||||
static_assert(std::is_same_v<std::remove_cvref_t<traits::adc_func_arg1_t<TokenT>>, std::error_code>,
|
static_assert(std::is_same_v<std::remove_cvref_t<traits::adc_func_arg1_t<TokenT>>, std::error_code>,
|
||||||
"INVALID COMPLETION TOKEN SIGNATURE!");
|
"INVALID COMPLETION TOKEN SIGNATURE!");
|
||||||
@ -330,40 +353,58 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
using msg_t = std::conditional_t<
|
using msg_t = std::conditional_t<
|
||||||
adc_asio_is_future<TokenT> || std::is_same_v<TokenT, asio::deferred_t>, RMSGT,
|
adc_asio_special_comp_token<TokenT>, RMSGT,
|
||||||
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
|
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
|
||||||
|
|
||||||
if (_receiveQueue.size()) { // return message from queue
|
|
||||||
auto async_init = [this](auto&& compl_hndl) {
|
|
||||||
asio::post(_receiveStrand, [&compl_hndl, this]() {
|
|
||||||
RMSGT msg = _receiveQueue.front();
|
|
||||||
_receiveQueue.pop();
|
|
||||||
compl_hndl(std::error_code(), std::move(msg));
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
if (adc_asio_is_future<TokenT>) {
|
|
||||||
return asio::async_initiate<TokenT, void(std::error_code, msg_t)>(
|
|
||||||
async_init, asio::use_future(ctx.receive_comp_token));
|
|
||||||
} else {
|
|
||||||
return asio::async_initiate<TokenT, void(std::error_code, msg_t)>(async_init, ctx.receive_comp_token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto out_flags = std::make_unique<asio::socket_base::message_flags>();
|
|
||||||
|
|
||||||
auto timer = getDeadlineTimer(timeout);
|
|
||||||
|
|
||||||
auto s_res = std::make_shared<std::invoke_result_t<decltype(SESSION_PROTOT::search), RMSGT>>();
|
auto s_res = std::make_shared<std::invoke_result_t<decltype(SESSION_PROTOT::search), RMSGT>>();
|
||||||
|
|
||||||
// NOTE: this completion token is safe (_streamBuffer access) in multithread context since all the instances
|
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
|
||||||
// will be executed in serialized execution manner (see asio::strand)
|
|
||||||
auto comp_token = [&ctx, s_res, timer = std::move(timer), out_flags = std::move(out_flags), this](
|
auto timer = getDeadlineTimer(_socket, timeout);
|
||||||
std::error_code ec, size_t nbytes) {
|
|
||||||
timer->cancel();
|
return asio::async_compose<TokenT, void(asio::error_code, msg_t)>(
|
||||||
|
[s_res, out_flags, start = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {},
|
||||||
|
size_t = 0) mutable {
|
||||||
RMSGT msg;
|
RMSGT msg;
|
||||||
|
|
||||||
if (!ec && nbytes) {
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
if (_receiveQueue.size()) { // return message from queue
|
||||||
|
msg = _receiveQueue.front();
|
||||||
|
_receiveQueue.pop();
|
||||||
|
if constexpr (std::is_same_v<msg_t, RMSGT>) {
|
||||||
|
self.complete(std::error_code(), std::move(msg));
|
||||||
|
} else {
|
||||||
|
// msg_t user_msg{msg.begin(), msg.end()};
|
||||||
|
self.complete(std::error_code(), {msg.begin(), msg.end()});
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (std::derived_from<socket_t,
|
||||||
|
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||||
|
// adapt to ASIO's MatchCondition
|
||||||
|
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
|
||||||
|
*s_res = this->search(std::span(begin, end));
|
||||||
|
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
||||||
|
};
|
||||||
|
|
||||||
|
return asio::async_read_until(_socket, _streamBuffer, match_func, std::move(self));
|
||||||
|
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
// datagram, so it should be received at once
|
||||||
|
return _socket.receive(_streamBuffer, std::move(self));
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
// datagram, so it should be received at once
|
||||||
|
return _socket.receive(_streamBuffer, *out_flags, std::move(self));
|
||||||
|
} else {
|
||||||
|
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// here, the iterators were computed in MatchCondition called by asio::async_read_until function!!!
|
// here, the iterators were computed in MatchCondition called by asio::async_read_until function!!!
|
||||||
std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)};
|
std::string_view net_pack{std::get<0>(*s_res), std::get<1>(*s_res)};
|
||||||
|
|
||||||
@ -390,93 +431,35 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx.use_future) {
|
if (isTimeout(timer, ec)) {
|
||||||
return msg;
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_connect
|
||||||
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<msg_t, RMSGT>) {
|
||||||
|
self.complete(ec, std::move(msg));
|
||||||
} else {
|
} else {
|
||||||
ctx.receive_comp_token(ec, std::move(msg));
|
// msg_t user_msg{msg.begin(), msg.end()};
|
||||||
|
self.complete(ec, {msg.begin(), msg.end()});
|
||||||
}
|
}
|
||||||
};
|
},
|
||||||
|
token, _socket);
|
||||||
if (ctx.use_future) {
|
|
||||||
comp_token = asio::use_future(comp_token);
|
|
||||||
}
|
|
||||||
|
|
||||||
comp_token = asio::bind_executor(_receiveStrand, comp_token);
|
|
||||||
|
|
||||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// adapt to ASIO's MatchCondition
|
|
||||||
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
|
|
||||||
*s_res = this->search(std::span(begin, end));
|
|
||||||
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
|
||||||
};
|
|
||||||
|
|
||||||
return asio::async_read_until(_socket, _streamBuffer, match_func, comp_token);
|
|
||||||
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// datagram, so it should be received at once
|
|
||||||
return _socket.receive(_streamBuffer, comp_token);
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// datagram, so it should be received at once
|
|
||||||
return _socket.receive(_streamBuffer, *out_flags, comp_token);
|
|
||||||
} else {
|
|
||||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// adapt to ASIO's MatchCondition
|
|
||||||
auto match_func = [s_res, this]<typename IT>(IT begin, IT end) {
|
|
||||||
*s_res = this->search(std::span(begin, end));
|
|
||||||
return std::make_tuple(std::get<1>(*s_res), std::get<2>(*s_res));
|
|
||||||
};
|
|
||||||
|
|
||||||
if (ctx.use_future) {
|
|
||||||
return asio::async_read_until(_socket, _streamBuffer, match_func,
|
|
||||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
|
||||||
} else {
|
|
||||||
return asio::async_read_until(_socket, _streamBuffer, match_func,
|
|
||||||
asio::bind_executor(_receiveStrand, comp_token));
|
|
||||||
}
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// datagram, so it should be received at once
|
|
||||||
if (ctx.use_future) {
|
|
||||||
return _socket.receive(_streamBuffer,
|
|
||||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
|
||||||
} else {
|
|
||||||
return _socket.receive(_streamBuffer, asio::bind_executor(_receiveStrand, comp_token));
|
|
||||||
}
|
|
||||||
} else if constexpr (std::derived_from<socket_t,
|
|
||||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
|
||||||
// datagram, so it should be received at once
|
|
||||||
if (ctx.use_future) {
|
|
||||||
return _socket.receive(_streamBuffer, *out_flags,
|
|
||||||
asio::bind_executor(_receiveStrand, asio::use_future(comp_token)));
|
|
||||||
} else {
|
|
||||||
return _socket.receive(_streamBuffer, *out_flags, asio::bind_executor(_receiveStrand, comp_token));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* blocking methods */
|
||||||
|
|
||||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||||
auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
static auto accept(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||||
{
|
{
|
||||||
asio_async_ctx_t ctx = {.use_future = true};
|
std::future<void> ftr = asyncAccept(endpoint, asio::use_future, timeout);
|
||||||
std::future<void> ftr = asyncAcept(endpoint, ctx, timeout);
|
|
||||||
ftr.get();
|
ftr.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||||
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||||
{
|
{
|
||||||
asio_async_ctx_t ctx = {.use_future = true};
|
std::future<void> ftr = asyncConnect(endpoint, asio::use_future, timeout);
|
||||||
std::future<void> ftr = asyncConnect(endpoint, ctx, timeout);
|
|
||||||
ftr.get();
|
ftr.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -494,16 +477,6 @@ public:
|
|||||||
return ftr.get();
|
return ftr.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
|
|
||||||
{
|
|
||||||
_shutdownType = shutdown_type;
|
|
||||||
}
|
|
||||||
|
|
||||||
asio::socket_base::shutdown_type getShutdownType() const
|
|
||||||
{
|
|
||||||
return _shutdownType;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::error_code close()
|
std::error_code close()
|
||||||
{
|
{
|
||||||
std::error_code ec;
|
std::error_code ec;
|
||||||
@ -516,8 +489,34 @@ public:
|
|||||||
return ec;
|
return ec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* additional ASIO-related methods */
|
||||||
|
|
||||||
|
void clear()
|
||||||
|
{
|
||||||
|
// clear receiving messages queue
|
||||||
|
// NOTE: there is no racing condition here since using asio::strand!
|
||||||
|
asio::post(_receiveStrand, [this]() { _receiveQueue = {}; });
|
||||||
|
}
|
||||||
|
|
||||||
|
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
|
||||||
|
{
|
||||||
|
_shutdownType = shutdown_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
asio::socket_base::shutdown_type getShutdownType() const
|
||||||
|
{
|
||||||
|
return _shutdownType;
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
netservice_ident_t _ident;
|
static constexpr netservice_ident_t _ident =
|
||||||
|
std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>
|
||||||
|
? "STREAM-SOCKET NETWORK SERVICE"
|
||||||
|
: std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>
|
||||||
|
? "DATAGRAM-SOCKET NETWORK SERVICE"
|
||||||
|
: std::derived_from<socket_t, asio::basic_seq_packet_socket<typename socket_t::protocol_type>>
|
||||||
|
? "SEQPACKET-SOCKET NETWORK SERVICE"
|
||||||
|
: "UNKNOWN";
|
||||||
|
|
||||||
asio::io_context& _ioContext;
|
asio::io_context& _ioContext;
|
||||||
asio::io_context::strand _receiveStrand;
|
asio::io_context::strand _receiveStrand;
|
||||||
@ -532,17 +531,19 @@ protected:
|
|||||||
|
|
||||||
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
||||||
|
|
||||||
template <traits::adc_time_duration_c TimeoutT>
|
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
|
||||||
std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
|
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
|
||||||
|
const TimeoutT& timeout,
|
||||||
|
bool arm = true)
|
||||||
{
|
{
|
||||||
auto timer = std::make_unique<asio::steady_timer>(_socket.get_executor());
|
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
|
||||||
|
|
||||||
if (arm) {
|
if (arm) {
|
||||||
timer->expires_after(timeout);
|
timer->expires_after(timeout);
|
||||||
|
|
||||||
timer->async_wait([this](const std::error_code& ec) {
|
timer->async_wait([&obj](const std::error_code& ec) mutable {
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
_socket.cancel();
|
obj.cancel();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,7 +10,7 @@ int main()
|
|||||||
|
|
||||||
asio::io_context ctx;
|
asio::io_context ctx;
|
||||||
|
|
||||||
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>> srv("TCP NETSERVICE", ctx);
|
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>> srv(ctx);
|
||||||
|
|
||||||
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::asio_async_ctx_t srv_ctx;
|
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::asio_async_ctx_t srv_ctx;
|
||||||
srv_ctx.accept_comp_token = [](std::error_code ec) {
|
srv_ctx.accept_comp_token = [](std::error_code ec) {
|
||||||
@ -25,7 +25,7 @@ int main()
|
|||||||
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::contx_t s_ctx;
|
adc::impl::AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>::contx_t s_ctx;
|
||||||
|
|
||||||
srv.asyncConnect(ept_c, s_ctx);
|
srv.asyncConnect(ept_c, s_ctx);
|
||||||
srv.asyncConnect(ept_c, asio::use_future);
|
auto res = srv.asyncConnect(ept_c, asio::use_awaitable);
|
||||||
|
|
||||||
ctx.run();
|
ctx.run();
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user