...
This commit is contained in:
parent
2e6deffdd2
commit
d1d822415b
@ -22,7 +22,6 @@
|
||||
|
||||
#include <functional>
|
||||
#include <queue>
|
||||
#include <set>
|
||||
|
||||
#ifdef USE_OPENSSL_WITH_ASIO
|
||||
|
||||
@ -83,6 +82,11 @@ concept adc_asio_transport_proto_c =
|
||||
std::derived_from<T, asio::local::datagram_protocol>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_tls_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol> ||
|
||||
std::derived_from<T, asio::local::seq_packet_protocol>;
|
||||
|
||||
template <typename T>
|
||||
concept adc_asio_stream_transport_proto_c =
|
||||
std::derived_from<T, asio::ip::tcp> || std::derived_from<T, asio::local::stream_protocol>;
|
||||
@ -107,20 +111,6 @@ concept adc_asio_special_comp_token_c =
|
||||
adc_asio_is_future<T> || adc_asio_is_awaitable<T> || std::same_as<std::remove_cvref_t<T>, asio::deferred_t>;
|
||||
|
||||
|
||||
// class adc_asio_async_call_ctx_t
|
||||
// {
|
||||
// };
|
||||
|
||||
|
||||
// template <typename T, typename SignatureT = void>
|
||||
// concept adc_completion_token_c =
|
||||
// std::same_as<T, adc_asio_async_call_ctx_t> ||
|
||||
// (traits::adc_is_callable<T> &&
|
||||
// std::conditional_t<std::same_as<SignatureT, void>,
|
||||
// std::true_type,
|
||||
// std::bool_constant<asio::completion_token_for<T, SignatureT>>>::value);
|
||||
|
||||
|
||||
|
||||
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
|
||||
@ -138,7 +128,6 @@ public:
|
||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||
|
||||
// typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept)
|
||||
// typedef std::function<void(std::error_code)> async_accept_callback_t;
|
||||
typedef std::function<void(std::error_code)> async_connect_callback_t;
|
||||
typedef std::function<void(std::error_code)> async_send_callback_t;
|
||||
typedef std::function<void(std::error_code, RMSGT)> async_receive_callback_t;
|
||||
@ -146,7 +135,7 @@ public:
|
||||
|
||||
// typedefs from transport protocol
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
|
||||
using sock_stream_t = socket_t&;
|
||||
|
||||
// acceptor
|
||||
class acceptor_t
|
||||
@ -172,10 +161,8 @@ public:
|
||||
typedef AdcNetServiceASIOBase netservice_t;
|
||||
typedef std::shared_ptr<netservice_t> sptr_netservice_t;
|
||||
|
||||
// typedef std::function<void(std::error_code, AdcNetServiceASIOBase)> async_accept_callback_t;
|
||||
typedef std::function<void(std::error_code, sptr_netservice_t)> async_accept_callback_t;
|
||||
|
||||
// template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
|
||||
template <asio::completion_token_for<void(std::error_code, sptr_netservice_t)> TokenT,
|
||||
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
@ -188,7 +175,6 @@ public:
|
||||
_socket = AdcNetServiceASIOBase::socket_t{_ioContext};
|
||||
auto timer = getDeadlineTimer(_acceptor, timeout);
|
||||
|
||||
// return asio::async_compose<TokenT, void(std::error_code, AdcNetServiceASIOBase)>(
|
||||
return asio::async_compose<TokenT, void(std::error_code, sptr_netservice_t)>(
|
||||
[timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
@ -200,7 +186,6 @@ public:
|
||||
}
|
||||
} catch (std::system_error err) {
|
||||
timer->cancel();
|
||||
// self.complete(err.code(), AdcNetServiceASIOBase(_ioContext));
|
||||
self.complete(err.code(), std::make_shared<netservice_t>(_ioContext));
|
||||
return;
|
||||
}
|
||||
@ -233,7 +218,6 @@ public:
|
||||
|
||||
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
// AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
auto f = asyncAccept(asio::use_future, timeout);
|
||||
|
||||
@ -242,7 +226,6 @@ public:
|
||||
|
||||
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
|
||||
auto accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
// AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
|
||||
{
|
||||
return accept(endpoint, timeout);
|
||||
}
|
||||
@ -284,21 +267,6 @@ public:
|
||||
|
||||
// NOTE: CANNOT MOVE asio::streambuf CORRECTLY?!!!
|
||||
AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = delete;
|
||||
// AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other)
|
||||
// : SESSION_PROTOT(std::move(other)),
|
||||
// _ioContext(other._ioContext),
|
||||
// _receiveStrand(std::move(other._receiveStrand)),
|
||||
// _receiveQueue(std::move(_receiveQueue)),
|
||||
// _socket(std::move(other._socket)),
|
||||
// _streamBuffer()
|
||||
// {
|
||||
// if (this == &other)
|
||||
// return;
|
||||
|
||||
// auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()),
|
||||
// other._streamBuffer.data()); _streamBuffer.commit(bytes);
|
||||
// };
|
||||
|
||||
AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor!
|
||||
|
||||
virtual ~AdcNetServiceASIOBase() {}
|
||||
@ -307,23 +275,6 @@ public:
|
||||
AdcNetServiceASIOBase& operator=(const AdcNetServiceASIOBase&) = delete;
|
||||
|
||||
AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) = delete;
|
||||
// {
|
||||
// if (this != &other) {
|
||||
// close();
|
||||
// _streamBuffer.consume(_streamBuffer.size());
|
||||
|
||||
// auto bytes =
|
||||
// asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
|
||||
// _streamBuffer.commit(bytes);
|
||||
|
||||
// _ioContext = other._ioContext;
|
||||
// _receiveStrand = std::move(other._receiveStrand), _socket = std::move(other._socket);
|
||||
|
||||
// _receiveQueue = other._receiveQueue;
|
||||
// }
|
||||
|
||||
// return *this;
|
||||
// }
|
||||
|
||||
|
||||
constexpr netservice_ident_t ident() const
|
||||
@ -411,8 +362,6 @@ public:
|
||||
template <typename TokenT, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
|
||||
auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
|
||||
{
|
||||
// constexpr auto is_async_ctx_t = std::same_as<std::remove_cvref_t<TokenT>, async_call_ctx_t>;
|
||||
|
||||
// check completion token signature and deduce message type
|
||||
// if constexpr (!adc_asio_special_comp_token_c<TokenT> && !is_async_ctx_t) {
|
||||
if constexpr (!adc_asio_special_comp_token_c<TokenT>) {
|
||||
@ -453,7 +402,8 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto buff = _streamBuffer.prepare(2880);
|
||||
auto n_avail = _socket.available();
|
||||
auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1);
|
||||
|
||||
if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
@ -470,22 +420,6 @@ public:
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
// if constexpr (std::derived_from<socket_t,
|
||||
// asio::basic_stream_socket<typename socket_t::protocol_type>>)
|
||||
// {
|
||||
// return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1),
|
||||
// std::move(self));
|
||||
// } else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||
// typename socket_t::protocol_type>>) {
|
||||
// // datagram, so it should be received at once
|
||||
// return _socket.async_receive(_streamBuffer, std::move(self));
|
||||
// } else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||
// typename socket_t::protocol_type>>) {
|
||||
// // datagram, so it should be received at once
|
||||
// return _socket.async_receive(_streamBuffer, *out_flags, std::move(self));
|
||||
// } else {
|
||||
// static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
// }
|
||||
}
|
||||
|
||||
// zero-length message for SEQ_PACK sockets is EOF
|
||||
@ -493,7 +427,7 @@ public:
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
if (!nbytes) {
|
||||
timer->cancel();
|
||||
self.complete(std::make_error_code(std::errc::connection_aborted), std::move(msg));
|
||||
self.complete(std::error_code(asio::error::misc_errors::eof), std::move(msg));
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -676,27 +610,30 @@ protected:
|
||||
|
||||
#ifdef USE_OPENSSL_WITH_ASIO
|
||||
|
||||
template <adc_asio_stream_transport_proto_c TRANSPORT_PROTOT,
|
||||
template <adc_asio_tls_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT,
|
||||
traits::adc_output_char_range RMSGT =
|
||||
std::vector<char>> // used only for inner storing of message byte sequence
|
||||
class AdcNetServiceASIOTLS : public AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>
|
||||
class AdcNetServiceASIOTLS : public TRANSPORT_PROTOT
|
||||
{
|
||||
typedef AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT> base_t;
|
||||
|
||||
public:
|
||||
using typename base_t::socket_t;
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
typedef asio::ssl::stream<socket_t> tls_stream_t;
|
||||
|
||||
using base_t::connect;
|
||||
using base_t::getShutdownType;
|
||||
using base_t::receive;
|
||||
using base_t::send;
|
||||
using base_t::setShutdownType;
|
||||
// TLS certificate attributes comparison function:
|
||||
// 'serial' - as returned by OpenSSL BN_bn2hex
|
||||
// 'fingerprint' - as returned by OpenSSL X509_digest
|
||||
// 'depth' - depth in chain
|
||||
// the function must return 0 - if comparison failed; otherwise - something != 0
|
||||
typedef std::function<int(const std::string& serial, const std::vector<unsigned char>& fingerprint, int depth)>
|
||||
cert_comp_func_t;
|
||||
|
||||
|
||||
// reimplement acceptor class
|
||||
class acceptor_t : public base_t::acceptor_t
|
||||
class acceptor_t : public AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>::acceptor_t
|
||||
{
|
||||
using base_t = AdcNetServiceASIOBase<TRANSPORT_PROTOT, SESSION_PROTOT, RMSGT>;
|
||||
|
||||
public:
|
||||
typedef AdcNetServiceASIOTLS netservice_t;
|
||||
typedef std::shared_ptr<netservice_t> sptr_netservice_t;
|
||||
@ -711,6 +648,7 @@ public:
|
||||
{
|
||||
enum { starting, handshaking, finishing };
|
||||
|
||||
|
||||
this->_socket = base_t::socket_t(this->_ioContext);
|
||||
auto timer = getDeadlineTimer(this->_acceptor, timeout);
|
||||
}
|
||||
@ -732,11 +670,11 @@ public:
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
_tlsStream.shutdown(ec); // shutdown OpenSSL stream
|
||||
this->_sock_stream.shutdown(ec); // shutdown OpenSSL stream
|
||||
if (!ec) {
|
||||
_tlsStream.lowest_layer().shutdown(this->_shutdownType, ec);
|
||||
this->_sock_stream.lowest_layer().shutdown(this->_shutdownType, ec);
|
||||
if (!ec) {
|
||||
_tlsStream.lowest_layer().close(ec);
|
||||
this->_sock_stream.lowest_layer().close(ec);
|
||||
}
|
||||
}
|
||||
|
||||
@ -744,7 +682,70 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
asio::ssl::stream<socket_t> _tlsStream;
|
||||
asio::ssl::context _tlsContext;
|
||||
asio::ssl::verify_mode _tlsPeerVerifyMode;
|
||||
std::string _tlsCertFingerprintDigest;
|
||||
cert_comp_func_t _tlsCertCompFunc;
|
||||
|
||||
asio::streambuf _streamBuffer;
|
||||
|
||||
|
||||
// reference implementation
|
||||
virtual bool verifyCertificate(int preverified_ok, X509_STORE_CTX* store)
|
||||
{
|
||||
if (preverified_ok == 0) {
|
||||
int err = X509_STORE_CTX_get_error(store);
|
||||
auto err_str = X509_verify_cert_error_string(err);
|
||||
// log_error("TLS certificate verification error: {}", err_str);
|
||||
|
||||
return preverified_ok;
|
||||
}
|
||||
|
||||
char subject_name[256];
|
||||
|
||||
int depth;
|
||||
ASN1_INTEGER* serial;
|
||||
BIGNUM* bnser;
|
||||
|
||||
X509* cert = X509_STORE_CTX_get_current_cert(store);
|
||||
|
||||
if (cert != NULL) {
|
||||
depth = X509_STORE_CTX_get_error_depth(store);
|
||||
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
|
||||
serial = X509_get_serialNumber(cert); // IT IS INTERNAL POINTER SO IT MUST NOT BE FREED UP!!!
|
||||
bnser = ASN1_INTEGER_to_BN(serial, NULL);
|
||||
auto serial_hex = BN_bn2hex(bnser);
|
||||
|
||||
// log_debug("Received TLS certificate: SUBJECT = {}, SERIAL = {}, DEPTH = {}", subject_name, serial_hex,
|
||||
// depth);
|
||||
|
||||
// if no compare function then do not compute fingerprint
|
||||
if (_tlsCertCompFunc) {
|
||||
// compute certificate fingerprint
|
||||
unsigned char digest_buff[EVP_MAX_MD_SIZE];
|
||||
const EVP_MD* digest = EVP_get_digestbyname(_tlsCertFingerprintDigest.c_str());
|
||||
unsigned int N;
|
||||
|
||||
if (X509_digest(cert, digest, digest_buff, &N)) {
|
||||
preverified_ok = _tlsCertCompFunc(std::string(serial_hex),
|
||||
std::vector<unsigned char>(digest_buff, digest_buff + N), depth);
|
||||
|
||||
} else {
|
||||
// log_error("Cannot compute client certificate fingerprint! Cannot verify the certificate!");
|
||||
preverified_ok = 0;
|
||||
}
|
||||
}
|
||||
|
||||
BN_free(bnser);
|
||||
OPENSSL_free(serial_hex);
|
||||
|
||||
} else {
|
||||
// log_error("OpenSSL error: cannot get current certificate");
|
||||
preverified_ok = 0;
|
||||
}
|
||||
|
||||
return preverified_ok;
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
@ -757,332 +758,4 @@ static_assert(adc::interfaces::adc_netsession_proto_c<adc::AdcStopSeqSessionProt
|
||||
|
||||
|
||||
|
||||
/* EVENT-BASED SERVICE */
|
||||
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT>
|
||||
class AsioNetService : public SESSION_PROTOT
|
||||
{
|
||||
public:
|
||||
// typedefs from transport protocol
|
||||
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||
using acceptor_t =
|
||||
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
|
||||
std::nullptr_t, // there is no acceptor
|
||||
typename TRANSPORT_PROTOT::acceptor>;
|
||||
|
||||
typedef std::function<void(AsioNetService*, std::error_code, endpoint_t)> connect_event_hndl_t;
|
||||
typedef std::function<void(AsioNetService*, std::error_code)> send_event_hndl_t;
|
||||
typedef std::function<void(AsioNetService*, std::error_code)> close_event_hndl_t;
|
||||
typedef std::function<void(AsioNetService*, std::error_code, std::span<const char>)> message_event_hndl_t;
|
||||
|
||||
struct Events {
|
||||
bool listening = false; // true - server role, false - client role
|
||||
connect_event_hndl_t onConnect = [](auto...) {};
|
||||
send_event_hndl_t onSend = [](auto...) {};
|
||||
close_event_hndl_t onClose = [](auto...) {};
|
||||
message_event_hndl_t onMessage = [](auto...) {};
|
||||
};
|
||||
|
||||
AsioNetService(asio::io_context& ctx, Events events)
|
||||
: SESSION_PROTOT(),
|
||||
_ioContext(ctx),
|
||||
_receiveStrand(_ioContext),
|
||||
_receiveQueue(),
|
||||
_acceptor(_ioContext),
|
||||
_socket(_ioContext),
|
||||
_events(events),
|
||||
_stopReceiving(false),
|
||||
_waitTimer(_ioContext)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
template <traits::adc_time_duration_c DT>
|
||||
static void start(AsioNetService& srv, const endpoint_t& endpoint, const DT& timeout)
|
||||
{
|
||||
// no acceptor for UDP-sockets
|
||||
if constexpr (std::is_null_pointer_v<acceptor_t>) {
|
||||
srv.startReceiving();
|
||||
srv._events.onConnect(&srv, std::error_code{}, endpoint_t());
|
||||
return;
|
||||
}
|
||||
|
||||
auto token = [&endpoint, &timeout, &srv](std::error_code ec) {
|
||||
if (!ec) {
|
||||
srv.startReceiving();
|
||||
}
|
||||
|
||||
// post event
|
||||
srv._events.onConnect(&srv, ec, srv._socket.remote_endpoint());
|
||||
};
|
||||
|
||||
if (srv._events.listening) { // server role (accept connections to given endpoint)
|
||||
|
||||
auto timer = getDeadlineTimer(srv._acceptor, timeout);
|
||||
|
||||
asio::async_compose<decltype(token), void(std::error_code)>(
|
||||
[timer = std::move(timer), start = true, &endpoint, &srv](auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
if (start) {
|
||||
start = false;
|
||||
try {
|
||||
if (!srv._acceptor.is_open() || (srv._acceptor.local_endpoint() != endpoint)) {
|
||||
srv._acceptor = acceptor_t(srv._ioContext, endpoint);
|
||||
}
|
||||
} catch (std::system_error err) {
|
||||
timer->cancel();
|
||||
self.complete(err.code());
|
||||
return;
|
||||
}
|
||||
|
||||
// return acc.async_accept(_socket, std::move(self));
|
||||
return srv._acceptor.async_accept(srv._socket, std::move(self));
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_connect
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec);
|
||||
},
|
||||
std::move(token), srv._ioContext);
|
||||
|
||||
} else { // client role (connect to remote host)
|
||||
|
||||
auto timer = getDeadlineTimer(srv._socket, timeout);
|
||||
|
||||
asio::async_compose<decltype(token), void(std::error_code)>(
|
||||
[start = true, endpoint, timer = std::move(timer), &srv](auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
if (start) {
|
||||
start = false;
|
||||
return srv._socket.async_connect(endpoint, std::move(self));
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_connect
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec);
|
||||
},
|
||||
std::move(token), srv._socket);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void stop()
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
_stopReceiving = true;
|
||||
|
||||
_socket.shutdown(_shutdownType, ec);
|
||||
if (!ec) {
|
||||
_socket.close(ec);
|
||||
}
|
||||
|
||||
_events.onClose(ec);
|
||||
}
|
||||
|
||||
template <traits::adc_input_char_range R, traits::adc_time_duration_c DT>
|
||||
auto send(const R& msg, const DT& timeout)
|
||||
{
|
||||
auto token = [this](std::error_code ec, size_t) {
|
||||
//
|
||||
_events.onSend(this, ec);
|
||||
};
|
||||
|
||||
// create buffer sequence of sending session protocol representation of the input message
|
||||
std::vector<asio::const_buffer> buff_seq;
|
||||
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
||||
|
||||
auto timer = getDeadlineTimer(_socket, timeout);
|
||||
|
||||
return asio::async_compose<decltype(token), void(std::error_code)>(
|
||||
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
|
||||
auto& self, std::error_code ec = {}) mutable {
|
||||
if (!ec) {
|
||||
if (start) {
|
||||
start = false;
|
||||
if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_write(_socket, buff_seq, std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, std::move(self));
|
||||
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||
typename socket_t::protocol_type>>) {
|
||||
return _socket.async_send(buff_seq, std::move(self));
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isTimeout(timer, ec)) {
|
||||
ec = std::make_error_code(std::errc::timed_out);
|
||||
} else { // an error occured in async_write/async_send
|
||||
timer->cancel();
|
||||
}
|
||||
|
||||
self.complete(ec);
|
||||
},
|
||||
std::move(token), _socket);
|
||||
}
|
||||
|
||||
template <traits::adc_time_duration_c DT>
|
||||
bool wait(const DT& timeout)
|
||||
{
|
||||
if (_receiveQueue.size()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
std::error_code ec;
|
||||
|
||||
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::seconds(1));
|
||||
|
||||
auto res = _waitTimers.emplace(_ioContext);
|
||||
if (res.second) {
|
||||
(*(res.first))->expires_after(timeout < max_d ? timeout : max_d);
|
||||
|
||||
// _waitTimer.expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
|
||||
|
||||
// auto f = _waitTimer.async_wait(asio::use_future);
|
||||
|
||||
auto f = (*(res.first))->async_wait(asio::use_future);
|
||||
try {
|
||||
f.get();
|
||||
_waitTimers.erase(res.first);
|
||||
} catch (std::system_error& ex) {
|
||||
if (ex.code() == asio::error::operation_aborted) { // canceled in startReceiving (message was received)
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
asio::io_context& _ioContext;
|
||||
asio::io_context::strand _receiveStrand;
|
||||
|
||||
socket_t _socket;
|
||||
|
||||
acceptor_t _acceptor;
|
||||
|
||||
asio::streambuf _streamBuffer;
|
||||
|
||||
std::queue<std::vector<char>> _receiveQueue;
|
||||
|
||||
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
||||
|
||||
std::atomic_bool _stopReceiving;
|
||||
|
||||
Events _events;
|
||||
|
||||
asio::steady_timer _waitTimer;
|
||||
std::set<std::unique_ptr<asio::steady_timer>> _waitTimers;
|
||||
|
||||
void startReceiving()
|
||||
{
|
||||
auto get_msg = [this](std::error_code ec, size_t) {
|
||||
if (!ec) {
|
||||
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||
|
||||
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||
if (!net_pack.empty()) {
|
||||
_receiveQueue.emplace();
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
|
||||
|
||||
while (_streamBuffer.size()) { // search for possible additional session protocol packets
|
||||
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||
|
||||
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||
|
||||
if (!net_pack.empty()) {
|
||||
_receiveQueue.emplace();
|
||||
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||
_streamBuffer.consume(net_pack.size());
|
||||
} else {
|
||||
break; // exit and hold remaining bytes in stream buffer
|
||||
}
|
||||
}
|
||||
|
||||
auto msg = _receiveQueue.front();
|
||||
_events.onMessage(this, ec, std::span<const char>(msg.begin(), msg.end()));
|
||||
|
||||
_receiveQueue.pop();
|
||||
}
|
||||
|
||||
if (!_stopReceiving) {
|
||||
startReceiving(); // initiate consequence socket's read operation
|
||||
}
|
||||
} else {
|
||||
_events.onMessage(this, ec, std::span<const char>());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
|
||||
|
||||
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||
return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), std::move(get_msg));
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.receive(_streamBuffer, std::move(get_msg));
|
||||
} else if constexpr (std::derived_from<socket_t,
|
||||
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||
// datagram, so it should be received at once
|
||||
return _socket.receive(_streamBuffer, *out_flags, std::move(get_msg));
|
||||
} else {
|
||||
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
|
||||
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
|
||||
const TimeoutT& timeout,
|
||||
bool arm = true)
|
||||
{
|
||||
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
|
||||
|
||||
if (arm) {
|
||||
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
|
||||
std::chrono::seconds(1));
|
||||
|
||||
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
|
||||
// timer->expires_after(timeout);
|
||||
|
||||
timer->async_wait([&obj](const std::error_code& ec) mutable {
|
||||
if (!ec) {
|
||||
obj.cancel();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return timer;
|
||||
}
|
||||
|
||||
template <typename TimerT>
|
||||
static bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
|
||||
{
|
||||
auto exp_time = timer->expiry();
|
||||
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace adc::impl
|
||||
|
||||
@ -21,14 +21,14 @@ void receive(T srv)
|
||||
|
||||
int main()
|
||||
{
|
||||
// using tr_p_t = asio ::ip::tcp;
|
||||
using tr_p_t = asio ::ip::tcp;
|
||||
// using tr_p_t = asio::local::stream_protocol;
|
||||
using tr_p_t = asio::local::seq_packet_protocol;
|
||||
// using tr_p_t = asio::local::seq_packet_protocol;
|
||||
|
||||
tr_p_t::endpoint ept_c(std::string("/tmp/AAA").insert(0, 1, '\0'));
|
||||
// tr_p_t::endpoint ept_c(std::string("/tmp/AAA").insert(0, 1, '\0'));
|
||||
// tr_p_t::endpoint ept_c("/tmp/AAA");
|
||||
|
||||
// tr_p_t::endpoint ept_c(asio::ip::make_address_v4("0.0.0.0"), 9999);
|
||||
tr_p_t::endpoint ept_c(asio::ip::make_address_v4("0.0.0.0"), 9999);
|
||||
std::cout << "ADDR: " << ept_c << "\n";
|
||||
|
||||
asio::io_context ctx;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user