mountcontrol/cxx/comm_server.h
2025-02-20 18:29:09 +03:00

512 lines
20 KiB
C++

#pragma once
#include <algorithm>
#include <chrono>
#include <concepts>
#include <cstddef>
#include <filesystem>
#include <memory>
#include <span>
#include <sstream>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

#include <asio/awaitable.hpp>
#include <asio/co_spawn.hpp>
#include <asio/deferred.hpp>
#include <asio/detached.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read.hpp>
#include <asio/redirect_error.hpp>
#include <asio/serial_port.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/write.hpp>

#include <spdlog/sinks/null_sink.h>
#include <spdlog/spdlog.h>

#include "comm_server_endpoint.h"
#include "control_proto.h"
#include "mount.h"
namespace mcc
{
namespace traits
{
template <typename T>
concept mcc_endpoint_c = std::derived_from<T, asio::serial_port> || std::derived_from<T, asio::ip::tcp::endpoint> ||
std::derived_from<T, asio::local::stream_protocol::endpoint> ||
std::derived_from<T, asio::local::seq_packet_protocol::endpoint>;
template <typename T>
static constexpr bool is_serial_proto = std::derived_from<T, asio::serial_port>;
template <typename T>
static constexpr bool is_tcp_proto = std::derived_from<typename T::protocol_type, asio::ip::tcp>;
template <typename T>
static constexpr bool is_local_stream_proto =
std::derived_from<typename T::protocol_type, asio::local::stream_protocol>;
template <typename T>
static constexpr bool is_local_seqpack_proto =
std::derived_from<typename T::protocol_type, asio::local::seq_packet_protocol>;
// template <typename T>
// static constexpr bool is_tcp_proto = std::derived_from<T, asio::ip::tcp::endpoint>;
// template <typename T>
// static constexpr bool is_local_stream_proto = std::derived_from<T, asio::local::stream_protocol::endpoint>;
// template <typename T>
// static constexpr bool is_local_seqpack_proto = std::derived_from<T, asio::local::seq_packet_protocol::endpoint>;
template <typename T>
concept mcc_time_duration_c = requires {
[]<class RT, class PT>(std::type_identity<std::chrono::duration<RT, PT>>) {
}(std::type_identity<std::remove_cvref_t<T>>());
};
} // namespace traits
class MccMountServer
{
public:
static constexpr std::chrono::duration DEFAULT_RCV_TIMEOUT = std::chrono::hours(12);
static constexpr std::chrono::duration DEFAULT_SND_TIMEOUT = std::chrono::milliseconds(2000);
MccMountServer(asio::io_context& ctx, std::shared_ptr<spdlog::logger> logger = spdlog::null_logger_mt("NULL"))
: _asioContext(ctx), _serverLogger(std::move(logger))
{
}
~MccMountServer() {}
template <typename... CtorArgTs>
asio::awaitable<void> listen(std::derived_from<MccServerEndpoint> auto endpoint, CtorArgTs&&... ctor_args)
{
if (!endpoint.isValid()) {
_serverLogger->error("Cannot start listening! Invalid endpoint string representation ('{}')!",
endpoint.endpoint());
co_return;
}
// add root path to endpoint one
std::filesystem::path pt("/");
pt += endpoint.path();
if (endpoint.isLocalSerial()) {
asio::serial_port s_port(_asioContext);
std::error_code ec;
if constexpr (sizeof...(CtorArgTs)) { // options
setSerialOpts(s_port, std::forward<CtorArgTs>(ctor_args)...);
}
s_port.open(pt.string(), ec);
if (ec) {
_serverLogger->error("Cannot open serial device '{}' (Error = '{}')!", pt.string(), ec.message());
co_return;
}
// asio::co_spawn(_asioContext, listen(std::move(s_port)), asio::detached);
co_await listen(std::move(s_port));
} else if (endpoint.isLocal()) {
// create abstract namespace socket endpoint if its path starts from '@' symbol
endpoint.makeAbstract('@');
if (endpoint.isLocalStream()) {
co_await listen(asio::local::stream_protocol::endpoint(pt.string()));
// asio::co_spawn(_asioContext, listen(asio::local::stream_protocol::endpoint(pt.string())),
// asio::detached);
} else if (endpoint.isLocalSeqpacket()) {
co_await listen(asio::local::seq_packet_protocol::endpoint(pt.string()));
// asio::co_spawn(_asioContext, listen(asio::local::seq_packet_protocol::endpoint(pt.string())),
// asio::detached);
} else {
co_return; // it must not be!!!!
}
} else if (endpoint.isTCP()) {
// resolve hostname
try {
asio::ip::tcp::resolver res(_asioContext);
auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::use_awaitable);
_serverLogger->info("Resolve hostname <{}> to {} IP-addresses", endpoint.host(), r_result.size());
bool exit_flag = false;
asio::ip::tcp::acceptor acc(_asioContext);
for (auto const& epn : r_result) {
try {
// std::stringstream st;
_serverLogger->debug("Create connection acceptor for endpoint <{}> ...",
epn.address().to_string());
acc = asio::ip::tcp::acceptor(_asioContext, epn);
// st << acc.local_endpoint();
exit_flag = true;
break;
} catch (const std::system_error& err) {
_serverLogger->error("An error occuring while creating connection acceptor (ec = {})",
err.what());
continue;
}
}
if (!exit_flag) {
_serverLogger->error("Cannot start listening on any resolved endpoints!");
co_return;
}
_tcpAcceptors.emplace_back(&acc);
_serverLogger->info("Start listening at <{}> endpoint ...", acc.local_endpoint().address().to_string());
// start accepting connections
for (;;) {
auto sock = co_await acc.async_accept(asio::use_awaitable);
// start new client session
asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached);
}
} catch (const std::system_error& err) {
_serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'",
err.what());
}
}
}
template <traits::mcc_endpoint_c EpnT>
asio::awaitable<void> listen(EpnT endpoint)
{
using epn_t = std::decay_t<decltype(endpoint)>;
std::error_code ec;
if constexpr (traits::is_serial_proto<epn_t>) {
// first, check if port is open
if (!endpoint.is_open()) {
if (ec) {
// ??????????
_serverLogger->error("Serial port was not open! Do not start waiting for commands!");
}
} else {
asio::co_spawn(_asioContext, startSession(std::move(endpoint)), asio::detached);
}
} else if constexpr (traits::is_tcp_proto<epn_t> || traits::is_local_stream_proto<epn_t> ||
traits::is_local_seqpack_proto<epn_t>) {
try {
std::stringstream st;
st << endpoint;
_serverLogger->debug("Create connection acceptor for endpoint <{}> ...", st.str());
auto acc = epn_t::protocol_type::acceptor(_asioContext, endpoint);
st.str("");
st << acc.local_endpoint();
_serverLogger->info("Start listening at <{}> endpoint ...", st.str());
if constexpr (traits::is_tcp_proto<epn_t>) {
_tcpAcceptors.emplace_back(&acc);
} else if constexpr (traits::is_local_stream_proto<epn_t>) {
_localStreamAcceptors.emplace_back(&acc);
} else if constexpr (traits::is_local_seqpack_proto<epn_t>) {
_localSeqpackAcceptors.emplace_back(&acc);
} else {
static_assert(false, "INVALID ENDPOINT!!!");
}
// start accepting connections
for (;;) {
auto sock = co_await acc.async_accept(asio::use_awaitable);
// start new client session
asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached);
}
} catch (const std::system_error& err) {
_serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'",
err.what());
}
} else {
static_assert(false, "INVALID ENDPOINT!!!");
}
}
// close listening on all endpoints
void stopListening()
{
std::error_code ec;
size_t N = 0, M = 0;
_serverLogger->info("Close all listening endpoints ...");
auto num =
_serialPorts.size() + _tcpAcceptors.size() + _localStreamAcceptors.size() + _localSeqpackAcceptors.size();
if (!num) {
_serverLogger->info("There are no listening ports/sockets!");
return;
}
auto close_func = [this](auto& acc_ptrs, std::string_view desc) {
size_t N = 0, M = 0;
std::error_code ec;
if (acc_ptrs.size()) {
_serverLogger->info("Close {} acceptors ...", desc);
for (auto& acc : acc_ptrs) {
acc->close(ec);
if (ec) {
_serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message());
++M;
}
++N;
}
_serverLogger->debug("{} from {} {} acceptors were closed!", M, N, desc);
// pointers are invalidated here, so clear its container
acc_ptrs.clear();
}
};
close_func(_tcpAcceptors, "TCP socket");
close_func(_localStreamAcceptors, "local stream socket");
close_func(_localSeqpackAcceptors, "local seqpack socket");
_serverLogger->info("The all server listening endpoints were closed!");
}
void disconnectClients() {}
private:
asio::io_context& _asioContext;
std::shared_ptr<spdlog::logger> _serverLogger;
std::vector<asio::serial_port*> _serialPorts;
std::vector<asio::ip::tcp::acceptor*> _tcpAcceptors;
std::vector<asio::local::stream_protocol::acceptor*> _localStreamAcceptors;
std::vector<asio::local::seq_packet_protocol::acceptor*> _localSeqpackAcceptors;
std::vector<asio::ip::tcp::socket*> _tcpSockets;
std::vector<asio::local::stream_protocol::socket*> _localStreamSockets;
std::vector<asio::local::seq_packet_protocol::socket*> _localSeqpackSockets;
// helpers
template <typename OptT, typename... OptTs>
void setSerialOpts(asio::serial_port& s_port, OptT&& opt, OptTs&&... opts)
{
std::error_code ec;
s_port.set_option(opt, ec);
if (ec) {
std::string_view opt_name;
if constexpr (std::same_as<OptT, asio::serial_port::baud_rate>) {
opt_name = "baud rate";
} else if constexpr (std::same_as<OptT, asio::serial_port::parity>) {
opt_name = "parity";
} else if constexpr (std::same_as<OptT, asio::serial_port::flow_control>) {
opt_name = "flow control";
} else if constexpr (std::same_as<OptT, asio::serial_port::stop_bits>) {
opt_name = "stop bits";
} else if constexpr (std::same_as<OptT, asio::serial_port::character_size>) {
opt_name = "char size";
}
_serverLogger->error("Cannot set serial port '{}' option! Just skip!", opt_name);
}
if constexpr (sizeof...(OptTs)) {
setSerialOpts(s_port, std::forward<OptTs>(opts)...);
}
}
std::vector<char> handleClientCommand(std::string_view command)
{
std::vector<char> resp{BM700::CONTROL_PROTO_STR_RESP_ACK.begin(), BM700::CONTROL_PROTO_STR_RESP_ACK.end()};
return resp;
}
template <traits::mcc_time_duration_c RCVT, traits::mcc_time_duration_c SNDT>
asio::awaitable<void> startSession(auto socket,
RCVT&& rcv_timeout = DEFAULT_RCV_TIMEOUT,
SNDT&& snd_timeout = DEFAULT_SND_TIMEOUT)
{
using namespace asio::experimental::awaitable_operators;
using sock_t = std::decay_t<decltype(socket)>;
auto look_for_whole_msg = [](auto const& bytes) {
auto found = std::ranges::search(bytes, BM700::CONTROL_PROTO_STOP_SEQ);
return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end());
};
auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable<void> {
asio::steady_timer timer(_asioContext);
auto now = std::chrono::steady_clock::now();
while (deadline > now) {
timer.expires_at(deadline);
co_await timer.async_wait(asio::use_awaitable);
now = std::chrono::steady_clock::now();
}
throw std::system_error(std::make_error_code(std::errc::timed_out));
};
asio::streambuf sbuff;
size_t nbytes;
std::stringstream st;
std::string r_epn;
st << std::this_thread::get_id();
std::string thr_id = st.str();
st.str("");
if constexpr (traits::is_serial_proto<sock_t>) {
st << "serial port: " << socket.native_handle();
} else { // network sockets
st << socket.remote_endpoint();
}
r_epn = st.str();
if (r_epn.empty()) { // UNIX domain sockets
r_epn = "local";
}
_serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id);
try {
if constexpr (traits::is_serial_proto<sock_t>) {
_serialPorts.emplace_back(&socket);
} else if constexpr (traits::is_tcp_proto<sock_t>) {
_tcpSockets.emplace_back(&socket);
} else if constexpr (traits::is_local_stream_proto<sock_t>) {
_localStreamSockets.emplace_back(&socket);
} else if constexpr (traits::is_local_seqpack_proto<sock_t>) {
_localSeqpackSockets.emplace_back(&socket);
} else {
static_assert(false, "INVALID SOCKET TTYPE!!!");
}
// send buffer sequence
// initiate the second element by "stop-sequence" symbols
std::vector<asio::const_buffer> snd_buff_seq{
{}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}};
// main client request -- server respond cycle
for (;;) {
// receive message
if constexpr (traits::is_serial_proto<sock_t>) {
nbytes = 1024;
} else {
nbytes = socket.available();
}
auto buff = sbuff.prepare(nbytes ? nbytes : 1);
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
asio::socket_base::message_flags oflags;
// nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable);
nbytes = co_await (socket.async_receive(buff, &oflags, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
if (!nbytes) { // EOF!
// _serverLogger->info("It seems client ({}) closed the connection!", st.str());
throw std::system_error(std::error_code(asio::error::misc_errors::eof));
// co_return;
}
} else {
// nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1),
// asio::use_awaitable);
nbytes =
co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
}
sbuff.commit(nbytes);
auto start_ptr = static_cast<const char*>(sbuff.data().data());
auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size()));
if (msg.empty()) { // still not whole message
continue;
}
// extract command without stop sequence symbols
// std::string comm;
// std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()),
// std::back_inserter(comm));
std::string_view comm{msg.begin(), msg.end() - BM700::CONTROL_PROTO_STOP_SEQ.size()};
_serverLogger->debug("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})",
comm, r_epn, thr_id);
auto resp = handleClientCommand(comm);
_serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
std::string_view(resp.begin(), resp.end()), r_epn, thr_id);
// send server respond to client
snd_buff_seq[0] = {resp.data(), resp.size()};
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
nbytes = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + snd_timeout));
} else {
nbytes = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + snd_timeout));
}
if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!!
}
}
} catch (const std::system_error& ex) {
if (ex.code() == std::error_code(asio::error::misc_errors::eof)) {
_serverLogger->info(
"It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn,
thr_id);
} else {
_serverLogger->error("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})",
ex.what(), r_epn, thr_id);
}
} catch (const std::exception& ex) {
_serverLogger->error(
"An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})", ex.what(),
r_epn, thr_id);
} catch (...) {
_serverLogger->error("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})",
r_epn, thr_id);
}
_serverLogger->info("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id);
}
};
} // namespace mcc