This commit is contained in:
2025-09-24 18:23:17 +03:00
parent b1a48d2b77
commit f729799335
5 changed files with 203 additions and 83 deletions

View File

@@ -30,12 +30,12 @@
#include "control_proto.h"
#include "mcc_generics.h"
#include "mcc_netserver_endpoint.h"
#include "mcc_netserver_proto.h"
#include "mcc_traits.h"
namespace mcc
namespace mcc::network
{
@@ -73,6 +73,7 @@ public:
using LoggerT::logDebug;
using LoggerT::logError;
using LoggerT::logInfo;
using LoggerT::logTrace;
using LoggerT::logWarn;
static constexpr std::chrono::duration DEFAULT_RCV_TIMEOUT = std::chrono::hours(12);
@@ -84,7 +85,7 @@ public:
std::stringstream st;
st << std::this_thread::get_id();
logInfo("Create mount server instance (thread ID = {})", st.str());
logInfo(std::format("Create mount server instance (thread ID = {})", st.str()));
}
~MccNetworkServer()
@@ -92,7 +93,7 @@ public:
std::stringstream st;
st << std::this_thread::get_id();
logInfo("Delete mount server instance (thread ID = {}) ...", st.str());
logInfo(std::format("Delete mount server instance (thread ID = {}) ...", st.str()));
stopListening();
disconnectClients();
@@ -103,7 +104,8 @@ public:
asio::awaitable<void> listen(std::derived_from<MccServerEndpoint> auto endpoint, CtorArgTs&&... ctor_args)
{
if (!endpoint.isValid()) {
logError("Cannot start listening! Invalid endpoint string representation ('{}')!", endpoint.endpoint());
logError(std::format("Cannot start listening! Invalid endpoint string representation ('{}')!",
endpoint.endpoint()));
co_return;
}
@@ -124,7 +126,7 @@ public:
s_port.open(pt.string(), ec);
if (ec) {
logError("Cannot open serial device '{}' (Error = '{}')!", pt.string(), ec.message());
logError(std::format("Cannot open serial device '{}' (Error = '{}')!", pt.string(), ec.message()));
co_return;
}
@@ -157,7 +159,7 @@ public:
auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::use_awaitable);
logInfo("Resolve hostname <{}> to {} IP-addresses", endpoint.host(), r_result.size());
logInfo(std::format("Resolve hostname <{}> to {} IP-addresses", endpoint.host(), r_result.size()));
bool exit_flag = false;
@@ -174,7 +176,8 @@ public:
exit_flag = true;
break;
} catch (const std::system_error& err) {
logError("An error occuring while creating connection acceptor (ec = {})", err.what());
logError(
std::format("An error occuring while creating connection acceptor (ec = {})", err.what()));
continue;
}
}
@@ -185,7 +188,8 @@ public:
_tcpAcceptors.emplace_back(&acc);
logInfo("Start listening at <{}> endpoint ...", acc.local_endpoint().address().to_string());
logInfo(
std::format("Start listening at <{}> endpoint ...", acc.local_endpoint().address().to_string()));
// start accepting connections
for (;;) {
@@ -195,7 +199,8 @@ public:
}
} catch (const std::system_error& err) {
logError("An error occured while trying to start accepting connections! ec = '{}'", err.what());
logError(
std::format("An error occured while trying to start accepting connections! ec = '{}'", err.what()));
}
}
}
@@ -224,12 +229,12 @@ public:
std::stringstream st;
st << endpoint;
logDebug("Create connection acceptor for endpoint <{}> ...", st.str());
logDebug(std::format("Create connection acceptor for endpoint <{}> ...", st.str()));
auto acc = typename epn_t::protocol_type::acceptor(_asioContext, endpoint);
st.str("");
st << acc.local_endpoint();
logInfo("Start listening at <{}> endpoint ...", st.str());
logInfo(std::format("Start listening at <{}> endpoint ...", st.str()));
if constexpr (traits::is_tcp_proto<epn_t>) {
@@ -251,7 +256,8 @@ public:
} catch (const std::system_error& err) {
logError("An error occured while trying to start accepting connections! ec = '{}'", err.what());
logError(
std::format("An error occured while trying to start accepting connections! ec = '{}'", err.what()));
}
} else {
static_assert(false, "INVALID ENDPOINT!!!");
@@ -280,19 +286,19 @@ public:
std::error_code ec;
if (acc_ptrs.size()) {
logInfo("Close {} acceptors ...", desc);
logInfo(std::format("Close {} acceptors ...", desc));
for (auto& acc : acc_ptrs) {
acc->close(ec);
if (ec) {
logError("Cannot close {} acceptor! ec = '{}'", desc, ec.message());
logError(std::format("Cannot close {} acceptor! ec = '{}'", desc, ec.message()));
} else {
++M;
}
++N;
}
logDebug("{} from {} {} acceptors were closed!", M, N, desc);
logDebug(std::format("{} from {} {} acceptors were closed!", M, N, desc));
// pointers are invalidated here, so clear its container
acc_ptrs.clear();
@@ -319,12 +325,12 @@ public:
ptr->shutdown(asio::socket_base::shutdown_both, ec);
if (ec) {
logWarn("socket_base::shutdown: an error occured (ec = {})", ec.message());
logWarn(std::format("socket_base::shutdown: an error occured (ec = {})", ec.message()));
}
ptr->close(ec);
if (ec) {
logWarn("socket_base::close: an error occured (ec = {})", ec.message());
logWarn(std::format("socket_base::close: an error occured (ec = {})", ec.message()));
}
}
};
@@ -340,15 +346,15 @@ public:
std::lock_guard lock_g(_serialPortsMutex);
std::error_code ec;
logInfo("Close serial port clients ({} in total) ...", _serialPorts.size());
logInfo(std::format("Close serial port clients ({} in total) ...", _serialPorts.size()));
for (auto& ptr : _serialPorts) {
ptr->cancel(ec);
if (ec) {
logWarn("serial_port::cancel: an error occured (ec = {})", ec.message());
logWarn(std::format("serial_port::cancel: an error occured (ec = {})", ec.message()));
}
ptr->close(ec);
if (ec) {
logWarn("serial_port::close: an error occured (ec = {})", ec.message());
logWarn(std::format("serial_port::close: an error occured (ec = {})", ec.message()));
}
}
}
@@ -356,21 +362,23 @@ public:
if (_localStreamSockets.size()) {
std::lock_guard lock_g(_localStreamSocketsMutex);
logInfo("Close local stream socket-type clients ({} in total) ...", _localStreamSockets.size());
logInfo(
std::format("Close local stream socket-type clients ({} in total) ...", _localStreamSockets.size()));
disconn_func(_localStreamSockets);
}
if (_localSeqpackSockets.size()) {
std::lock_guard lock_g(_localSeqpackSocketsMutex);
logInfo("Close local seqpack socket-type clients ({} in total) ...", _localSeqpackSockets.size());
logInfo(
std::format("Close local seqpack socket-type clients ({} in total) ...", _localSeqpackSockets.size()));
disconn_func(_localSeqpackSockets);
}
if (_tcpSockets.size()) {
std::lock_guard lock_g(_tcpSocketsMutex);
logInfo("Close TCP socket-type clients ({} in total) ...", _tcpSockets.size());
logInfo(std::format("Close TCP socket-type clients ({} in total) ...", _tcpSockets.size()));
disconn_func(_tcpSockets);
}
@@ -406,13 +414,13 @@ public:
return;
}
logInfo("Try to set the daemon current path to '{}' ...", tmp_path.string());
logInfo(std::format("Try to set the daemon current path to '{}' ...", tmp_path.string()));
std::error_code ec{};
std::filesystem::current_path(tmp_path, ec);
if (!ec) {
logWarn("Cannot change current path to '{}'! Ignore!", tmp_path.string());
logWarn(std::format("Cannot change current path to '{}'! Ignore!", tmp_path.string()));
}
umask(0);
@@ -451,7 +459,7 @@ public:
}
_stopSignal.async_wait([this](std::error_code, int signo) {
logInfo("Stop signal was received (signo = {})", signo);
logInfo(std::format("Stop signal was received (signo = {})", signo));
stopListening();
disconnectClients();
@@ -464,7 +472,7 @@ public:
}
_restartSignal.async_wait([this](std::error_code, int signo) {
logInfo("Restart signal was received (signo = {})", signo);
logInfo(std::format("Restart signal was received (signo = {})", signo));
restart();
});
}
@@ -474,7 +482,7 @@ public:
disconnectClients();
_restartSignal.async_wait([this](std::error_code, int signo) {
logInfo("Restart signal was received (signo = {})", signo);
logInfo(std::format("Restart signal was received (signo = {})", signo));
restart();
});
}
@@ -522,7 +530,7 @@ private:
opt_name = "char size";
}
logError("Cannot set serial port '{}' option! Just skip!", opt_name);
logError(std::format("Cannot set serial port '{}' option! Just skip!", opt_name));
}
if constexpr (sizeof...(OptTs)) {
@@ -533,14 +541,15 @@ private:
std::vector<char> handleClientCommand(std::string_view command)
{
std::vector<char> resp{BM700::CONTROL_PROTO_STR_RESP_ACK.begin(), BM700::CONTROL_PROTO_STR_RESP_ACK.end()};
std::vector<char> resp{MCC_COMMPROTO_KEYWORD_SERVER_ACK_STR.begin(),
MCC_COMMPROTO_KEYWORD_SERVER_ACK_STR.end()};
return resp;
}
template <traits::mcc_time_duration_c RCVT = decltype(DEFAULT_RCV_TIMEOUT),
traits::mcc_time_duration_c SNDT = decltype(DEFAULT_SND_TIMEOUT)>
template <mcc::traits::mcc_time_duration_c RCVT = decltype(DEFAULT_RCV_TIMEOUT),
mcc::traits::mcc_time_duration_c SNDT = decltype(DEFAULT_SND_TIMEOUT)>
asio::awaitable<void> startSession(auto socket,
const RCVT& rcv_timeout = DEFAULT_RCV_TIMEOUT,
const SNDT& snd_timeout = DEFAULT_SND_TIMEOUT)
@@ -551,7 +560,7 @@ private:
auto look_for_whole_msg = [](auto const& bytes) {
auto found = std::ranges::search(bytes, BM700::CONTROL_PROTO_STOP_SEQ);
auto found = std::ranges::search(bytes, MCC_COMMPROTO_STOP_SEQ);
return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end());
};
@@ -576,11 +585,11 @@ private:
r_epn = "local";
}
logInfo("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id);
logInfo(std::format("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id));
try {
if constexpr (!traits::is_serial_proto<sock_t>) {
_serverLogger->trace("Set socket option KEEP_ALIVE to TRUE");
logTrace("Set socket option KEEP_ALIVE to TRUE");
socket.set_option(asio::socket_base::keep_alive(true));
}
@@ -607,7 +616,7 @@ private:
// send buffer sequence
// initiate the second element by "stop-sequence" symbols
std::vector<asio::const_buffer> snd_buff_seq{
{}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}};
{}, {MCC_COMMPROTO_STOP_SEQ.data(), MCC_COMMPROTO_STOP_SEQ.size()}};
asio::steady_timer timeout_timer(_asioContext);
std::variant<size_t, std::monostate> op_res;
@@ -621,7 +630,7 @@ private:
// receive message
if (do_read) {
_serverLogger->trace("Start socket/port reading operation with timeout {} ...", rcv_timeout);
logTrace(std::format("Start socket/port reading operation with timeout {} ...", rcv_timeout));
if constexpr (traits::is_serial_proto<sock_t>) {
nbytes = 1024;
@@ -656,7 +665,7 @@ private:
} else {
nbytes = std::get<0>(op_res);
_serverLogger->trace("{} bytes were received", nbytes);
logTrace(std::format("{} bytes were received", nbytes));
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
if (!nbytes) { // EOF!
@@ -672,8 +681,8 @@ private:
auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size()));
if (msg.empty()) { // still not whole message
_serverLogger->trace(
"It seems a partial command message was received, so waiting for remaining part ...");
logTrace(std::format(
"It seems a partial command message was received, so waiting for remaining part ..."));
do_read = true;
continue;
}
@@ -681,13 +690,13 @@ private:
// extract command without stop sequence symbols
// std::string comm;
// std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()),
// std::ranges::copy(msg | std::views::take(msg.size() - MCC_COMMPROTO_STOP_SEQ.size()),
// std::back_inserter(comm));
std::string_view comm{msg.begin(), msg.end() - BM700::CONTROL_PROTO_STOP_SEQ.size()};
std::string_view comm{msg.begin(), msg.end() - MCC_COMMPROTO_STOP_SEQ.size()};
logDebug("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})", comm, r_epn,
thr_id);
logDebug(std::format("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})",
comm, r_epn, thr_id));
auto resp = handleClientCommand(comm);
@@ -696,8 +705,8 @@ private:
do_read = sbuff.size() == 0;
logDebug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
std::string_view(resp.begin(), resp.end()), r_epn, thr_id);
logDebug(std::format("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
std::string_view(resp.begin(), resp.end()), r_epn, thr_id));
// send server respond to client
snd_buff_seq[0] = {resp.data(), resp.size()};
@@ -722,27 +731,29 @@ private:
throw std::system_error(std::make_error_code(std::errc::timed_out));
} else {
nbytes = std::get<0>(op_res);
_serverLogger->trace("{} bytes were sent", nbytes);
logTrace(std::format("{} bytes were sent", nbytes));
}
if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!!
if (nbytes != (resp.size() + MCC_COMMPROTO_STOP_SEQ.size())) { // !!!!!!!!!!
}
}
} catch (const std::system_error& ex) {
if (ex.code() == std::error_code(asio::error::misc_errors::eof)) {
logInfo("It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn,
thr_id);
logInfo(std::format(
"It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn,
thr_id));
} else {
logError("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})", ex.what(),
r_epn, thr_id);
logError(std::format("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})",
ex.what(), r_epn, thr_id));
}
} catch (const std::exception& ex) {
logError("An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})",
ex.what(), r_epn, thr_id);
logError(
std::format("An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})",
ex.what(), r_epn, thr_id));
} catch (...) {
logError("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})", r_epn,
thr_id);
logError(std::format("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})",
r_epn, thr_id));
}
// remove pointer as it is invalidated here (at the exit of the method)
@@ -758,8 +769,8 @@ private:
static_assert(false, "INVALID SOCKET TTYPE!!!");
}
logInfo("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id);
logInfo(std::format("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id));
}
};
} // namespace mcc
} // namespace mcc::network