diff --git a/cxx/comm_server.h b/cxx/comm_server.h index 7780eb7..0bdfda7 100644 --- a/cxx/comm_server.h +++ b/cxx/comm_server.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -92,11 +93,13 @@ public: static constexpr std::chrono::duration DEFAULT_SND_TIMEOUT = std::chrono::milliseconds(2000); MccMountServer(asio::io_context& ctx, std::shared_ptr logger = spdlog::null_logger_mt("NULL")) - : _asioContext(ctx), _serverLogger(std::move(logger)) + : _asioContext(ctx), _serverLogger(logger), _stopSignal(ctx), _restartSignal(ctx) { std::stringstream st; st << std::this_thread::get_id(); + _serverLogger->set_pattern("[%Y-%m-%d %T.%e][%l]: %v"); + _serverLogger->info("Create mount server instance (thread ID = {})", st.str()); } @@ -105,7 +108,10 @@ public: std::stringstream st; st << std::this_thread::get_id(); - _serverLogger->info("Delete mount server instance (thread ID = {})", st.str()); + _serverLogger->info("Delete mount server instance (thread ID = {}) ...", st.str()); + + stopListening(); + disconnectClients(); } @@ -145,23 +151,19 @@ public: // create abstract namespace socket endpoint if its path starts from '@' symbol endpoint.makeAbstract('@'); - if (endpoint.path()[0] == '\0') { // abstract namespace - std::string p; - std::ranges::copy(endpoint.path(), std::back_inserter(p)); - p.insert(p.begin() + 1, '/'); // insert after '\0' symbol - pt = p; - } else { - pt += endpoint.path(); - } + // if (endpoint.path()[0] == '\0') { // abstract namespace + // std::string p; + // std::ranges::copy(endpoint.path(), std::back_inserter(p)); + // p.insert(p.begin() + 1, '/'); // insert after '\0' symbol + // pt = p; + // } else { + // pt += endpoint.path(); + // } if (endpoint.isLocalStream()) { - co_await listen(asio::local::stream_protocol::endpoint(pt.string())); - // asio::co_spawn(_asioContext, listen(asio::local::stream_protocol::endpoint(pt.string())), - // asio::detached); + co_await listen(asio::local::stream_protocol::endpoint(endpoint.path(pt.string()))); } else if (endpoint.isLocalSeqpacket()) { - co_await listen(asio::local::seq_packet_protocol::endpoint(pt.string())); - // asio::co_spawn(_asioContext, listen(asio::local::seq_packet_protocol::endpoint(pt.string())), - // asio::detached); + co_await listen(asio::local::seq_packet_protocol::endpoint(endpoint.path(pt.string()))); } else { co_return; // it must not be!!!! } @@ -304,6 +306,7 @@ public: acc->close(ec); if (ec) { _serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message()); + } else { ++M; } ++N; @@ -324,7 +327,76 @@ public: _serverLogger->info("The all server listening endpoints were closed!"); } - void disconnectClients() {} + void disconnectClients() + { + auto disconn_func = [this](std::ranges::input_range auto& ptrs) { + std::error_code ec; + for (auto& ptr : ptrs) { + // ptr->cancel(ec); + // if (ec) { + // _serverLogger->warn("socket_base::cancel: an error occured (ec = {})", ec.message()); + // } + + ptr->shutdown(asio::socket_base::shutdown_both, ec); + if (ec) { + _serverLogger->warn("socket_base::shutdown: an error occured (ec = {})", ec.message()); + } + + ptr->close(ec); + if (ec) { + _serverLogger->warn("socket_base::close: an error occured (ec = {})", ec.message()); + } + } + }; + + _serverLogger->info("Close all client connections ..."); + + if (_serialPorts.empty() && _localStreamSockets.empty() && _localSeqpackSockets.empty() && + _tcpSockets.empty()) { + _serverLogger->info("There were no active client connections! Skip!"); + } + + if (_serialPorts.size()) { + std::lock_guard lock_g(_serialPortsMutex); + + std::error_code ec; + _serverLogger->info("Close serial port clients ({} in total) ...", _serialPorts.size()); + for (auto& ptr : _serialPorts) { + ptr->cancel(ec); + if (ec) { + _serverLogger->warn("serial_port::cancel: an error occured (ec = {})", ec.message()); + } + ptr->close(ec); + if (ec) { + _serverLogger->warn("serial_port::close: an error occured (ec = {})", ec.message()); + } + } + } + + if (_localStreamSockets.size()) { + std::lock_guard lock_g(_localStreamSocketsMutex); + + _serverLogger->info("Close local stream socket-type clients ({} in total) ...", _localStreamSockets.size()); + disconn_func(_localStreamSockets); + } + + if (_localSeqpackSockets.size()) { + std::lock_guard lock_g(_localSeqpackSocketsMutex); + + _serverLogger->info("Close local seqpack socket-type clients ({} in total) ...", + _localSeqpackSockets.size()); + disconn_func(_localSeqpackSockets); + } + + if (_tcpSockets.size()) { + std::lock_guard lock_g(_tcpSocketsMutex); + + _serverLogger->info("Close TCP socket-type clients ({} in total) ...", _tcpSockets.size()); + disconn_func(_tcpSockets); + } + + _serverLogger->info("Client connection were closed!"); + } void daemonize() { @@ -390,11 +462,52 @@ public: #endif } + template , std::ranges::range RRT = std::vector> + void setupSignals(const RST& stop_sig_num = {SIGINT, SIGTERM}, const RRT& restart_sig_num = {SIGUSR1}) + requires(std::convertible_to, int> && + std::convertible_to, int>) + { + for (const int sig : stop_sig_num) { + _stopSignal.add(sig); + } + + _stopSignal.async_wait([this](std::error_code, int signo) { + _serverLogger->info("Stop signal was received (signo = {})", signo); + + stopListening(); + disconnectClients(); + + _asioContext.stop(); + }); + + for (const int sig : restart_sig_num) { + _restartSignal.add(sig); + } + + _restartSignal.async_wait([this](std::error_code, int signo) { + _serverLogger->info("Restart signal was received (signo = {})", signo); + restart(); + }); + } + + void restart() + { + disconnectClients(); + + _restartSignal.async_wait([this](std::error_code, int signo) { + _serverLogger->info("Restart signal was received (signo = {})", signo); + restart(); + }); + } + private: asio::io_context& _asioContext; std::shared_ptr _serverLogger; - std::vector _serialPorts; + asio::signal_set _stopSignal, _restartSignal; + + std::set _serialPorts; + // std::vector _serialPorts; std::vector _tcpAcceptors; std::vector _localStreamAcceptors; @@ -407,6 +520,7 @@ private: // std::vector _localStreamSockets; // std::vector _localSeqpackSockets; + std::mutex _serialPortsMutex, _tcpSocketsMutex, _localStreamSocketsMutex, _localSeqpackSocketsMutex; // helpers template @@ -463,20 +577,6 @@ private: return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end()); }; - auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable { - // asio::steady_timer timer(_asioContext); - asio::steady_timer timer(co_await asio::this_coro::executor); - - auto now = std::chrono::steady_clock::now(); - while (deadline > now) { - timer.expires_at(deadline); - co_await timer.async_wait(asio::use_awaitable); - now = std::chrono::steady_clock::now(); - } - - throw std::system_error(std::make_error_code(std::errc::timed_out)); - }; - asio::streambuf sbuff; size_t nbytes; @@ -507,14 +607,18 @@ private: } if constexpr (traits::is_serial_proto) { - _serialPorts.emplace_back(&socket); + std::lock_guard lock_g(_serialPortsMutex); + _serialPorts.insert(&socket); } else if constexpr (traits::is_tcp_proto) { + std::lock_guard lock_g(_tcpSocketsMutex); // _tcpSockets.emplace_back(&socket); _tcpSockets.insert(&socket); } else if constexpr (traits::is_local_stream_proto) { + std::lock_guard lock_g(_localStreamSocketsMutex); // _localStreamSockets.emplace_back(&socket); _localStreamSockets.insert(&socket); } else if constexpr (traits::is_local_seqpack_proto) { + std::lock_guard lock_g(_localSeqpackSocketsMutex); // _localSeqpackSockets.emplace_back(&socket); _localSeqpackSockets.insert(&socket); } else { @@ -665,8 +769,9 @@ private: r_epn, thr_id); } + // remove pointer as it is invalidated here (at the exit of the method) if constexpr (traits::is_serial_proto) { - // _serialPorts.emplace_back(&socket); + _serialPorts.erase(&socket); } else if constexpr (traits::is_tcp_proto) { _tcpSockets.erase(&socket); } else if constexpr (traits::is_local_stream_proto) { diff --git a/cxx/comm_server_endpoint.h b/cxx/comm_server_endpoint.h index 13158c3..e0973b8 100644 --- a/cxx/comm_server_endpoint.h +++ b/cxx/comm_server_endpoint.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -286,6 +287,48 @@ public: return portView(); } + template + R path(RR&& root_path) const + { + if (_path.empty()) { + if constexpr (traits::mcc_output_char_range) { + R res; + std::ranges::copy(std::forward(root_path), std::back_inserter(res)); + + return res; + } else { // can't add root path!!! + return part(PATH_PART); + } + } + + auto N = std::ranges::distance(root_path.begin(), root_path.end()); + + if (N) { + R res; + std::filesystem::path pt(root_path.begin(), root_path.end()); + + if (isLocal() && _path[0] == '\0') { + std::ranges::copy(std::string_view(" "), std::back_inserter(res)); + pt /= _path.substr(1); + std::ranges::copy(pt.string(), std::back_inserter(res)); + *res.begin() = '\0'; + } else { + pt /= _path; + std::ranges::copy(pt.string(), std::back_inserter(res)); + } + + return res; + } else { + return part(PATH_PART); + } + } + + template + std::string path(RR&& root_path) const + { + return path(std::forward(root_path)); + } + template R path() const { diff --git a/cxx/mount_server.cpp b/cxx/mount_server.cpp index 61694ed..e2a5cea 100644 --- a/cxx/mount_server.cpp +++ b/cxx/mount_server.cpp @@ -1,24 +1,131 @@ -#include "comm_server.h" - #include #include -int main() +#include +#include + +#include + +#include "comm_server.h" + + +int main(int argc, char* argv[]) { + /* COMMANDLINE OPTS */ + cxxopts::Options options(argv[0], "Astrosib (c) BM700 mount server\n"); + + options.allow_unrecognised_options(); + + options.add_options()("h,help", "Print usage"); + + options.add_options()("D,daemon", "Demonize server"); + + options.add_options()("l,log", "Log filename (use stdout and stderr for standard output and error stream)", + cxxopts::value()->default_value("")); + + options.add_options()("level", "Log level (see SPDLOG package description for valid values)", + cxxopts::value()->default_value("info")); + + options.add_options()( + "endpoints", + "endpoints server will be listening for. For 'local' endpoint the '@' symbol at the beginning of the path " + "means " + "abstract namespace socket.", + cxxopts::value>()->default_value("local://stream/@BM700_SERVER")); + + + options.positional_help("[endpoint0] [enpoint1] ... [endpointN]"); + options.parse_positional({"endpoints"}); + asio::io_context ctx(2); - auto logger = spdlog::stdout_color_mt("STDOUT_LOGGER"); - logger->set_level(spdlog::level::debug); - logger->set_level(spdlog::level::trace); - logger->flush_on(spdlog::level::debug); - mcc::MccMountServer server(ctx, logger); + try { + auto opt_result = options.parse(argc, argv); - // mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/tmp/BM700_SERVER_SOCK")); - // mcc::MccServerEndpoint epn(std::string_view("local://stream/tmp/BM700_SERVER_SOCK")); - mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345")); + if (opt_result["help"].count()) { + std::cout << options.help(); + std::cout << "\n"; + std::cout << "[endpoint0] [enpoint1] ... [endpointN] - endpoints server will be listening for. For 'local' " + "endpoint the '@' symbol at the beginning of the path " + "means abstract namespace socket (e.g. local://stream/@BM700_SERVER)." + << "\n"; + return 0; + } - asio::co_spawn(ctx, server.listen(epn), asio::detached); - ctx.run(); + auto logname = opt_result["log"].as(); + + auto logger = [&logname]() { + if (logname == "stdout") { + return spdlog::stdout_color_mt("console"); + } else if (logname == "stderr") { + return spdlog::stderr_color_mt("stderr"); + } else if (logname == "") { + return spdlog::null_logger_mt("BM700_SERVER_NULL_LOGGER"); + } else { + return spdlog::basic_logger_mt(logname, logname); + } + }(); + + std::string level_str = opt_result["level"].as(); + std::ranges::transform(level_str, level_str.begin(), [](const char& c) { return std::tolower(c); }); + + auto log_level = spdlog::level::from_str(level_str); + logger->set_level(log_level); + logger->flush_on(spdlog::level::trace); + + + logger->set_pattern("%v"); + int w = 90; + const std::string fmt = std::format("{{:*^{}}}", w); + logger->info("\n\n\n"); + logger->info(fmt, ""); + logger->info(fmt, " ASTROSIB BM700 MOUNT SERVER "); + auto zt = std::chrono::zoned_time(std::chrono::current_zone(), + std::chrono::floor(std::chrono::system_clock::now())); + logger->info(fmt, std::format(" {} ", zt)); + logger->info(fmt, ""); + logger->info("\n"); + + + mcc::MccMountServer server(ctx, logger); + + server.setupSignals(); + + if (opt_result["daemon"].count()) { + server.daemonize(); + } + + // mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/tmp/BM700_SERVER_SOCK")); + // mcc::MccServerEndpoint epn(std::string_view("local://stream/tmp/BM700_SERVER_SOCK")); + // mcc::MccServerEndpoint epn(std::string_view("local://stream/@tmp/BM700_SERVER_SOCK")); + // mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345")); + + // asio::co_spawn(ctx, server.listen(epn), asio::detached); + + auto epnts = opt_result["endpoints"].as>(); + + for (auto& epnt : epnts) { + mcc::MccServerEndpoint ep(epnt); + + if (ep.isValid()) { + ep.makeAbstract('@'); + + asio::co_spawn(ctx, server.listen(ep), asio::detached); + } else { + std::cerr << "Unrecognized endpoint: '" << epnt << "'! Ignore!\n"; + } + } + + + asio::thread_pool pool(3); + + asio::post(pool, [&ctx]() { ctx.run(); }); + + pool.join(); + // ctx.run(); + + } catch (...) { + } }