#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "comm_server_endpoint.h" #include "mount.h" namespace mcc { namespace traits { template concept mcc_endpoint_c = std::derived_from || std::derived_from || std::derived_from || std::derived_from; template static constexpr bool is_serial_proto = std::derived_from; template static constexpr bool is_tcp_proto = std::derived_from; template static constexpr bool is_local_stream_proto = std::derived_from; template static constexpr bool is_local_seqpack_proto = std::derived_from; // template // static constexpr bool is_tcp_proto = std::derived_from; // template // static constexpr bool is_local_stream_proto = std::derived_from; // template // static constexpr bool is_local_seqpack_proto = std::derived_from; template concept mcc_time_duration_c = requires { [](std::type_identity>) { }(std::type_identity>()); }; } // namespace traits class MccMountServer { public: static constexpr std::chrono::duration DEFAULT_RCV_TIMEOUT = std::chrono::hours(12); static constexpr std::chrono::duration DEFAULT_SND_TIMEOUT = std::chrono::milliseconds(2000); MccMountServer(asio::io_context& ctx, std::shared_ptr logger) : _asioContext(ctx), _serverLogger(std::move(logger)) { } ~MccMountServer() {} template asio::awaitable listen(std::derived_from auto endpoint, CtorArgTs&&... ctor_args) { bool exit_flag = false; if (!endpoint.isValid()) { _serverLogger->error("Cannot start listening! Invalid endpoint string representation ('{}')!", endpoint.endpoint()); co_return exit_flag; } // add root path to endpoint one std::filesystem::path pt("/"); pt += endpoint.path(); auto args = std::make_tuple(std::forward(ctor_args)...); if (endpoint.isLocalSerial()) { asio::serial_port s_port(_asioContext); std::error_code ec; if constexpr (sizeof...(CtorArgTs)) { // options setSerialOpts(s_port, std::forward(ctor_args)...); } s_port.open(pt.string(), ec); if (ec) { _serverLogger->error("Cannot open serial device '{}' (Error = '{}')!", pt.string(), ec.message()); co_return exit_flag; } // asio::co_spawn(_asioContext, listen(std::move(s_port)), asio::detached); co_return listen(std::move(s_port)); } else if (endpoint.isLocal()) { // create abstract namespace socket endpoint if its path starts from '@' symbol endpoint.makeAbstract('@'); if (endpoint.isLocalStream()) { co_return listen(asio::local::stream_protocol::endpoint(pt.string())); // asio::co_spawn(_asioContext, listen(asio::local::stream_protocol::endpoint(pt.string())), // asio::detached); } else if (endpoint.isLocalSeqpacket()) { co_return listen(asio::local::seq_packet_protocol::endpoint(pt.string())); // asio::co_spawn(_asioContext, listen(asio::local::seq_packet_protocol::endpoint(pt.string())), // asio::detached); } else { co_return exit_flag; // ???!!!! } } else if (endpoint.isTCP()) { // resolve hostname try { asio::ip::tcp::resolver res(_asioContext); auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::use_awaitable); for (auto const& epn : r_result) { exit_flag = co_await listen(epn); if (exit_flag) { break; } } if (!exit_flag) { _serverLogger->error("Cannot start listening on any resolved endpoints!"); co_return exit_flag; } // listen(std::move(*r_result.begin())); } catch (const std::system_error& err) { _serverLogger->error("An error occured while resolving '{}' hostname (Error = '{}')", endpoint.host(), err.code().message()); co_return exit_flag; } } co_return true; } template asio::awaitable listen(EpnT endpoint) { using epn_t = std::decay_t; std::error_code ec; if constexpr (traits::is_serial_proto) { // first, check if port is open if (!endpoint.is_open()) { if (ec) { // ?????????? _serverLogger->error("Serial port was not open! Do not start waiting for commands!"); } } else { asio::co_spawn(_asioContext, startSession(std::move(endpoint)), asio::detached); } } else if constexpr (traits::is_tcp_proto || traits::is_local_stream_proto || traits::is_local_seqpack_proto) { try { std::stringstream st; _serverLogger->debug("Create connection acceptor ..."); auto acc = epn_t::protocol_type::acceptor(_asioContext, endpoint); st << acc.local_endpoint(); _serverLogger->info("Try to start listening at <{}> endpoint ...", st.str()); if constexpr (traits::is_tcp_proto) { _tcpAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_stream_proto) { _localStreamAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_seqpack_proto) { _localSeqpackAcceptors.emplace_back(&acc); } else { static_assert(false, "INVALID ENDPOINT!!!"); } // start accepting connections for (;;) { auto sock = co_await acc.async_accept(asio::use_awaitable); // start new client session asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); } } catch (const std::system_error& err) { _serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'", err.what()); } } else { static_assert(false, "INVALID ENDPOINT!!!"); } } // close listening on all endpoints void stopListening() { std::error_code ec; size_t N = 0, M = 0; _serverLogger->info("Close all listening endpoints ..."); auto num = _serialPorts.size() + _tcpAcceptors.size() + _localStreamAcceptors.size() + _localSeqpackAcceptors.size(); if (!num) { _serverLogger->info("There are no listening ports/sockets!"); return; } auto close_func = [this](auto& acc_ptrs, std::string_view desc) { size_t N = 0, M = 0; std::error_code ec; if (acc_ptrs.size()) { _serverLogger->info("Close {} acceptors ...", desc); for (auto& acc : acc_ptrs) { acc->close(ec); if (ec) { _serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message()); ++M; } ++N; } _serverLogger->debug("{} from {} {} acceptors were closed!", M, N, desc); // pointers are invalidated here, so clear its container acc_ptrs.clear(); } }; close_func(_tcpAcceptors, "TCP socket"); close_func(_localStreamAcceptors, "local stream socket"); close_func(_localSeqpackAcceptors, "local seqpack socket"); _serverLogger->info("The all server listening endpoints were closed!"); } void disconnectClients() {} private: asio::io_context& _asioContext; std::shared_ptr _serverLogger; std::vector _serialPorts; std::vector _tcpAcceptors; std::vector _localStreamAcceptors; std::vector _localSeqpackAcceptors; std::vector _tcpSockets; std::vector _localStreamSockets; std::vector _localSeqpackSockets; // helpers template void setSerialOpts(asio::serial_port& s_port, OptT&& opt, OptTs&&... opts) { std::error_code ec; s_port.set_option(opt, ec); if (ec) { std::string_view opt_name; if constexpr (std::same_as) { opt_name = "baud rate"; } else if constexpr (std::same_as) { opt_name = "parity"; } else if constexpr (std::same_as) { opt_name = "flow control"; } else if constexpr (std::same_as) { opt_name = "stop bits"; } else if constexpr (std::same_as) { opt_name = "char size"; } _serverLogger->error("Cannot set serial port '{}' option! Just skip!", opt_name); } if constexpr (sizeof...(OptTs)) { setSerialOpts(s_port, std::forward(opts)...); } } template asio::awaitable startSession(auto socket, RCVT&& rcv_timeout = DEFAULT_RCV_TIMEOUT, SNDT&& snd_timeout = DEFAULT_SND_TIMEOUT) { using sock_t = std::decay_t; auto watchdog = []() -> asio::awaitable {}; asio::streambuf sbuff; size_t nbytes; std::stringstream st; st << socket.remote_endpoint(); try { if constexpr (traits::is_serial_proto) { _serialPorts.emplace_back(&socket); } else if constexpr (traits::is_tcp_proto) { _tcpSockets.emplace_back(&socket); } else if constexpr (traits::is_local_stream_proto) { _localStreamSockets.emplace_back(&socket); } else if constexpr (traits::is_local_seqpack_proto) { _localSeqpackSockets.emplace_back(&socket); } else { static_assert(false, "INVALID SOCKET TTYPE!!!"); } for (;;) { if constexpr (traits::is_serial_proto) { nbytes = 1024; } else { nbytes = socket.available(); } auto buff = sbuff.prepare(nbytes ? nbytes : 1); if constexpr (traits::is_local_seqpack_proto) { asio::socket_base::message_flags oflags; nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable); if (!nbytes) { // EOF! _serverLogger->info("It seems client ({}) closed the connection!", st.str()); co_return; } } else { nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable); } sbuff.commit(nbytes); } } catch (...) { } } }; } // namespace mcc