working network server

This commit is contained in:
Timur A. Fatkhullin
2025-02-23 01:22:02 +03:00
parent 683114b307
commit b9032f7034
3 changed files with 302 additions and 47 deletions

View File

@@ -14,6 +14,7 @@
#include <asio/read.hpp>
#include <asio/redirect_error.hpp>
#include <asio/serial_port.hpp>
#include <asio/signal_set.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/write.hpp>
@@ -92,11 +93,13 @@ public:
static constexpr std::chrono::duration DEFAULT_SND_TIMEOUT = std::chrono::milliseconds(2000);
MccMountServer(asio::io_context& ctx, std::shared_ptr<spdlog::logger> logger = spdlog::null_logger_mt("NULL"))
: _asioContext(ctx), _serverLogger(std::move(logger))
: _asioContext(ctx), _serverLogger(logger), _stopSignal(ctx), _restartSignal(ctx)
{
std::stringstream st;
st << std::this_thread::get_id();
_serverLogger->set_pattern("[%Y-%m-%d %T.%e][%l]: %v");
_serverLogger->info("Create mount server instance (thread ID = {})", st.str());
}
@@ -105,7 +108,10 @@ public:
std::stringstream st;
st << std::this_thread::get_id();
_serverLogger->info("Delete mount server instance (thread ID = {})", st.str());
_serverLogger->info("Delete mount server instance (thread ID = {}) ...", st.str());
stopListening();
disconnectClients();
}
@@ -145,23 +151,19 @@ public:
// create abstract namespace socket endpoint if its path starts from '@' symbol
endpoint.makeAbstract('@');
if (endpoint.path()[0] == '\0') { // abstract namespace
std::string p;
std::ranges::copy(endpoint.path(), std::back_inserter(p));
p.insert(p.begin() + 1, '/'); // insert after '\0' symbol
pt = p;
} else {
pt += endpoint.path();
}
// if (endpoint.path()[0] == '\0') { // abstract namespace
// std::string p;
// std::ranges::copy(endpoint.path(), std::back_inserter(p));
// p.insert(p.begin() + 1, '/'); // insert after '\0' symbol
// pt = p;
// } else {
// pt += endpoint.path();
// }
if (endpoint.isLocalStream()) {
co_await listen(asio::local::stream_protocol::endpoint(pt.string()));
// asio::co_spawn(_asioContext, listen(asio::local::stream_protocol::endpoint(pt.string())),
// asio::detached);
co_await listen(asio::local::stream_protocol::endpoint(endpoint.path(pt.string())));
} else if (endpoint.isLocalSeqpacket()) {
co_await listen(asio::local::seq_packet_protocol::endpoint(pt.string()));
// asio::co_spawn(_asioContext, listen(asio::local::seq_packet_protocol::endpoint(pt.string())),
// asio::detached);
co_await listen(asio::local::seq_packet_protocol::endpoint(endpoint.path(pt.string())));
} else {
co_return; // it must not be!!!!
}
@@ -304,6 +306,7 @@ public:
acc->close(ec);
if (ec) {
_serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message());
} else {
++M;
}
++N;
@@ -324,7 +327,76 @@ public:
_serverLogger->info("The all server listening endpoints were closed!");
}
void disconnectClients() {}
void disconnectClients()
{
auto disconn_func = [this](std::ranges::input_range auto& ptrs) {
std::error_code ec;
for (auto& ptr : ptrs) {
// ptr->cancel(ec);
// if (ec) {
// _serverLogger->warn("socket_base::cancel: an error occured (ec = {})", ec.message());
// }
ptr->shutdown(asio::socket_base::shutdown_both, ec);
if (ec) {
_serverLogger->warn("socket_base::shutdown: an error occured (ec = {})", ec.message());
}
ptr->close(ec);
if (ec) {
_serverLogger->warn("socket_base::close: an error occured (ec = {})", ec.message());
}
}
};
_serverLogger->info("Close all client connections ...");
if (_serialPorts.empty() && _localStreamSockets.empty() && _localSeqpackSockets.empty() &&
_tcpSockets.empty()) {
_serverLogger->info("There were no active client connections! Skip!");
}
if (_serialPorts.size()) {
std::lock_guard lock_g(_serialPortsMutex);
std::error_code ec;
_serverLogger->info("Close serial port clients ({} in total) ...", _serialPorts.size());
for (auto& ptr : _serialPorts) {
ptr->cancel(ec);
if (ec) {
_serverLogger->warn("serial_port::cancel: an error occured (ec = {})", ec.message());
}
ptr->close(ec);
if (ec) {
_serverLogger->warn("serial_port::close: an error occured (ec = {})", ec.message());
}
}
}
if (_localStreamSockets.size()) {
std::lock_guard lock_g(_localStreamSocketsMutex);
_serverLogger->info("Close local stream socket-type clients ({} in total) ...", _localStreamSockets.size());
disconn_func(_localStreamSockets);
}
if (_localSeqpackSockets.size()) {
std::lock_guard lock_g(_localSeqpackSocketsMutex);
_serverLogger->info("Close local seqpack socket-type clients ({} in total) ...",
_localSeqpackSockets.size());
disconn_func(_localSeqpackSockets);
}
if (_tcpSockets.size()) {
std::lock_guard lock_g(_tcpSocketsMutex);
_serverLogger->info("Close TCP socket-type clients ({} in total) ...", _tcpSockets.size());
disconn_func(_tcpSockets);
}
_serverLogger->info("Client connection were closed!");
}
void daemonize()
{
@@ -390,11 +462,52 @@ public:
#endif
}
template <std::ranges::range RST = std::vector<int>, std::ranges::range RRT = std::vector<int>>
void setupSignals(const RST& stop_sig_num = {SIGINT, SIGTERM}, const RRT& restart_sig_num = {SIGUSR1})
requires(std::convertible_to<std::ranges::range_value_t<RST>, int> &&
std::convertible_to<std::ranges::range_value_t<RRT>, int>)
{
for (const int sig : stop_sig_num) {
_stopSignal.add(sig);
}
_stopSignal.async_wait([this](std::error_code, int signo) {
_serverLogger->info("Stop signal was received (signo = {})", signo);
stopListening();
disconnectClients();
_asioContext.stop();
});
for (const int sig : restart_sig_num) {
_restartSignal.add(sig);
}
_restartSignal.async_wait([this](std::error_code, int signo) {
_serverLogger->info("Restart signal was received (signo = {})", signo);
restart();
});
}
void restart()
{
disconnectClients();
_restartSignal.async_wait([this](std::error_code, int signo) {
_serverLogger->info("Restart signal was received (signo = {})", signo);
restart();
});
}
private:
asio::io_context& _asioContext;
std::shared_ptr<spdlog::logger> _serverLogger;
std::vector<asio::serial_port*> _serialPorts;
asio::signal_set _stopSignal, _restartSignal;
std::set<asio::serial_port*> _serialPorts;
// std::vector<asio::serial_port*> _serialPorts;
std::vector<asio::ip::tcp::acceptor*> _tcpAcceptors;
std::vector<asio::local::stream_protocol::acceptor*> _localStreamAcceptors;
@@ -407,6 +520,7 @@ private:
// std::vector<asio::local::stream_protocol::socket*> _localStreamSockets;
// std::vector<asio::local::seq_packet_protocol::socket*> _localSeqpackSockets;
std::mutex _serialPortsMutex, _tcpSocketsMutex, _localStreamSocketsMutex, _localSeqpackSocketsMutex;
// helpers
template <typename OptT, typename... OptTs>
@@ -463,20 +577,6 @@ private:
return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end());
};
auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable<void> {
// asio::steady_timer timer(_asioContext);
asio::steady_timer timer(co_await asio::this_coro::executor);
auto now = std::chrono::steady_clock::now();
while (deadline > now) {
timer.expires_at(deadline);
co_await timer.async_wait(asio::use_awaitable);
now = std::chrono::steady_clock::now();
}
throw std::system_error(std::make_error_code(std::errc::timed_out));
};
asio::streambuf sbuff;
size_t nbytes;
@@ -507,14 +607,18 @@ private:
}
if constexpr (traits::is_serial_proto<sock_t>) {
_serialPorts.emplace_back(&socket);
std::lock_guard lock_g(_serialPortsMutex);
_serialPorts.insert(&socket);
} else if constexpr (traits::is_tcp_proto<sock_t>) {
std::lock_guard lock_g(_tcpSocketsMutex);
// _tcpSockets.emplace_back(&socket);
_tcpSockets.insert(&socket);
} else if constexpr (traits::is_local_stream_proto<sock_t>) {
std::lock_guard lock_g(_localStreamSocketsMutex);
// _localStreamSockets.emplace_back(&socket);
_localStreamSockets.insert(&socket);
} else if constexpr (traits::is_local_seqpack_proto<sock_t>) {
std::lock_guard lock_g(_localSeqpackSocketsMutex);
// _localSeqpackSockets.emplace_back(&socket);
_localSeqpackSockets.insert(&socket);
} else {
@@ -665,8 +769,9 @@ private:
r_epn, thr_id);
}
// remove pointer as it is invalidated here (at the exit of the method)
if constexpr (traits::is_serial_proto<sock_t>) {
// _serialPorts.emplace_back(&socket);
_serialPorts.erase(&socket);
} else if constexpr (traits::is_tcp_proto<sock_t>) {
_tcpSockets.erase(&socket);
} else if constexpr (traits::is_local_stream_proto<sock_t>) {