diff --git a/cxx/comm_server.h b/cxx/comm_server.h index e64a45e..7780eb7 100644 --- a/cxx/comm_server.h +++ b/cxx/comm_server.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -11,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -92,9 +94,19 @@ public: MccMountServer(asio::io_context& ctx, std::shared_ptr logger = spdlog::null_logger_mt("NULL")) : _asioContext(ctx), _serverLogger(std::move(logger)) { + std::stringstream st; + st << std::this_thread::get_id(); + + _serverLogger->info("Create mount server instance (thread ID = {})", st.str()); } - ~MccMountServer() {} + ~MccMountServer() + { + std::stringstream st; + st << std::this_thread::get_id(); + + _serverLogger->info("Delete mount server instance (thread ID = {})", st.str()); + } template @@ -108,10 +120,11 @@ public: // add root path to endpoint one std::filesystem::path pt("/"); - pt += endpoint.path(); if (endpoint.isLocalSerial()) { + pt += endpoint.path(); + asio::serial_port s_port(_asioContext); std::error_code ec; @@ -132,6 +145,14 @@ public: // create abstract namespace socket endpoint if its path starts from '@' symbol endpoint.makeAbstract('@'); + if (endpoint.path()[0] == '\0') { // abstract namespace + std::string p; + std::ranges::copy(endpoint.path(), std::back_inserter(p)); + p.insert(p.begin() + 1, '/'); // insert after '\0' symbol + pt = p; + } else { + pt += endpoint.path(); + } if (endpoint.isLocalStream()) { co_await listen(asio::local::stream_protocol::endpoint(pt.string())); @@ -379,9 +400,12 @@ private: std::vector _localStreamAcceptors; std::vector _localSeqpackAcceptors; - std::vector _tcpSockets; - std::vector _localStreamSockets; - std::vector _localSeqpackSockets; + std::set _tcpSockets; + std::set _localStreamSockets; + std::set _localSeqpackSockets; + // std::vector _tcpSockets; + // std::vector _localStreamSockets; + // std::vector _localSeqpackSockets; // helpers @@ -485,11 +509,14 @@ private: if constexpr (traits::is_serial_proto) { _serialPorts.emplace_back(&socket); } else if constexpr (traits::is_tcp_proto) { - _tcpSockets.emplace_back(&socket); + // _tcpSockets.emplace_back(&socket); + _tcpSockets.insert(&socket); } else if constexpr (traits::is_local_stream_proto) { - _localStreamSockets.emplace_back(&socket); + // _localStreamSockets.emplace_back(&socket); + _localStreamSockets.insert(&socket); } else if constexpr (traits::is_local_seqpack_proto) { - _localSeqpackSockets.emplace_back(&socket); + // _localSeqpackSockets.emplace_back(&socket); + _localSeqpackSockets.insert(&socket); } else { static_assert(false, "INVALID SOCKET TTYPE!!!"); } @@ -503,6 +530,8 @@ private: asio::steady_timer timeout_timer(_asioContext); std::variant op_res; + std::error_code ec; + bool do_read = true; // main client request -- server respond cycle @@ -526,13 +555,18 @@ private: if constexpr (traits::is_local_seqpack_proto) { asio::socket_base::message_flags oflags; - op_res = co_await (socket.async_receive(buff, oflags, asio::use_awaitable) || - timeout_timer.async_wait(asio::use_awaitable)); + op_res = co_await ( + socket.async_receive(buff, oflags, asio::redirect_error(asio::use_awaitable, ec)) || + timeout_timer.async_wait(asio::use_awaitable)); } else { - op_res = - co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) || - timeout_timer.async_wait(asio::use_awaitable)); + op_res = co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), + asio::redirect_error(asio::use_awaitable, ec)) || + timeout_timer.async_wait(asio::use_awaitable)); + } + + if (ec) { + throw std::system_error(ec); } if (op_res.index()) { @@ -588,12 +622,18 @@ private: timeout_timer.expires_after(snd_timeout); if constexpr (traits::is_local_seqpack_proto) { - op_res = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) || - timeout_timer.async_wait(asio::use_awaitable)); + op_res = + co_await (socket.async_send(snd_buff_seq, 0, asio::redirect_error(asio::use_awaitable, ec)) || + timeout_timer.async_wait(asio::use_awaitable)); } else { // nbytes = co_await asio::async_write(socket, snd_buff_seq, asio::use_awaitable); - op_res = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) || - timeout_timer.async_wait(asio::use_awaitable)); + op_res = co_await ( + asio::async_write(socket, snd_buff_seq, asio::redirect_error(asio::use_awaitable, ec)) || + timeout_timer.async_wait(asio::use_awaitable)); + } + + if (ec) { + throw std::system_error(ec); } if (op_res.index()) { @@ -625,6 +665,18 @@ private: r_epn, thr_id); } + if constexpr (traits::is_serial_proto) { + // _serialPorts.emplace_back(&socket); + } else if constexpr (traits::is_tcp_proto) { + _tcpSockets.erase(&socket); + } else if constexpr (traits::is_local_stream_proto) { + _localStreamSockets.erase(&socket); + } else if constexpr (traits::is_local_seqpack_proto) { + _localSeqpackSockets.erase(&socket); + } else { + static_assert(false, "INVALID SOCKET TTYPE!!!"); + } + _serverLogger->info("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id); } }; diff --git a/cxx/mount_server.cpp b/cxx/mount_server.cpp index a6f0ead..61694ed 100644 --- a/cxx/mount_server.cpp +++ b/cxx/mount_server.cpp @@ -5,7 +5,7 @@ int main() { - asio::io_context ctx; + asio::io_context ctx(2); auto logger = spdlog::stdout_color_mt("STDOUT_LOGGER"); logger->set_level(spdlog::level::debug); @@ -14,8 +14,9 @@ int main() mcc::MccMountServer server(ctx, logger); - mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/@tmp/BM700_SERVER_SOCK")); - // mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345/tmp/BM700_SERVER_SOCK")); + // mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/tmp/BM700_SERVER_SOCK")); + // mcc::MccServerEndpoint epn(std::string_view("local://stream/tmp/BM700_SERVER_SOCK")); + mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345")); asio::co_spawn(ctx, server.listen(epn), asio::detached);