...
This commit is contained in:
parent
497b28f83e
commit
d2b2620d13
@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
#ifdef USE_OPENSSL_WITH_ASIO
|
#ifdef USE_OPENSSL_WITH_ASIO
|
||||||
|
|
||||||
@ -502,7 +503,7 @@ public:
|
|||||||
self.complete(ec, {msg.begin(), msg.end()});
|
self.complete(ec, {msg.begin(), msg.end()});
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
token, _socket);
|
token, _receiveStrand);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* blocking methods */
|
/* blocking methods */
|
||||||
@ -633,4 +634,334 @@ protected:
|
|||||||
|
|
||||||
static_assert(adc::interfaces::adc_netsession_proto_c<adc::AdcStopSeqSessionProto<>>, "");
|
static_assert(adc::interfaces::adc_netsession_proto_c<adc::AdcStopSeqSessionProto<>>, "");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* EVENT-BASED SERVICE */
|
||||||
|
template <adc_asio_transport_proto_c TRANSPORT_PROTOT,
|
||||||
|
interfaces::adc_netsession_proto_c<std::string_view> SESSION_PROTOT>
|
||||||
|
class AsioNetService : public SESSION_PROTOT
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
// typedefs from transport protocol
|
||||||
|
using endpoint_t = typename TRANSPORT_PROTOT::endpoint;
|
||||||
|
using socket_t = typename TRANSPORT_PROTOT::socket;
|
||||||
|
using acceptor_t =
|
||||||
|
std::conditional_t<std::derived_from<socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>,
|
||||||
|
std::nullptr_t, // there is no acceptor
|
||||||
|
typename TRANSPORT_PROTOT::acceptor>;
|
||||||
|
|
||||||
|
typedef std::function<void(AsioNetService*, std::error_code, endpoint_t)> connect_event_hndl_t;
|
||||||
|
typedef std::function<void(AsioNetService*, std::error_code)> send_event_hndl_t;
|
||||||
|
typedef std::function<void(AsioNetService*, std::error_code)> close_event_hndl_t;
|
||||||
|
typedef std::function<void(AsioNetService*, std::error_code, std::span<const char>)> message_event_hndl_t;
|
||||||
|
|
||||||
|
struct Events {
|
||||||
|
bool listening = false; // true - server role, false - client role
|
||||||
|
connect_event_hndl_t onConnect = [](auto...) {};
|
||||||
|
send_event_hndl_t onSend = [](auto...) {};
|
||||||
|
close_event_hndl_t onClose = [](auto...) {};
|
||||||
|
message_event_hndl_t onMessage = [](auto...) {};
|
||||||
|
};
|
||||||
|
|
||||||
|
AsioNetService(asio::io_context& ctx, Events events)
|
||||||
|
: SESSION_PROTOT(),
|
||||||
|
_ioContext(ctx),
|
||||||
|
_receiveStrand(_ioContext),
|
||||||
|
_receiveQueue(),
|
||||||
|
_acceptor(_ioContext),
|
||||||
|
_socket(_ioContext),
|
||||||
|
_events(events),
|
||||||
|
_stopReceiving(false),
|
||||||
|
_waitTimer(_ioContext)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template <traits::adc_time_duration_c DT>
|
||||||
|
static void start(AsioNetService& srv, const endpoint_t& endpoint, const DT& timeout)
|
||||||
|
{
|
||||||
|
// no acceptor for UDP-sockets
|
||||||
|
if constexpr (std::is_null_pointer_v<acceptor_t>) {
|
||||||
|
srv.startReceiving();
|
||||||
|
srv._events.onConnect(&srv, std::error_code{}, endpoint_t());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto token = [&endpoint, &timeout, &srv](std::error_code ec) {
|
||||||
|
if (!ec) {
|
||||||
|
srv.startReceiving();
|
||||||
|
}
|
||||||
|
|
||||||
|
// post event
|
||||||
|
srv._events.onConnect(&srv, ec, srv._socket.remote_endpoint());
|
||||||
|
};
|
||||||
|
|
||||||
|
if (srv._events.listening) { // server role (accept connections to given endpoint)
|
||||||
|
|
||||||
|
auto timer = getDeadlineTimer(srv._acceptor, timeout);
|
||||||
|
|
||||||
|
asio::async_compose<decltype(token), void(std::error_code)>(
|
||||||
|
[timer = std::move(timer), start = true, &endpoint, &srv](auto& self, std::error_code ec = {}) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
try {
|
||||||
|
if (!srv._acceptor.is_open() || (srv._acceptor.local_endpoint() != endpoint)) {
|
||||||
|
srv._acceptor = acceptor_t(srv._ioContext, endpoint);
|
||||||
|
}
|
||||||
|
} catch (std::system_error err) {
|
||||||
|
timer->cancel();
|
||||||
|
self.complete(err.code());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return acc.async_accept(_socket, std::move(self));
|
||||||
|
return srv._acceptor.async_accept(srv._socket, std::move(self));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimeout(timer, ec)) {
|
||||||
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_connect
|
||||||
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.complete(ec);
|
||||||
|
},
|
||||||
|
std::move(token), srv._ioContext);
|
||||||
|
|
||||||
|
} else { // client role (connect to remote host)
|
||||||
|
|
||||||
|
auto timer = getDeadlineTimer(srv._socket, timeout);
|
||||||
|
|
||||||
|
asio::async_compose<decltype(token), void(asio::error_code)>(
|
||||||
|
[start = true, endpoint, timer = std::move(timer), &srv](auto& self, asio::error_code ec = {}) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
return srv._socket.async_connect(endpoint, std::move(self));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimeout(timer, ec)) {
|
||||||
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_connect
|
||||||
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.complete(ec);
|
||||||
|
},
|
||||||
|
std::move(token), srv._socket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void stop()
|
||||||
|
{
|
||||||
|
std::error_code ec;
|
||||||
|
|
||||||
|
_stopReceiving = true;
|
||||||
|
|
||||||
|
_socket.shutdown(_shutdownType, ec);
|
||||||
|
if (!ec) {
|
||||||
|
_socket.close(ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
_events.onClose(ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <traits::adc_input_char_range R, traits::adc_time_duration_c DT>
|
||||||
|
auto send(const R& msg, const DT& timeout)
|
||||||
|
{
|
||||||
|
auto token = [this](std::error_code ec, size_t) {
|
||||||
|
//
|
||||||
|
_events.onSend(this, ec);
|
||||||
|
};
|
||||||
|
|
||||||
|
// create buffer sequence of sending session protocol representation of the input message
|
||||||
|
std::vector<asio::const_buffer> buff_seq;
|
||||||
|
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
|
||||||
|
|
||||||
|
auto timer = getDeadlineTimer(_socket, timeout);
|
||||||
|
|
||||||
|
return asio::async_compose<decltype(token), void(asio::error_code)>(
|
||||||
|
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
|
||||||
|
auto& self, asio::error_code ec = {}) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
if (start) {
|
||||||
|
start = false;
|
||||||
|
if constexpr (std::derived_from<socket_t,
|
||||||
|
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||||
|
return asio::async_write(_socket, buff_seq, std::move(self));
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_datagram_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
return _socket.async_send(buff_seq, std::move(self));
|
||||||
|
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
|
||||||
|
typename socket_t::protocol_type>>) {
|
||||||
|
return _socket.async_send(buff_seq, std::move(self));
|
||||||
|
} else {
|
||||||
|
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimeout(timer, ec)) {
|
||||||
|
ec = std::make_error_code(std::errc::timed_out);
|
||||||
|
} else { // an error occured in async_write/async_send
|
||||||
|
timer->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.complete(ec);
|
||||||
|
},
|
||||||
|
std::move(token), _socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <traits::adc_time_duration_c DT>
|
||||||
|
bool wait(const DT& timeout)
|
||||||
|
{
|
||||||
|
if (_receiveQueue.size()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::error_code ec;
|
||||||
|
|
||||||
|
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
|
||||||
|
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::seconds(1));
|
||||||
|
|
||||||
|
auto res = _waitTimers.emplace(_ioContext);
|
||||||
|
if (res.second) {
|
||||||
|
(*(res.first))->expires_after(timeout < max_d ? timeout : max_d);
|
||||||
|
|
||||||
|
// _waitTimer.expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
|
||||||
|
|
||||||
|
// auto f = _waitTimer.async_wait(asio::use_future);
|
||||||
|
|
||||||
|
auto f = (*(res.first))->async_wait(asio::use_future);
|
||||||
|
try {
|
||||||
|
f.get();
|
||||||
|
_waitTimers.erase(res.first);
|
||||||
|
} catch (std::system_error& ex) {
|
||||||
|
if (ex.code() == asio::error::operation_aborted) { // canceled in startReceiving (message was received)
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
asio::io_context& _ioContext;
|
||||||
|
asio::io_context::strand _receiveStrand;
|
||||||
|
|
||||||
|
socket_t _socket;
|
||||||
|
|
||||||
|
acceptor_t _acceptor;
|
||||||
|
|
||||||
|
asio::streambuf _streamBuffer;
|
||||||
|
|
||||||
|
std::queue<std::vector<char>> _receiveQueue;
|
||||||
|
|
||||||
|
asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both;
|
||||||
|
|
||||||
|
std::atomic_bool _stopReceiving;
|
||||||
|
|
||||||
|
Events _events;
|
||||||
|
|
||||||
|
asio::steady_timer _waitTimer;
|
||||||
|
std::set<std::unique_ptr<asio::steady_timer>> _waitTimers;
|
||||||
|
|
||||||
|
void startReceiving()
|
||||||
|
{
|
||||||
|
auto get_msg = [this](std::error_code ec, size_t) {
|
||||||
|
if (!ec) {
|
||||||
|
auto start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||||
|
|
||||||
|
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||||
|
if (!net_pack.empty()) {
|
||||||
|
_receiveQueue.emplace();
|
||||||
|
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||||
|
_streamBuffer.consume(net_pack.size());
|
||||||
|
|
||||||
|
|
||||||
|
while (_streamBuffer.size()) { // search for possible additional session protocol packets
|
||||||
|
start_ptr = static_cast<const char*>(_streamBuffer.data().data());
|
||||||
|
|
||||||
|
net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
|
||||||
|
|
||||||
|
if (!net_pack.empty()) {
|
||||||
|
_receiveQueue.emplace();
|
||||||
|
std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back()));
|
||||||
|
_streamBuffer.consume(net_pack.size());
|
||||||
|
} else {
|
||||||
|
break; // exit and hold remaining bytes in stream buffer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto msg = _receiveQueue.front();
|
||||||
|
_events.onMessage(this, ec, std::span<const char>(msg.begin(), msg.end()));
|
||||||
|
|
||||||
|
_receiveQueue.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_stopReceiving) {
|
||||||
|
startReceiving(); // initiate consequence socket's read operation
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_events.onMessage(this, ec, std::span<const char>());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
auto out_flags = std::make_shared<asio::socket_base::message_flags>();
|
||||||
|
|
||||||
|
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
|
||||||
|
return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), std::move(get_msg));
|
||||||
|
} else if constexpr (std::derived_from<socket_t,
|
||||||
|
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
|
||||||
|
// datagram, so it should be received at once
|
||||||
|
return _socket.receive(_streamBuffer, std::move(get_msg));
|
||||||
|
} else if constexpr (std::derived_from<socket_t,
|
||||||
|
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
|
||||||
|
// datagram, so it should be received at once
|
||||||
|
return _socket.receive(_streamBuffer, *out_flags, std::move(get_msg));
|
||||||
|
} else {
|
||||||
|
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template <typename CancelableT, traits::adc_time_duration_c TimeoutT>
|
||||||
|
static std::unique_ptr<asio::steady_timer> getDeadlineTimer(CancelableT& obj,
|
||||||
|
const TimeoutT& timeout,
|
||||||
|
bool arm = true)
|
||||||
|
{
|
||||||
|
auto timer = std::make_unique<asio::steady_timer>(obj.get_executor());
|
||||||
|
|
||||||
|
if (arm) {
|
||||||
|
std::chrono::seconds max_d = std::chrono::duration_cast<std::chrono::seconds>(
|
||||||
|
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
|
||||||
|
std::chrono::seconds(1));
|
||||||
|
|
||||||
|
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow!
|
||||||
|
// timer->expires_after(timeout);
|
||||||
|
|
||||||
|
timer->async_wait([&obj](const std::error_code& ec) mutable {
|
||||||
|
if (!ec) {
|
||||||
|
obj.cancel();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return timer;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename TimerT>
|
||||||
|
static bool isTimeout(const std::unique_ptr<TimerT>& timer, const std::error_code& ec)
|
||||||
|
{
|
||||||
|
auto exp_time = timer->expiry();
|
||||||
|
return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace adc::impl
|
} // namespace adc::impl
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user