From b860900682bdad4806a8e1a93e273c906b985c7b Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Fri, 21 Feb 2025 12:33:11 +0300 Subject: [PATCH] ... --- cxx/comm_server.h | 104 ++++++++++++++++++++++++++----------- cxx/comm_server_endpoint.h | 9 +++- cxx/mount_server.cpp | 3 +- 3 files changed, 82 insertions(+), 34 deletions(-) diff --git a/cxx/comm_server.h b/cxx/comm_server.h index 481bf17..e64a45e 100644 --- a/cxx/comm_server.h +++ b/cxx/comm_server.h @@ -440,7 +440,8 @@ private: }; auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable { - asio::steady_timer timer(_asioContext); + // asio::steady_timer timer(_asioContext); + asio::steady_timer timer(co_await asio::this_coro::executor); auto now = std::chrono::steady_clock::now(); while (deadline > now) { @@ -476,10 +477,13 @@ private: _serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id); try { + if constexpr (!traits::is_serial_proto) { + _serverLogger->trace("Set socket option KEEP_ALIVE to TRUE"); + socket.set_option(asio::socket_base::keep_alive(true)); + } + if constexpr (traits::is_serial_proto) { _serialPorts.emplace_back(&socket); - - } else if constexpr (traits::is_tcp_proto) { _tcpSockets.emplace_back(&socket); } else if constexpr (traits::is_local_stream_proto) { @@ -496,45 +500,69 @@ private: std::vector snd_buff_seq{ {}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}}; + asio::steady_timer timeout_timer(_asioContext); + std::variant op_res; + + bool do_read = true; + // main client request -- server respond cycle for (;;) { // receive message - if constexpr (traits::is_serial_proto) { - nbytes = 1024; - } else { - nbytes = socket.available(); - } - auto buff = sbuff.prepare(nbytes ? nbytes : 1); + if (do_read) { + _serverLogger->trace("Start socket/port reading operation with timeout {} ...", rcv_timeout); - if constexpr (traits::is_local_seqpack_proto) { - asio::socket_base::message_flags oflags; - // nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable); - nbytes = co_await (socket.async_receive(buff, oflags, asio::use_awaitable) && - watchdog(std::chrono::steady_clock::now() + rcv_timeout)); - - if (!nbytes) { // EOF! - // _serverLogger->info("It seems client ({}) closed the connection!", st.str()); - throw std::system_error(std::error_code(asio::error::misc_errors::eof)); - // co_return; + if constexpr (traits::is_serial_proto) { + nbytes = 1024; + } else { + nbytes = socket.available(); } - } else { - // nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), - // asio::use_awaitable); - nbytes = - co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) && - watchdog(std::chrono::steady_clock::now() + rcv_timeout)); - } - sbuff.commit(nbytes); + auto buff = sbuff.prepare(nbytes ? nbytes : 1); + + // timeout_timer.expires_after(std::chrono::seconds(5)); + timeout_timer.expires_after(rcv_timeout); + + if constexpr (traits::is_local_seqpack_proto) { + asio::socket_base::message_flags oflags; + + op_res = co_await (socket.async_receive(buff, oflags, asio::use_awaitable) || + timeout_timer.async_wait(asio::use_awaitable)); + + } else { + op_res = + co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) || + timeout_timer.async_wait(asio::use_awaitable)); + } + + if (op_res.index()) { + throw std::system_error(std::make_error_code(std::errc::timed_out)); + } else { + nbytes = std::get<0>(op_res); + + _serverLogger->trace("{} bytes were received", nbytes); + + if constexpr (traits::is_local_seqpack_proto) { + if (!nbytes) { // EOF! + throw std::system_error(std::error_code(asio::error::misc_errors::eof)); + } + } + } + + sbuff.commit(nbytes); + } // here, the input stream buffer still contains remaining bytes. try to handle its auto start_ptr = static_cast(sbuff.data().data()); auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size())); if (msg.empty()) { // still not whole message + _serverLogger->trace( + "It seems a partial command message was received, so waiting for remaining part ..."); + do_read = true; continue; } + // extract command without stop sequence symbols // std::string comm; // std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()), @@ -547,18 +575,32 @@ private: auto resp = handleClientCommand(comm); + // remove received message from the input stream buffer. NOTE: 'msg' is now invalidated!!! + sbuff.consume(msg.size()); + do_read = sbuff.size() == 0; + + _serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})", std::string_view(resp.begin(), resp.end()), r_epn, thr_id); // send server respond to client snd_buff_seq[0] = {resp.data(), resp.size()}; + timeout_timer.expires_after(snd_timeout); if constexpr (traits::is_local_seqpack_proto) { - nbytes = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) && - watchdog(std::chrono::steady_clock::now() + snd_timeout)); + op_res = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) || + timeout_timer.async_wait(asio::use_awaitable)); } else { - nbytes = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) && - watchdog(std::chrono::steady_clock::now() + snd_timeout)); + // nbytes = co_await asio::async_write(socket, snd_buff_seq, asio::use_awaitable); + op_res = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) || + timeout_timer.async_wait(asio::use_awaitable)); + } + + if (op_res.index()) { + throw std::system_error(std::make_error_code(std::errc::timed_out)); + } else { + nbytes = std::get<0>(op_res); + _serverLogger->trace("{} bytes were sent", nbytes); } if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!! diff --git a/cxx/comm_server_endpoint.h b/cxx/comm_server_endpoint.h index 69349a4..13158c3 100644 --- a/cxx/comm_server_endpoint.h +++ b/cxx/comm_server_endpoint.h @@ -432,8 +432,13 @@ protected: _endpoint = other._endpoint; _proto = other._proto; - auto idx = std::distance(other._endpoint.c_str(), other._host.data()); - _host = std::string_view(_endpoint.c_str() + idx, other._host.size()); + std::iterator_traits::difference_type idx; + if (other.isLocal()) { // for 'local' host is one of static class constants + _host = other._host; + } else { + idx = std::distance(other._endpoint.c_str(), other._host.data()); + _host = std::string_view(_endpoint.c_str() + idx, other._host.size()); + } idx = std::distance(other._endpoint.c_str(), other._path.data()); _path = std::string_view(_endpoint.c_str() + idx, other._path.size()); diff --git a/cxx/mount_server.cpp b/cxx/mount_server.cpp index cac5542..a6f0ead 100644 --- a/cxx/mount_server.cpp +++ b/cxx/mount_server.cpp @@ -9,11 +9,12 @@ int main() auto logger = spdlog::stdout_color_mt("STDOUT_LOGGER"); logger->set_level(spdlog::level::debug); + logger->set_level(spdlog::level::trace); logger->flush_on(spdlog::level::debug); mcc::MccMountServer server(ctx, logger); - mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/tmp/BM700_SERVER_SOCK")); + mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/@tmp/BM700_SERVER_SOCK")); // mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345/tmp/BM700_SERVER_SOCK")); asio::co_spawn(ctx, server.listen(epn), asio::detached);