diff --git a/cxx/CMakeLists.txt b/cxx/CMakeLists.txt index fe48d9f..63f6b67 100644 --- a/cxx/CMakeLists.txt +++ b/cxx/CMakeLists.txt @@ -32,13 +32,9 @@ else() find_package(ASIO) endif() -find_package(spdlog CONFIG) -# set(SPDLOG_USE_STD_FORMAT ON) -# set(SPDLOG_FMT_EXTERNAL OFF ) -# set(SPDLOG_MASTER_PROJECT OFF) -# set(SPDLOG_BUILD_EXAMPLE OFF) +# find_package(spdlog CONFIG) -# set(spdlog_FOUND FALSE ) +set(spdlog_FOUND FALSE ) if (NOT spdlog_FOUND) message(STATUS "SPDLOG libarary was not found! Try to download it!") @@ -70,6 +66,9 @@ if (NOT spdlog_FOUND) find_package(spdlog CONFIG) endif() +add_compile_definitions(SPDLOG_USE_STD_FORMAT) +add_compile_definitions(SPDLOG_FMT_EXTERNAL=0) + option(WITH_TESTS "Build tests" ON) # Mount client-to-server communication protocol @@ -91,4 +90,6 @@ if (WITH_TESTS) set(CNTR_PROTO_TEST_APP cntr_proto_test) add_executable(${CNTR_PROTO_TEST_APP} tests/cntr_proto_test.cpp) target_link_libraries(${CNTR_PROTO_TEST_APP} ${CNTR_PROTO_LIB} spdlog::spdlog_header_only) + # target_link_libraries(${CNTR_PROTO_TEST_APP} ${CNTR_PROTO_LIB}) + # target_include_directories(${CNTR_PROTO_TEST_APP} PUBLIC ${SPDLOG_INCLUDE_DIRS}) endif() diff --git a/cxx/comm_server.h b/cxx/comm_server.h index 08495b2..7288257 100644 --- a/cxx/comm_server.h +++ b/cxx/comm_server.h @@ -9,7 +9,9 @@ #include #include #include +#include #include +#include #include @@ -32,13 +34,33 @@ template static constexpr bool is_serial_proto = std::derived_from; template -static constexpr bool is_tcp_proto = std::derived_from; +static constexpr bool is_tcp_proto = std::derived_from; template -static constexpr bool is_local_stream_proto = std::derived_from; +static constexpr bool is_local_stream_proto = + std::derived_from; template -static constexpr bool is_local_seqpack_proto = std::derived_from; +static constexpr bool is_local_seqpack_proto = + std::derived_from; + +// template +// static constexpr bool is_tcp_proto = std::derived_from; + +// template +// static constexpr bool is_local_stream_proto = std::derived_from; + +// template +// static constexpr bool is_local_seqpack_proto = std::derived_from; + + +template +concept mcc_time_duration_c = requires { + [](std::type_identity>) { + + }(std::type_identity>()); +}; + } // namespace traits @@ -46,6 +68,9 @@ static constexpr bool is_local_seqpack_proto = std::derived_from logger) : _asioContext(ctx), _serverLogger(std::move(logger)) { @@ -109,7 +134,7 @@ public: try { asio::ip::tcp::resolver res(_asioContext); - auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::deferred); + auto r_result = co_await res.async_resolve(endpoint.host(), endpoint.portView(), asio::use_awaitable); for (auto const& epn : r_result) { exit_flag = co_await listen(epn); @@ -134,56 +159,64 @@ public: co_return true; } - asio::awaitable listen(traits::mcc_endpoint_c auto endpoint) + template + asio::awaitable listen(EpnT endpoint) { using epn_t = std::decay_t; std::error_code ec; if constexpr (traits::is_serial_proto) { - // first, check if port is openned + // first, check if port is open if (!endpoint.is_open()) { if (ec) { // ?????????? _serverLogger->error("Serial port was not open! Do not start waiting for commands!"); - co_return false; } + } else { + asio::co_spawn(_asioContext, startSession(std::move(endpoint)), asio::detached); } - _serialPorts.emplace_back(std::move(endpoint)); - } else if constexpr (traits::is_tcp_proto || traits::is_local_stream_proto || traits::is_local_seqpack_proto) { try { std::stringstream st; + + _serverLogger->debug("Create connection acceptor ..."); auto acc = epn_t::protocol_type::acceptor(_asioContext, endpoint); st << acc.local_endpoint(); _serverLogger->info("Try to start listening at <{}> endpoint ...", st.str()); if constexpr (traits::is_tcp_proto) { - _tcpAcceptors.emplace_back(std::move(acc)); + _tcpAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_stream_proto) { - _localStreamAcceptors.emplace_back(std::move(acc)); + _localStreamAcceptors.emplace_back(&acc); } else if constexpr (traits::is_local_seqpack_proto) { - _localSeqpackAcceptors.emplace_back(std::move(acc)); + _localSeqpackAcceptors.emplace_back(&acc); } else { static_assert(false, "INVALID ENDPOINT!!!"); } + // start accepting connections + for (;;) { + auto sock = co_await acc.async_accept(asio::use_awaitable); + // start new client session + asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); + } + } catch (const std::system_error& err) { - _serverLogger->error("An error occured while creating of connection acceptor! ec = '{}'", err.what()); + _serverLogger->error("An error occured while trying to start accepting connections! ec = '{}'", + err.what()); } } else { static_assert(false, "INVALID ENDPOINT!!!"); } - - co_return true; } // close listening on all endpoints - void stop() + void stopListening() { std::error_code ec; size_t N = 0, M = 0; @@ -197,94 +230,53 @@ public: return; } - if (_serialPorts.size()) { - _serverLogger->debug("Close serial ports ..."); + auto close_func = [this](auto& acc_ptrs, std::string_view desc) { + size_t N = 0, M = 0; + std::error_code ec; - for (auto& s_port : _serialPorts) { - s_port.close(ec); - if (ec) { - _serverLogger->error("Cannot close serial port! ec = '{}'", ec.message()); - ++M; + if (acc_ptrs.size()) { + _serverLogger->info("Close {} acceptors ...", desc); + + for (auto& acc : acc_ptrs) { + acc->close(ec); + if (ec) { + _serverLogger->error("Cannot close {} acceptor! ec = '{}'", desc, ec.message()); + ++M; + } + ++N; } - ++N; + + _serverLogger->debug("{} from {} {} acceptors were closed!", M, N, desc); + + // pointers are invalidated here, so clear its container + acc_ptrs.clear(); } + }; - _serverLogger->debug("{} from {} serial ports were closed!", M, N); + close_func(_tcpAcceptors, "TCP socket"); + close_func(_localStreamAcceptors, "local stream socket"); + close_func(_localSeqpackAcceptors, "local seqpack socket"); - _serialPorts.clear(); - } - - if (_tcpAcceptors.size()) { - _serverLogger->debug("Close TCP listening sockets ..."); - N = 0; - M = 0; - - for (auto& acc : _tcpAcceptors) { - acc.close(ec); - if (ec) { - _serverLogger->error("Cannot close TCP socket! ec = '{}'", ec.message()); - ++M; - } - ++N; - } - - _serverLogger->debug("{} from {} TCP sockets were closed!", M, N); - - _tcpAcceptors.clear(); - } - - if (_localStreamAcceptors.size()) { - _serverLogger->debug("Close local stream listening sockets ..."); - N = 0; - M = 0; - - for (auto& acc : _localStreamAcceptors) { - acc.close(ec); - if (ec) { - _serverLogger->error("Cannot close local stream socket! ec = '{}'", ec.message()); - ++M; - } - ++N; - } - - _serverLogger->debug("{} from {} local stream sockets were closed!", M, N); - - _localStreamAcceptors.clear(); - } - - if (_localSeqpackAcceptors.size()) { - _serverLogger->debug("Close local seqpack listening sockets ..."); - N = 0; - M = 0; - - for (auto& acc : _localSeqpackAcceptors) { - acc.close(ec); - if (ec) { - _serverLogger->error("Cannot close local seqpack socket! ec = '{}'", ec.message()); - ++M; - } - ++N; - } - - _serverLogger->debug("{} from {} local seqpack sockets were closed!", M, N); - - _localSeqpackAcceptors.clear(); - } _serverLogger->info("The all server listening endpoints were closed!"); } + void disconnectClients() {} + private: asio::io_context& _asioContext; std::shared_ptr _serverLogger; - std::vector _serialPorts; - std::vector _tcpAcceptors; - std::vector _localStreamAcceptors; - std::vector _localSeqpackAcceptors; + std::vector _serialPorts; + std::vector _tcpAcceptors; + std::vector _localStreamAcceptors; + std::vector _localSeqpackAcceptors; + + std::vector _tcpSockets; + std::vector _localStreamSockets; + std::vector _localSeqpackSockets; - asio::awaitable startSession() {} // helpers template @@ -315,6 +307,66 @@ private: setSerialOpts(s_port, std::forward(opts)...); } } + + + template + asio::awaitable startSession(auto socket, + RCVT&& rcv_timeout = DEFAULT_RCV_TIMEOUT, + SNDT&& snd_timeout = DEFAULT_SND_TIMEOUT) + { + using sock_t = std::decay_t; + + auto watchdog = []() -> asio::awaitable {}; + + asio::streambuf sbuff; + size_t nbytes; + std::stringstream st; + + st << socket.remote_endpoint(); + + try { + if constexpr (traits::is_serial_proto) { + _serialPorts.emplace_back(&socket); + + + } else if constexpr (traits::is_tcp_proto) { + _tcpSockets.emplace_back(&socket); + } else if constexpr (traits::is_local_stream_proto) { + _localStreamSockets.emplace_back(&socket); + } else if constexpr (traits::is_local_seqpack_proto) { + _localSeqpackSockets.emplace_back(&socket); + } else { + static_assert(false, "INVALID SOCKET TTYPE!!!"); + } + + + for (;;) { + if constexpr (traits::is_serial_proto) { + nbytes = 1024; + } else { + nbytes = socket.available(); + } + + auto buff = sbuff.prepare(nbytes ? nbytes : 1); + + if constexpr (traits::is_local_seqpack_proto) { + asio::socket_base::message_flags oflags; + nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable); + + if (!nbytes) { // EOF! + _serverLogger->info("It seems client ({}) closed the connection!", st.str()); + co_return; + } + } else { + nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable); + } + + sbuff.commit(nbytes); + } + + } catch (...) { + } + } }; } // namespace mcc