ADC/net/asio/adc_device_netserver_asio.h
Timur A. Fatkhullin 45b8d4a3c7 fix 100% load of CPU after client disconnection
(AdcBaseNetServiceASIO.asyncReceive)
add resolving domain name (AdcDeviceNetServerASIO)
2024-11-17 23:50:15 +03:00

253 lines
10 KiB
C++

#pragma once
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ip/udp.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/signal_set.hpp>
#include "../adc_device_netserver.h"
#include "../adc_endpoint.h"
#include "adc_netservice_asio.h"
namespace adc::impl
{
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcDeviceNetServerASIO : public AdcDeviceNetServer<IdentT, LoggerT>
{
typedef AdcDeviceNetServer<IdentT, LoggerT> base_t;
public:
using typename base_t::logger_t;
using typename base_t::server_ident_t;
typedef std::string session_ident_t;
template <typename ServiceT>
using Session = typename base_t::template Session<ServiceT, session_ident_t>;
template <typename... LoggerCtorArgTs>
AdcDeviceNetServerASIO(const server_ident_t& id, asio::io_context& io_context, LoggerCtorArgTs&&... ctor_args)
: base_t(id, std::forward<LoggerCtorArgTs>(ctor_args)...),
_ioContext(io_context),
_stopSignal(io_context),
_restartSignal(io_context)
{
}
template <interfaces::adc_netsession_proto_c SessProtoT, std::derived_from<AdcEndpoint> EptT>
#ifdef USE_OPENSSL_WITH_ASIO
void start(const EptT& endpoint,
asio::ssl::context tls_context = asio::ssl::context(asio::ssl::context::tlsv13_server),
asio::ssl::verify_mode tls_verify_mode = asio::ssl::context_base::verify_peer)
#else
void start(const EptT& endpoint)
#endif
{
if (!endpoint.isValid()) {
this->logError("Invalid given endpoint: <{}>", endpoint.endpoint());
return;
}
// may throw here!
if (endpoint.isTCP()) {
// asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
// _ioContext,
// ept);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(), [endpoint, res, this](std::error_code ec, auto results) {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin());
}
});
#ifdef USE_OPENSSL_WITH_ASIO
} else if (endpoint.isTLS()) {
// asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
// _ioContext,
// ept, std::move(tls_context), tls_verify_mode);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(),
[endpoint, res, tls_context = std::move(tls_context), tls_verify_mode, this](std::error_code ec,
auto results) mutable {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin(), std::move(tls_context),
tls_verify_mode);
}
});
#endif
} else if (endpoint.isLocal()) {
if (endpoint.isLocalStream()) {
asio::local::stream_protocol::endpoint ept(endpoint.template path<std::string>());
using srv_t = AdcNetServiceASIO<asio::local::stream_protocol, SessProtoT>;
// base_t::template start<Session<srv_t>>("LOCAL STREAM", this, _ioContext, ept);
base_t::template start<Session<srv_t>>("LOCAL STREAM", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, ept);
// } else if (endpoint.isLocalDatagram()) {
// asio::local::datagram_protocol::endpoint ept(endpoint.template path<std::string>());
// using srv_t = AdcNetServiceASIO<asio::local::datagram_protocol, SessProtoT>;
// base_t::template start<Session<srv_t>>("LOCAL DGRAM", this, _ioContext, ept);
} else if (endpoint.isLocalSeqpacket()) {
asio::local::seq_packet_protocol::endpoint ept(endpoint.template path<std::string>());
using srv_t = AdcNetServiceASIO<asio::local::seq_packet_protocol, SessProtoT>;
// base_t::template start<Session<srv_t>>("LOCAL SEQPACK", this, _ioContext, ept);
base_t::template start<Session<srv_t>>(
"LOCAL SEQPACK", {this, _sessionRecvTimeout, _sessionSendTimeout}, _ioContext, ept);
}
} else {
throw std::system_error(std::make_error_code(std::errc::protocol_not_supported));
}
}
// some default endpoint?!!
void start() {}
template <std::ranges::range RST = std::vector<int>, std::ranges::range RRT = std::vector<int>>
void setupSignals(const RST& stop_sig_num = {SIGINT, SIGTERM}, const RRT& restart_sig_num = {SIGUSR1})
requires(std::convertible_to<std::ranges::range_value_t<RST>, int> &&
std::convertible_to<std::ranges::range_value_t<RRT>, int>)
{
auto sig_list = [](const auto& sig_range) {
std::string sgs;
#ifdef _GNU_SOURCE
std::vector<std::string> vsg;
std::ranges::transform(sig_range, std::back_inserter(vsg),
[](auto s) { return std::format("'{}' (No = {})", sigdescr_np(s), s); });
utils::AdcJoinRange(vsg, std::string_view(", "), sgs);
#else
sgs = utils::AdcDefaultValueConverter<utils::constants::DEFAULT_CONVERTER_DELIMITER_COMA>::serialize<
std::string>(sig_range);
#endif
return sgs;
};
this->logDebug("Setup 'stop-server' signal to: {}", sig_list(stop_sig_num));
this->logDebug("Setup 'restart-server' signal to: {}", sig_list(restart_sig_num));
for (const int sig : stop_sig_num) {
_stopSignal.add(sig);
}
_stopSignal.async_wait([this](std::error_code ec, int signo) {
signalReceived(ec, signo);
this->stop();
});
for (const int sig : restart_sig_num) {
_restartSignal.add(sig);
}
_restartSignal.async_wait([this](std::error_code ec, int signo) {
signalReceived(ec, signo);
restart();
});
}
template <traits::adc_time_duration_c RT, traits::adc_time_duration_c ST>
requires(std::convertible_to<RT, std::chrono::milliseconds> &&
std::convertible_to<ST, std::chrono::milliseconds>)
void setSessionTimeouts(const RT& recv_timeout, const ST& send_timeout)
{
_sessionRecvTimeout = recv_timeout;
_sessionSendTimeout = send_timeout;
this->logDebug("Set session timeouts: recv = {} msec, send = {} msec", _sessionRecvTimeout.count(),
_sessionSendTimeout.count());
}
protected:
asio::io_context& _ioContext;
asio::signal_set _stopSignal, _restartSignal;
std::chrono::milliseconds _sessionRecvTimeout = std::chrono::hours(12);
std::chrono::milliseconds _sessionSendTimeout = std::chrono::seconds(5);
void daemonize()
{
this->logInfo("Daemonize server process (server addr: {})", (void*)this);
base_t::daemonize();
}
// demonizing ASIO-related methods
virtual void daemonizePrepare()
{
this->logDebug("ASIO-related call of daemonizePrepare()");
_ioContext.notify_fork(asio::execution_context::fork_prepare);
}
virtual void daemonizeFinalize()
{
this->logDebug("ASIO-related call of daemonizeFinalize()");
_ioContext.notify_fork(asio::io_context::fork_child);
}
virtual void signalReceived(std::error_code ec, int signo)
{
#ifdef _GNU_SOURCE
this->logInfo("The server received the signal: '{}' (No = {}, ec = {})", sigdescr_np(signo), signo,
ec.message());
#else
this->logInfo("The server received the signal: {} (ec = {})", signo, ec.message());
#endif
};
virtual void restart()
{
this->logInfo("Restart server (server addr: {})", (void*)this);
this->stopAllSessions();
_restartSignal.async_wait([this](std::error_code ec, int signo) {
signalReceived(ec, signo);
restart();
});
}
};
} // namespace adc::impl