diff --git a/cxx/comm_server.h b/cxx/comm_server.h index 7288257..6c0ae52 100644 --- a/cxx/comm_server.h +++ b/cxx/comm_server.h @@ -6,16 +6,20 @@ #include #include #include +#include #include #include #include #include #include +#include #include +#include #include #include "comm_server_endpoint.h" +#include "control_proto.h" #include "mount.h" namespace mcc @@ -309,20 +313,55 @@ private: } + std::vector handleClientCommand(std::string_view command) + { + std::vector resp; + + return resp; + } + + template asio::awaitable startSession(auto socket, RCVT&& rcv_timeout = DEFAULT_RCV_TIMEOUT, SNDT&& snd_timeout = DEFAULT_SND_TIMEOUT) { + using namespace asio::experimental::awaitable_operators; + using sock_t = std::decay_t; - auto watchdog = []() -> asio::awaitable {}; + + auto look_for_whole_msg = [](auto const& bytes) { + auto found = std::ranges::search(bytes, BM700::CONTROL_PROTO_STOP_SEQ); + return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end()); + }; + + auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable { + asio::steady_timer timer(_asioContext); + + auto now = std::chrono::steady_clock::now(); + while (deadline > now) { + timer.expires_at(deadline); + co_await timer.async_wait(asio::use_awaitable); + now = std::chrono::steady_clock::now(); + } + + throw std::system_error(std::make_error_code(std::errc::timed_out)); + }; + asio::streambuf sbuff; size_t nbytes; std::stringstream st; st << socket.remote_endpoint(); + std::string r_epn = st.str(); + + st.str() = ""; + st << std::this_thread::get_id(); + std::string thr_id = st.str(); + + _serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id); try { if constexpr (traits::is_serial_proto) { @@ -340,7 +379,14 @@ private: } + // send buffer sequence + // initiate the second element by "stop-sequence" symbols + std::vector snd_buff_seq{ + {}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}}; + + // main client request -- server respond cycle for (;;) { + // receive message if constexpr (traits::is_serial_proto) { nbytes = 1024; } else { @@ -351,21 +397,81 @@ private: if constexpr (traits::is_local_seqpack_proto) { asio::socket_base::message_flags oflags; - nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable); + // nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable); + nbytes = co_await (socket.async_receive(buff, &oflags, asio::use_awaitable) && + watchdog(std::chrono::steady_clock::now() + rcv_timeout)); if (!nbytes) { // EOF! - _serverLogger->info("It seems client ({}) closed the connection!", st.str()); - co_return; + // _serverLogger->info("It seems client ({}) closed the connection!", st.str()); + throw std::system_error(std::error_code(asio::error::misc_errors::eof)); + // co_return; } } else { - nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable); + // nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), + // asio::use_awaitable); + nbytes = + co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) && + watchdog(std::chrono::steady_clock::now() + rcv_timeout)); } sbuff.commit(nbytes); + + auto start_ptr = static_cast(sbuff.data().data()); + + auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size())); + if (msg.empty()) { // still not whole message + continue; + } + + // extract command without stop sequence symbols + // std::string comm; + // std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()), + // std::back_inserter(comm)); + + std::string_view comm{msg.begin(), msg.end() - BM700::CONTROL_PROTO_STOP_SEQ.size()}; + + _serverLogger->debug("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})", + comm, r_epn, thr_id); + + auto resp = handleClientCommand(comm); + + _serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})", + std::string_view(resp.begin(), resp.end()), r_epn, thr_id); + + // send server respond to client + snd_buff_seq[0] = {resp.data(), resp.size()}; + + if constexpr (traits::is_local_seqpack_proto) { + nbytes = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) && + watchdog(std::chrono::steady_clock::now() + snd_timeout)); + } else { + nbytes = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) && + watchdog(std::chrono::steady_clock::now() + snd_timeout)); + } + + if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!! + } } + } catch (const std::system_error& ex) { + if (ex.code() == std::error_code(asio::error::misc_errors::eof)) { + _serverLogger->info( + "It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn, + thr_id); + } else { + _serverLogger->error("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})", + ex.what(), r_epn, thr_id); + } + } catch (const std::exception& ex) { + _serverLogger->error( + "An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})", ex.what(), + r_epn, thr_id); } catch (...) { + _serverLogger->error("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})", + r_epn, thr_id); } + + _serverLogger->info("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id); } }; diff --git a/cxx/control_proto.h b/cxx/control_proto.h index 9b20705..ffc18cb 100644 --- a/cxx/control_proto.h +++ b/cxx/control_proto.h @@ -14,6 +14,13 @@ static constexpr std::string_view CONTROL_PROTO_STOP_SEQ = "\n"; static constexpr std::string_view CONTROL_PROTO_COMM_ARG_DELIM_SEQ = "="; static constexpr std::string_view CONTROL_PROTO_ARG_DELIM_SEQ = ","; + +/* CONTROL PROTOCOL SERVER RESPONDS */ + +static constexpr std::string_view CONTROL_PROTO_STR_RESP_ACK = "ACK"; +static constexpr std::string_view CONTROL_PROTO_STR_RESP_INVALID_COMM = "INVDCOMM"; // invalid command +static constexpr std::string_view CONTROL_PROTO_STR_RESP_INVALID_PARS = "INVDPARS"; // invalid command parameters + /* CONTROL PROTOCOL COMMANDS */ // coordinates getter/setter @@ -26,6 +33,8 @@ static constexpr std::string_view CONTROL_PROTO_STR_TAG_RADEC = "tagRADEC"; static constexpr std::string_view CONTROL_PROTO_STR_TEL_RA = "telRA"; static constexpr std::string_view CONTROL_PROTO_STR_TEL_DEC = "telDEC"; static constexpr std::string_view CONTROL_PROTO_STR_TEL_RADEC = "telRADEC"; +static constexpr std::string_view CONTROL_PROTO_STR_TEL_HA = "telHA"; +static constexpr std::string_view CONTROL_PROTO_STR_TEL_HADEC = "telHADEC"; // time/date static constexpr std::string_view CONTROL_PROTO_STR_UTC_DATE = "utcDate";