...
This commit is contained in:
parent
a5c5e5057a
commit
ad0bdf062a
@ -683,6 +683,7 @@ protected:
|
||||
|
||||
#ifdef USE_OPENSSL_WITH_ASIO
|
||||
|
||||
/*
|
||||
template <adc_asio_tls_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
|
||||
traits::adc_output_char_range RMSGT =
|
||||
@ -885,7 +886,7 @@ protected:
|
||||
return preverified_ok;
|
||||
}
|
||||
};
|
||||
|
||||
*/
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
739
net/asio/adc_netsrv_asio.h
Normal file
739
net/asio/adc_netsrv_asio.h
Normal file
@ -0,0 +1,739 @@
|
||||
#pragma once
|
||||
|
||||
#include <asio/any_completion_handler.hpp>
|
||||
#include <asio/basic_datagram_socket.hpp>
|
||||
#include <asio/basic_seq_packet_socket.hpp>
|
||||
#include <asio/basic_stream_socket.hpp>
|
||||
#include <asio/bind_executor.hpp>
|
||||
#include <asio/compose.hpp>
|
||||
#include <asio/deferred.hpp>
|
||||
#include <asio/ip/tcp.hpp>
|
||||
#include <asio/ip/udp.hpp>
|
||||
#include <asio/local/datagram_protocol.hpp>
|
||||
#include <asio/local/seq_packet_protocol.hpp>
|
||||
#include <asio/local/stream_protocol.hpp>
|
||||
#include <asio/read.hpp>
|
||||
#include <asio/read_until.hpp>
|
||||
#include <asio/steady_timer.hpp>
|
||||
#include <asio/strand.hpp>
|
||||
#include <asio/streambuf.hpp>
|
||||
#include <asio/use_awaitable.hpp>
|
||||
#include <asio/use_future.hpp>
|
||||
#include <asio/write.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <queue>
|
||||
|
||||
#ifdef USE_OPENSSL_WITH_ASIO
|
||||
|
||||
#include <asio/ssl.hpp>
|
||||
#include <asio/ssl/stream.hpp>
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
#include "../../common/adc_traits.h"
|
||||
#include "../adc_net_concepts.h"
|
||||
#include "../adc_netproto.h"
|
||||
|
||||
namespace adc::traits
|
||||
{
|
||||
|
||||
// special ASIO-related template specializations
|
||||
|
||||
template <>
|
||||
struct adc_func_traits<asio::use_future_t<>> {
|
||||
using ret_t = std::nullptr_t;
|
||||
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
|
||||
using arg1_t = std::nullptr_t;
|
||||
static constexpr size_t arity = 0;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct adc_func_traits<asio::use_awaitable_t<>> {
|
||||
using ret_t = std::nullptr_t;
|
||||
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
|
||||
using arg1_t = std::nullptr_t;
|
||||
static constexpr size_t arity = 0;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct adc_func_traits<asio::deferred_t> {
|
||||
using ret_t = std::nullptr_t;
|
||||
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
|
||||
using arg1_t = std::nullptr_t;
|
||||
static constexpr size_t arity = 0;
|
||||
};
|
||||
|
||||
} // namespace adc::traits
|
||||
|
||||
namespace adc::impl
|
||||
{
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::ip::udp> ||
|
||||
std::derived_from<T, asio::local::seq_packet_protocol> || std::derived_from<T, asio::local::stream_protocol> ||
|
||||
std::derived_from<T, asio::local::datagram_protocol>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_tls_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol> ||
|
||||
std::derived_from<T, asio::local::seq_packet_protocol>;
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_stream_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_is_future = requires {
|
||||
// [](std::type_identity<asio::use_future_t<>>) {}(std::type_identity<std::remove_cvref_t<T>>());
|
||||
[]<typename AllocatorT>(std::type_identity<asio::use_future_t<AllocatorT>>) {
|
||||
}(std::type_identity<std::remove_cvref_t<T>>{});
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_is_awaitable = requires {
|
||||
[]<typename ExecutorT>(std::type_identity<asio::use_awaitable_t<ExecutorT>>) {
|
||||
}(std::type_identity<std::remove_cvref_t<T>>{});
|
||||
};
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_special_comp_token_c =
|
||||
adc_asio_is_future<T> || adc_asio_is_awaitable<T> || std::same_as<std::remove_cvref_t<T>, asio::deferred_t>;
|
||||
|
||||
|
||||
namespace details
|
||||
{
|
||||
|
||||
|
||||
// template <interfaces::adc_netservice_c SRVT>
|
||||
template <typename SRVT>
|
||||
class AdcAcceptorASIO
|
||||
{
|
||||
public:
|
||||
using netservice_t = SRVT;
|
||||
|
||||
// deduce needed types
|
||||
using transport_proto_t = typename SRVT::endpoint_t::protocol_type;
|
||||
using socket_t = typename SRVT::endpoint_t::protocol_type::socket;
|
||||
using acceptor_t = std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<transport_proto_t>>,
|
||||
std::nullptr_t, // there is no acceptor
|
||||
typename transport_proto_t::acceptor>;
|
||||
|
||||
static constexpr std::chrono::duration DEFAULT_ACCEPT_TIMEOUT = std::chrono::seconds::max();
|
||||
|
||||
AdcAcceptorASIO(asio::io_context& io_ctx, const netservice_t::endpoint_t& endpoint)
|
||||
: _ioContext(io_ctx), _acceptor(io_ctx, endpoint)
|
||||
{
|
||||
}
|
||||
|
||||
AdcAcceptorASIO(const AdcAcceptorASIO& other)
|
||||
: _ioContext(other._ioContext), _acceptor(std::move(other._acceptor)) {
|
||||
|
||||
};
|
||||
|
||||
template <asio::completion_token_for<void(std::error_code, netservice_t)> TokenT,
|
||||
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
if constexpr (std::is_null_pointer_v<acceptor_t>) {
|
||||
static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!");
|
||||
}
|
||||
|
||||
enum { starting, native_accept, post_accept, finishing };
|
||||
|
||||
auto timer = netservice_t::getDeadlineTimer(_acceptor, timeout);
|
||||
|
||||
// auto srv = std::make_shared<netservice_t>(_ioContext);
|
||||
auto srv = std::make_unique<netservice_t>(_ioContext);
|
||||
|
||||
// return asio::async_compose<TokenT, void(std::error_code, netservice_t)>(
|
||||
// asyncAcceptImplementation{this, _acceptor, std::move(timer), srv, AdcAcceptorASIO::starting}, token,
|
||||
// _ioContext);
|
||||
return asio::async_compose<TokenT, void(std::error_code, netservice_t)>(
|
||||
[timer = std::move(timer), srv = std::move(srv), state = AdcAcceptorASIO::starting, this](
|
||||
auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
switch (state) {
|
||||
case starting:
|
||||
// _starting(srv, state, self);
|
||||
_starting(srv, state, std::move(self));
|
||||
break;
|
||||
case native_accept:
|
||||
// _native_accept(srv, state, self);
|
||||
_native_accept(srv, state, std::move(self));
|
||||
break;
|
||||
case post_accept:
|
||||
// _post_accept(srv, state, self);
|
||||
_post_accept(srv, state, std::move(self));
|
||||
break;
|
||||
case finishing:
|
||||
// _finishing(srv, state, self);
|
||||
_finishing(srv, state, std::move(self));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (netservice_t::isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_accept
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec, std::move(*srv));
|
||||
},
|
||||
token, _ioContext);
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
auto f = asyncAccept(asio::use_future, timeout);
|
||||
|
||||
return f.get();
|
||||
}
|
||||
|
||||
protected:
|
||||
asio::io_context& _ioContext;
|
||||
acceptor_t _acceptor;
|
||||
|
||||
enum state_t { starting, native_accept, post_accept, finishing };
|
||||
|
||||
// using self_t = std::function<void(std::error_code)>;
|
||||
using self_t = std::function<void(std::error_code)>;
|
||||
|
||||
|
||||
|
||||
typedef std::function<void(std::unique_ptr<netservice_t>&, state_t&, self_t)> stage_func_t;
|
||||
// typedef std::function<void(std::shared_ptr<netservice_t>&, state_t&, self_t)> stage_func_t;
|
||||
|
||||
stage_func_t _starting = [this](auto&, state_t& state, self_t self) mutable {
|
||||
state = native_accept;
|
||||
// asio::post(_ioContext, std::bind([](auto, auto) {}, std::move(self), std::error_code{}));
|
||||
};
|
||||
|
||||
stage_func_t _native_accept = [this](auto& srv, state_t& state, self_t self) mutable {
|
||||
state = post_accept;
|
||||
_acceptor.async_accept(srv->_socket, std::move(self));
|
||||
};
|
||||
|
||||
stage_func_t _post_accept = [this](auto&, state_t& state, self_t self) mutable { state = finishing; };
|
||||
|
||||
stage_func_t _finishing = [](auto&, state_t&, self_t) mutable {};
|
||||
|
||||
/*
|
||||
struct asyncAcceptImplementation {
|
||||
AdcAcceptorASIO* acp;
|
||||
acceptor_t& _acceptor;
|
||||
std::shared_ptr<asio::steady_timer> timer;
|
||||
std::shared_ptr<netservice_t> srv;
|
||||
// std::unique_ptr<asio::steady_timer> timer;
|
||||
// std::unique_ptr<netservice_t> srv;
|
||||
state_t state;
|
||||
|
||||
asyncAcceptImplementation(AdcAcceptorASIO* a,
|
||||
acceptor_t& ar,
|
||||
std::shared_ptr<asio::steady_timer> tm,
|
||||
std::shared_ptr<netservice_t> s,
|
||||
state_t st)
|
||||
: acp(a), _acceptor(ar), timer(tm), srv(s), state(st)
|
||||
{
|
||||
}
|
||||
|
||||
asyncAcceptImplementation(const asyncAcceptImplementation& other) : _acceptor(other._acceptor)
|
||||
{
|
||||
acp = other.acp;
|
||||
timer = other.timer;
|
||||
srv = other.srv;
|
||||
state = other.state;
|
||||
}
|
||||
|
||||
void operator()(auto& self, std::error_code ec = {})
|
||||
{
|
||||
if (!ec) {
|
||||
switch (state) {
|
||||
case starting:
|
||||
acp->_starting(srv, state, self);
|
||||
break;
|
||||
case native_accept:
|
||||
acp->_native_accept(srv, state, self);
|
||||
break;
|
||||
case post_accept:
|
||||
acp->_post_accept(srv, state, self);
|
||||
break;
|
||||
case finishing:
|
||||
acp->_finishing(srv, state, self);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (netservice_t::isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_accept
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec, std::move(*srv));
|
||||
}
|
||||
};
|
||||
*/
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
|
||||
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
|
||||
traits::adc_output_char_range RMSGT =
|
||||
std::vector<char>> // used only for inner storing of message byte sequence
|
||||
class AdcNetServiceASIOBase : public SESSION_PROTOT
|
||||
{
|
||||
public:
|
||||
friend details::AdcAcceptorASIO<AdcNetServiceASIOBase>;
|
||||
|
||||
// typedefs to satisfy 'adc_netservice_c' concept
|
||||
typedef std::string_view netservice_ident_t;
|
||||
|
||||
typedef std::vector<char> send_msg_t; // in general, only one of several possible
|
||||
typedef RMSGT recv_msg_t; // in general, only one of several possible (see class template arguments declaration)
|
||||
typedef traits::adc_common_duration_t timeout_t;
|
||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||
|
||||
// typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept)
|
||||
typedef std::function<void(std::error_code)> async_connect_callback_t;
|
||||
typedef std::function<void(std::error_code)> async_send_callback_t;
|
||||
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
|
||||
|
||||
|
||||
// typedefs from transport protocol
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
|
||||
typedef details::AdcAcceptorASIO<AdcNetServiceASIOBase> acceptor_t;
|
||||
|
||||
static constexpr std::chrono::duration DEFAULT_CONNECT_TIMEOUT = std::chrono::seconds(10);
|
||||
static constexpr std::chrono::duration DEFAULT_SEND_TIMEOUT = std::chrono::seconds(5);
|
||||
static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5);
|
||||
|
||||
AdcNetServiceASIOBase(asio::io_context& ctx)
|
||||
: SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _socket(_ioContext), _receiveQueue()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
AdcNetServiceASIOBase(socket_t socket)
|
||||
: SESSION_PROTOT(),
|
||||
_ioContext(static_cast<asio::io_context&>(socket.get_executor().context())),
|
||||
_socket(std::move(socket)),
|
||||
_receiveStrand(_ioContext),
|
||||
_receiveQueue()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
// NOTE: CANNOT MOVE asio::streambuf CORRECTLY?!!!
|
||||
// AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = default;
|
||||
AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other)
|
||||
: _ioContext(other._ioContext),
|
||||
_receiveStrand(std::move(other._receiveStrand)),
|
||||
_socket(std::move(other._socket)),
|
||||
_streamBuffer(),
|
||||
_receiveQueue(std::move(other._receiveQueue))
|
||||
|
||||
{
|
||||
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
|
||||
_streamBuffer.commit(bytes);
|
||||
}
|
||||
|
||||
|
||||
// AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = delete;
|
||||
AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor!
|
||||
|
||||
virtual ~AdcNetServiceASIOBase() {}
|
||||
|
||||
|
||||
AdcNetServiceASIOBase& operator=(const AdcNetServiceASIOBase&) = delete;
|
||||
|
||||
// AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) = delete;
|
||||
// AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) = default;
|
||||
AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other)
|
||||
{
|
||||
_ioContext = other._ioContext;
|
||||
_receiveStrand = std::move(other._receiveStrand);
|
||||
_receiveQueue = std::move(other._receiveQueue);
|
||||
_socket = std::move(other._socket);
|
||||
|
||||
_streamBuffer.consume(_streamBuffer.size());
|
||||
|
||||
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
|
||||
_streamBuffer.commit(bytes);
|
||||
|
||||
return *this;
|
||||
};
|
||||
|
||||
|
||||
constexpr netservice_ident_t ident() const
|
||||
{
|
||||
return _ident;
|
||||
}
|
||||
|
||||
|
||||
/* asynchronuos methods */
|
||||
|
||||
template <asio::completion_token_for<void(std::error_code)> TokenT,
|
||||
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||
auto asyncConnect(const endpoint_t& endpoint, TokenT&& token, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||
{
|
||||
auto timer = getDeadlineTimer(_socket, timeout);
|
||||
|
||||
return asio::async_compose<TokenT, void(std::error_code)>(
|
||||
[start = true, endpoint, timer = std::move(timer), this](auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
if (start) {
|
||||
start = false;
|
||||
return _socket.async_connect(endpoint, std::move(self));
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_connect
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec);
|
||||
},
|
||||
token, _socket);
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_input_char_range MessageT,
|
||||
asio::completion_token_for<void(std::error_code ec)> TokenT,
|
||||
traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
||||
auto asyncSend(const MessageT& msg, TokenT&& token, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
||||
{
|
||||
// create buffer sequence of sending session protocol representation of the input message
|
||||
std::vector<asio::const_buffer> buff_seq;
|
||||
// std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
||||
std::ranges::for_each(this->toProto(msg),
|
||||
[&buff_seq](const auto& el) { buff_seq.emplace_back(el.data(), el.size()); });
|
||||
|
||||
auto timer = getDeadlineTimer(_socket, timeout);
|
||||
|
||||
return asio::async_compose<TokenT, void(std::error_code)>(
|
||||
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
|
||||
auto& self, std::error_code ec = {}, size_t = 0) mutable {
|
||||
if (!ec) {
|
||||
if (start) {
|
||||
start = false;
|
||||
if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_write(_socket, buff_seq, std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, std::move(self));
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_write/async_send
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec);
|
||||
},
|
||||
token, _socket);
|
||||
}
|
||||
|
||||
|
||||
template <typename TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
// static asio::streambuf _streamBuffer;
|
||||
|
||||
// check completion token signature and deduce message type
|
||||
// if constexpr (!adc_asio_special_comp_token_c<TokenT> && !is_async_ctx_t) {
|
||||
if constexpr (!adc_asio_special_comp_token_c<TokenT>) {
|
||||
static_assert(traits::adc_func_traits<TokenT>::arity == 2, "INVALID COMPLETION TOKEN SIGNATURE!");
|
||||
static_assert(std::is_same_v<std::remove_cvref_t<traits::adc_func_arg1_t<TokenT>>, std::error_code>,
|
||||
"INVALID COMPLETION TOKEN SIGNATURE!");
|
||||
static_assert(traits::adc_output_char_range<
|
||||
std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>,
|
||||
"INVALID COMPLETION TOKEN SIGNATURE!");
|
||||
}
|
||||
|
||||
using msg_t = std::conditional_t<
|
||||
// adc_asio_special_comp_token_c<TokenT> || is_async_ctx_t, RMSGT,
|
||||
adc_asio_special_comp_token_c<TokenT>, RMSGT,
|
||||
std::remove_cvref_t<std::tuple_element_t<1, typename traits::adc_func_traits<TokenT>::args_t>>>;
|
||||
|
||||
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
|
||||
|
||||
auto timer = getDeadlineTimer(_socket, timeout);
|
||||
|
||||
return asio::async_compose<TokenT, void(std::error_code, msg_t)>(
|
||||
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, std::error_code ec = {},
|
||||
size_t nbytes = 0) mutable {
|
||||
msg_t msg;
|
||||
|
||||
if (!ec) {
|
||||
if (do_read) {
|
||||
do_read = false;
|
||||
if (_receiveQueue.size()) { // return message from queue
|
||||
timer->cancel();
|
||||
auto imsg = _receiveQueue.front();
|
||||
_receiveQueue.pop();
|
||||
if constexpr (std::is_same_v<msg_t, RMSGT>) {
|
||||
self.complete(std::error_code(), std::move(imsg));
|
||||
} else {
|
||||
self.complete(std::error_code(), {imsg.begin(), imsg.end()});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
auto n_avail = _socket.available();
|
||||
auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1);
|
||||
|
||||
if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_read(_socket, std::move(buff), asio::transfer_at_least(1),
|
||||
std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.async_receive(std::move(buff), std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.async_receive(std::move(buff), *out_flags, std::move(self));
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
}
|
||||
|
||||
// zero-length message for SEQ_PACK sockets is EOF
|
||||
if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
if (!nbytes) {
|
||||
timer->cancel();
|
||||
self.complete(std::error_code(asio::error::misc_errors::eof), std::move(msg));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_streamBuffer.commit(nbytes);
|
||||
|
||||
// if (!nbytes) {
|
||||
// do_read = true;
|
||||
// asio::post(std::move(self)); // initiate consequence socket's read operation
|
||||
// return;
|
||||
// }
|
||||
|
||||
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||
|
||||
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||
if (net_pack.empty()) {
|
||||
do_read = true;
|
||||
asio::post(std::move(self)); // initiate consequence socket's read operation
|
||||
return;
|
||||
}
|
||||
|
||||
timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer
|
||||
|
||||
// here one has at least a single message
|
||||
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(msg));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
|
||||
|
||||
while (_streamBuffer.size()) { // search for possible additional session protocol packets
|
||||
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||
|
||||
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||
|
||||
if (!net_pack.empty()) {
|
||||
_receiveQueue.emplace();
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
} else {
|
||||
break; // exit and hold remaining bytes in stream buffer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_*
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
if constexpr (std::is_same_v<msg_t, RMSGT>) {
|
||||
self.complete(ec, std::move(msg));
|
||||
} else {
|
||||
self.complete(ec, {msg.begin(), msg.end()});
|
||||
}
|
||||
|
||||
// if constexpr (adc_asio_special_comp_token_c<TokenT>) {
|
||||
// self.complete(ec, std::move(msg));
|
||||
// } else { // may be of non-RMSGT type
|
||||
// self.complete(ec, {msg.begin(), msg.end()});
|
||||
// }
|
||||
},
|
||||
token, _receiveStrand);
|
||||
}
|
||||
|
||||
/* blocking methods */
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_CONNECT_TIMEOUT)>
|
||||
auto connect(const endpoint_t& endpoint, const TimeoutT& timeout = DEFAULT_CONNECT_TIMEOUT)
|
||||
{
|
||||
std::future<void> ftr = asyncConnect(endpoint, asio::use_future, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
|
||||
auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
|
||||
{
|
||||
std::future<void> ftr = asyncSend(msg, asio::use_future, timeout);
|
||||
ftr.get();
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
std::future<RMSGT> ftr = asyncReceive(asio::use_future, timeout);
|
||||
return ftr.get();
|
||||
}
|
||||
|
||||
// one still may receive messages from queue!
|
||||
std::error_code close()
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
_socket.shutdown(_shutdownType, ec);
|
||||
if (!ec) {
|
||||
_socket.close(ec);
|
||||
}
|
||||
|
||||
return ec;
|
||||
}
|
||||
|
||||
/* additional ASIO-related methods */
|
||||
|
||||
void clearRcvQueue()
|
||||
{
|
||||
// clear receiving messages queue
|
||||
// NOTE: there is no racing condition here since using asio::strand!
|
||||
asio::post(_receiveStrand, [this]() {
|
||||
//
|
||||
_receiveQueue = {};
|
||||
});
|
||||
}
|
||||
|
||||
void clearRcvBuff()
|
||||
{
|
||||
asio::post(_receiveStrand, [this]() {
|
||||
//
|
||||
_streamBuffer.consume(_streamBuffer.size());
|
||||
});
|
||||
}
|
||||
|
||||
void clearRcvData()
|
||||
{
|
||||
asio::post(_receiveStrand, [this]() {
|
||||
_receiveQueue = {};
|
||||
_streamBuffer.consume(_streamBuffer.size());
|
||||
});
|
||||
}
|
||||
|
||||
void setShutdownType(asio::socket_base::shutdown_type shutdown_type)
|
||||
{
|
||||
_shutdownType = shutdown_type;
|
||||
}
|
||||
|
||||
asio::socket_base::shutdown_type getShutdownType() const
|
||||
{
|
||||
return _shutdownType;
|
||||
}
|
||||
|
||||
protected:
|
||||
static constexpr netservice_ident_t _ident =
|
||||
std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>
|
||||
? "STREAM-SOCKET NETWORK SERVICE"
|
||||
: std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>
|
||||
? "DATAGRAM-SOCKET NETWORK SERVICE"
|
||||
: std::derived_from<socket_t, asio::basic_seq_packet_socket<typename socket_t::protocol_type>>
|
||||
? "SEQPACKET-SOCKET NETWORK SERVICE"
|
||||
: "UNKNOWN";
|
||||
|
||||
asio::io_context& _ioContext;
|
||||
asio::io_context::strand _receiveStrand;
|
||||
|
||||
socket_t _socket;
|
||||
|
||||
asio::streambuf _streamBuffer;
|
||||
|
||||
std::queue<std::vector<char>> _receiveQueue;
|
||||
|
||||
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
||||
|
||||
|
||||
// public:
|
||||
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
|
||||
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
|
||||
const TimeoutT& timeout,
|
||||
bool arm = true)
|
||||
{
|
||||
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
|
||||
|
||||
// if (timeout == std::chrono::duration<typename TimeoutT::rep, typename TimeoutT::period>::max()) {
|
||||
// return timer; // do not arm the timer if MAX duration are given
|
||||
// }
|
||||
|
||||
if (arm) {
|
||||
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
|
||||
std::chrono::seconds(1));
|
||||
|
||||
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
|
||||
// timer->expires_after(timeout);
|
||||
|
||||
timer->async_wait([&obj](const std::error_code& ec) mutable {
|
||||
if (!ec) {
|
||||
obj.cancel();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return timer;
|
||||
}
|
||||
|
||||
template <typename TimerT>
|
||||
static bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
|
||||
{
|
||||
auto exp_time = timer->expiry();
|
||||
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
|
||||
}
|
||||
|
||||
template <typename TimerT>
|
||||
static bool isTimeout(const std::shared_ptr<TimerT>& timer, const std::error_code& ec)
|
||||
{
|
||||
auto exp_time = timer->expiry();
|
||||
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace adc::impl
|
||||
@ -3,6 +3,7 @@
|
||||
|
||||
#include "../net/adc_netproto.h"
|
||||
#include "../net/asio/adc_netservice_asio.h"
|
||||
// #include "../net/asio/adc_netsrv_asio.h"
|
||||
|
||||
template <typename T>
|
||||
void receive(T srv)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user