#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if __has_include() // POSIX #define FORK_EXISTS 1 #include #include #include #endif #include "comm_server_endpoint.h" #include "control_proto.h" #include "mount.h" namespace mcc { namespace traits { template concept mcc_endpoint_c = std::derived_from || std::derived_from || std::derived_from || std::derived_from; template static constexpr bool is_serial_proto = std::derived_from; // template // static constexpr bool is_tcp_proto = std::derived_from; // template // static constexpr bool is_local_stream_proto = // std::derived_from; // template // static constexpr bool is_local_seqpack_proto = // std::derived_from; template static constexpr bool is_tcp_proto = std::derived_from || std::derived_from; template static constexpr bool is_local_stream_proto = std::derived_from || std::derived_from; template static constexpr bool is_local_seqpack_proto = std::derived_from || std::derived_from; template concept mcc_time_duration_c = requires { [](std::type_identity>) { }(std::type_identity>()); }; } // namespace traits class MccMountServer { public: static constexpr std::chrono::duration DEFAULT_RCV_TIMEOUT = std::chrono::hours(12); static constexpr std::chrono::duration DEFAULT_SND_TIMEOUT = std::chrono::milliseconds(2000); MccMountServer(asio::io_context& ctx, std::shared_ptr logger = spdlog::null_logger_mt("NULL")) : _asioContext(ctx), _serverLogger(std::move(logger)) { std::stringstream st; st << std::this_thread::get_id(); _serverLogger->info("Create mount server instance (thread ID = {})", st.str()); } ~MccMountServer() { std::stringstream st; st << std::this_thread::get_id(); _serverLogger->info("Delete mount server instance (thread ID = {})", st.str()); } template asio::awaitable listen(std::derived_from auto endpoint, CtorArgTs&&... ctor_args) { if (!endpoint.isValid()) { _serverLogger->error("Cannot start listening! Invalid endpoint string representation ('{}')!", endpoint.endpoint()); co_return; } // add root path to endpoint one std::filesystem::path pt("/"); if (endpoint.isLocalSerial()) { pt += endpoint.path(); asio::serial_port s_port(_asioContext); std::error_code ec; if constexpr (sizeof...(CtorArgTs)) { // options setSerialOpts(s_port, std::forward(ctor_args)...); } s_port.open(pt.string(), ec); if (ec) { _serverLogger->error("Cannot open serial device '{}' (Error = '{}')!", pt.string(), ec.message()); co_return; } // asio::co_spawn(_asioContext, listen(std::move(s_port)), asio::detached); co_await listen(std::move(s_port)); } else if (endpoint.isLocal()) { // create abstract namespace socket endpoint if its path starts from '@' symbol endpoint.makeAbstract('@'); if (endpoint.path()[0] == '\0') { // abstract namespace std::string p; std::ranges::copy(endpoint.path(), std::back_inserter(p)); p.insert(p.begin() + 1, '/'); // insert after '\0' symbol pt = p; } else { pt += endpoint.path(); } if (endpoint.isLocalStream()) { co_await listen(asio::local::stream_protocol::endpoint(pt.string())); // asio::co_spawn(_asioContext, listen(asio::local::stream_protocol::endpoint(pt.string())), // asio::detached); } else if (endpoint.isLocalSeqpacket()) { co_await listen(asio::local::seq_packet_protocol::endpoint(pt.string())); // asio::co_spawn(_asioContext, listen(asio::local::seq_packet_protocol::endpoint(pt.string())), // asio::detached); } else { co_return; // it must not be!!!! } } else if (endpoint.isTCP()) { // resolve hostname try { asio::ip::tcp::resolver res(_asioContext); auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::use_awaitable); _serverLogger->info("Resolve hostname <{}> to {} IP-addresses", endpoint.host(), r_result.size()); bool exit_flag = false; asio::ip::tcp::acceptor acc(_asioContext); for (auto const& epn : r_result) { try { // std::stringstream st; // _serverLogger->debug("Create connection acceptor for endpoint <{}> ...", // epn.address().to_string()); acc = asio::ip::tcp::acceptor(_asioContext, epn); // st << acc.local_endpoint(); exit_flag = true; break; } catch (const std::system_error& err) { _serverLogger->error("An error occuring while creating connection acceptor (ec = {})", err.what()); continue; } } if (!exit_flag) { _serverLogger->error("Cannot start listening on any resolved endpoints!"); co_return; } _tcpAcceptors.emplace_back(&acc); _serverLogger->info("Start listening at <{}> endpoint ...", acc.local_endpoint().address().to_string()); // start accepting connections for (;;) { auto sock = co_await acc.async_accept(asio::use_awaitable); // start new client session asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); } } catch (const std::system_error& err) { _serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'", err.what()); } } } template asio::awaitable listen(EpnT endpoint) { using epn_t = std::decay_t; std::error_code ec; if constexpr (traits::is_serial_proto) { // first, check if port is open if (!endpoint.is_open()) { if (ec) { // ?????????? _serverLogger->error("Serial port was not open! Do not start waiting for commands!"); } } else { asio::co_spawn(_asioContext, startSession(std::move(endpoint)), asio::detached); } } else if constexpr (traits::is_tcp_proto || traits::is_local_stream_proto || traits::is_local_seqpack_proto) { try { std::stringstream st; st << endpoint; _serverLogger->debug("Create connection acceptor for endpoint <{}> ...", st.str()); auto acc = typename epn_t::protocol_type::acceptor(_asioContext, endpoint); st.str(""); st << acc.local_endpoint(); _serverLogger->info("Start listening at <{}> endpoint ...", st.str()); if constexpr (traits::is_tcp_proto) { _tcpAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_stream_proto) { _localStreamAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_seqpack_proto) { _localSeqpackAcceptors.emplace_back(&acc); } else { static_assert(false, "INVALID ENDPOINT!!!"); } // start accepting connections for (;;) { auto sock = co_await acc.async_accept(asio::use_awaitable); // start new client session asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); } } catch (const std::system_error& err) { _serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'", err.what()); } } else { static_assert(false, "INVALID ENDPOINT!!!"); } co_return; } // close listening on all endpoints void stopListening() { std::error_code ec; _serverLogger->info("Close all listening endpoints ..."); auto num = _serialPorts.size() + _tcpAcceptors.size() + _localStreamAcceptors.size() + _localSeqpackAcceptors.size(); if (!num) { _serverLogger->info("There are no listening ports/sockets!"); return; } auto close_func = [this](auto& acc_ptrs, std::string_view desc) { size_t N = 0, M = 0; std::error_code ec; if (acc_ptrs.size()) { _serverLogger->info("Close {} acceptors ...", desc); for (auto& acc : acc_ptrs) { acc->close(ec); if (ec) { _serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message()); ++M; } ++N; } _serverLogger->debug("{} from {} {} acceptors were closed!", M, N, desc); // pointers are invalidated here, so clear its container acc_ptrs.clear(); } }; close_func(_tcpAcceptors, "TCP socket"); close_func(_localStreamAcceptors, "local stream socket"); close_func(_localSeqpackAcceptors, "local seqpack socket"); _serverLogger->info("The all server listening endpoints were closed!"); } void disconnectClients() {} void daemonize() { #ifdef FORK_EXISTS _serverLogger->info("Daemonize the server ..."); _asioContext.notify_fork(asio::execution_context::fork_prepare); auto tmp_path = std::filesystem::temp_directory_path(); if (tmp_path.empty()) { tmp_path = std::filesystem::current_path().root_path(); } if (pid_t pid = fork()) { if (pid > 0) { exit(0); } else { // throw std::system_error(errno, std::generic_category(), "CANNOT FORK 1-STAGE"); _serverLogger->error("CANNOT FORK 1-STAGE! The server was not daemonized!"); return; } } if (setsid() == -1) { // throw std::system_error(errno, std::generic_category(), "CANNOT FORK SETSID"); _serverLogger->error("CANNOT FORK SETSID! The server was not daemonized!"); return; } _serverLogger->info("Try to set the daemon current path to '{}' ...", tmp_path.string()); std::error_code ec{}; std::filesystem::current_path(tmp_path, ec); if (!ec) { _serverLogger->warn("Cannot change current path to '{}'! Ignore!", tmp_path.string()); } umask(0); if (pid_t pid = fork()) { if (pid > 0) { exit(0); } else { // throw std::system_error(errno, std::generic_category(), "CANNOT FORK 2-STAGE"); _serverLogger->error("CANNOT FORK 2-STAGE! The server was not daemonized!"); return; } } // stdin, stdout, stderr close(0); close(1); close(2); _asioContext.notify_fork(asio::io_context::fork_child); _serverLogger->info("The server was daemonized successfully!"); #else _serverLogger->warn("Host platform is not POSIX one, so cannot daemonize the server!"); #endif } private: asio::io_context& _asioContext; std::shared_ptr _serverLogger; std::vector _serialPorts; std::vector _tcpAcceptors; std::vector _localStreamAcceptors; std::vector _localSeqpackAcceptors; std::set _tcpSockets; std::set _localStreamSockets; std::set _localSeqpackSockets; // std::vector _tcpSockets; // std::vector _localStreamSockets; // std::vector _localSeqpackSockets; // helpers template void setSerialOpts(asio::serial_port& s_port, OptT&& opt, OptTs&&... opts) { std::error_code ec; s_port.set_option(opt, ec); if (ec) { std::string_view opt_name; if constexpr (std::same_as) { opt_name = "baud rate"; } else if constexpr (std::same_as) { opt_name = "parity"; } else if constexpr (std::same_as) { opt_name = "flow control"; } else if constexpr (std::same_as) { opt_name = "stop bits"; } else if constexpr (std::same_as) { opt_name = "char size"; } _serverLogger->error("Cannot set serial port '{}' option! Just skip!", opt_name); } if constexpr (sizeof...(OptTs)) { setSerialOpts(s_port, std::forward(opts)...); } } std::vector handleClientCommand(std::string_view command) { std::vector resp{BM700::CONTROL_PROTO_STR_RESP_ACK.begin(), BM700::CONTROL_PROTO_STR_RESP_ACK.end()}; return resp; } template asio::awaitable startSession(auto socket, const RCVT& rcv_timeout = DEFAULT_RCV_TIMEOUT, const SNDT& snd_timeout = DEFAULT_SND_TIMEOUT) { using namespace asio::experimental::awaitable_operators; using sock_t = std::decay_t; auto look_for_whole_msg = [](auto const& bytes) { auto found = std::ranges::search(bytes, BM700::CONTROL_PROTO_STOP_SEQ); return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end()); }; auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable { // asio::steady_timer timer(_asioContext); asio::steady_timer timer(co_await asio::this_coro::executor); auto now = std::chrono::steady_clock::now(); while (deadline > now) { timer.expires_at(deadline); co_await timer.async_wait(asio::use_awaitable); now = std::chrono::steady_clock::now(); } throw std::system_error(std::make_error_code(std::errc::timed_out)); }; asio::streambuf sbuff; size_t nbytes; std::stringstream st; std::string r_epn; st << std::this_thread::get_id(); std::string thr_id = st.str(); st.str(""); if constexpr (traits::is_serial_proto) { st << "serial port: " << socket.native_handle(); } else { // network sockets st << socket.remote_endpoint(); } r_epn = st.str(); if (r_epn.empty()) { // UNIX domain sockets r_epn = "local"; } _serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id); try { if constexpr (!traits::is_serial_proto) { _serverLogger->trace("Set socket option KEEP_ALIVE to TRUE"); socket.set_option(asio::socket_base::keep_alive(true)); } if constexpr (traits::is_serial_proto) { _serialPorts.emplace_back(&socket); } else if constexpr (traits::is_tcp_proto) { // _tcpSockets.emplace_back(&socket); _tcpSockets.insert(&socket); } else if constexpr (traits::is_local_stream_proto) { // _localStreamSockets.emplace_back(&socket); _localStreamSockets.insert(&socket); } else if constexpr (traits::is_local_seqpack_proto) { // _localSeqpackSockets.emplace_back(&socket); _localSeqpackSockets.insert(&socket); } else { static_assert(false, "INVALID SOCKET TTYPE!!!"); } // send buffer sequence // initiate the second element by "stop-sequence" symbols std::vector snd_buff_seq{ {}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}}; asio::steady_timer timeout_timer(_asioContext); std::variant op_res; std::error_code ec; bool do_read = true; // main client request -- server respond cycle for (;;) { // receive message if (do_read) { _serverLogger->trace("Start socket/port reading operation with timeout {} ...", rcv_timeout); if constexpr (traits::is_serial_proto) { nbytes = 1024; } else { nbytes = socket.available(); } auto buff = sbuff.prepare(nbytes ? nbytes : 1); // timeout_timer.expires_after(std::chrono::seconds(5)); timeout_timer.expires_after(rcv_timeout); if constexpr (traits::is_local_seqpack_proto) { asio::socket_base::message_flags oflags; op_res = co_await ( socket.async_receive(buff, oflags, asio::redirect_error(asio::use_awaitable, ec)) || timeout_timer.async_wait(asio::use_awaitable)); } else { op_res = co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::redirect_error(asio::use_awaitable, ec)) || timeout_timer.async_wait(asio::use_awaitable)); } if (ec) { throw std::system_error(ec); } if (op_res.index()) { throw std::system_error(std::make_error_code(std::errc::timed_out)); } else { nbytes = std::get<0>(op_res); _serverLogger->trace("{} bytes were received", nbytes); if constexpr (traits::is_local_seqpack_proto) { if (!nbytes) { // EOF! throw std::system_error(std::error_code(asio::error::misc_errors::eof)); } } } sbuff.commit(nbytes); } // here, the input stream buffer still contains remaining bytes. try to handle its auto start_ptr = static_cast(sbuff.data().data()); auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size())); if (msg.empty()) { // still not whole message _serverLogger->trace( "It seems a partial command message was received, so waiting for remaining part ..."); do_read = true; continue; } // extract command without stop sequence symbols // std::string comm; // std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()), // std::back_inserter(comm)); std::string_view comm{msg.begin(), msg.end() - BM700::CONTROL_PROTO_STOP_SEQ.size()}; _serverLogger->debug("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})", comm, r_epn, thr_id); auto resp = handleClientCommand(comm); // remove received message from the input stream buffer. NOTE: 'msg' is now invalidated!!! sbuff.consume(msg.size()); do_read = sbuff.size() == 0; _serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})", std::string_view(resp.begin(), resp.end()), r_epn, thr_id); // send server respond to client snd_buff_seq[0] = {resp.data(), resp.size()}; timeout_timer.expires_after(snd_timeout); if constexpr (traits::is_local_seqpack_proto) { op_res = co_await (socket.async_send(snd_buff_seq, 0, asio::redirect_error(asio::use_awaitable, ec)) || timeout_timer.async_wait(asio::use_awaitable)); } else { // nbytes = co_await asio::async_write(socket, snd_buff_seq, asio::use_awaitable); op_res = co_await ( asio::async_write(socket, snd_buff_seq, asio::redirect_error(asio::use_awaitable, ec)) || timeout_timer.async_wait(asio::use_awaitable)); } if (ec) { throw std::system_error(ec); } if (op_res.index()) { throw std::system_error(std::make_error_code(std::errc::timed_out)); } else { nbytes = std::get<0>(op_res); _serverLogger->trace("{} bytes were sent", nbytes); } if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!! } } } catch (const std::system_error& ex) { if (ex.code() == std::error_code(asio::error::misc_errors::eof)) { _serverLogger->info( "It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn, thr_id); } else { _serverLogger->error("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})", ex.what(), r_epn, thr_id); } } catch (const std::exception& ex) { _serverLogger->error( "An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})", ex.what(), r_epn, thr_id); } catch (...) { _serverLogger->error("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})", r_epn, thr_id); } if constexpr (traits::is_serial_proto) { // _serialPorts.emplace_back(&socket); } else if constexpr (traits::is_tcp_proto) { _tcpSockets.erase(&socket); } else if constexpr (traits::is_local_stream_proto) { _localStreamSockets.erase(&socket); } else if constexpr (traits::is_local_seqpack_proto) { _localSeqpackSockets.erase(&socket); } else { static_assert(false, "INVALID SOCKET TTYPE!!!"); } _serverLogger->info("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id); } }; } // namespace mcc