This commit is contained in:
Timur A. Fatkhullin 2025-02-20 12:21:18 +03:00
parent 357a0d7e19
commit 53a16d7571
2 changed files with 120 additions and 5 deletions

View File

@ -6,16 +6,20 @@
#include <asio/co_spawn.hpp>
#include <asio/deferred.hpp>
#include <asio/detached.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/local/seq_packet_protocol.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read.hpp>
#include <asio/serial_port.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/write.hpp>
#include <spdlog/spdlog.h>
#include "comm_server_endpoint.h"
#include "control_proto.h"
#include "mount.h"
namespace mcc
@ -309,20 +313,55 @@ private:
}
std::vector<char> handleClientCommand(std::string_view command)
{
std::vector<char> resp;
return resp;
}
template <traits::mcc_time_duration_c RCVT, traits::mcc_time_duration_c SNDT>
asio::awaitable<void> startSession(auto socket,
RCVT&& rcv_timeout = DEFAULT_RCV_TIMEOUT,
SNDT&& snd_timeout = DEFAULT_SND_TIMEOUT)
{
using namespace asio::experimental::awaitable_operators;
using sock_t = std::decay_t<decltype(socket)>;
auto watchdog = []() -> asio::awaitable<void> {};
auto look_for_whole_msg = [](auto const& bytes) {
auto found = std::ranges::search(bytes, BM700::CONTROL_PROTO_STOP_SEQ);
return found.empty() ? std::span(bytes.begin(), bytes.begin()) : std::span(bytes.begin(), found.end());
};
auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable<void> {
asio::steady_timer timer(_asioContext);
auto now = std::chrono::steady_clock::now();
while (deadline > now) {
timer.expires_at(deadline);
co_await timer.async_wait(asio::use_awaitable);
now = std::chrono::steady_clock::now();
}
throw std::system_error(std::make_error_code(std::errc::timed_out));
};
asio::streambuf sbuff;
size_t nbytes;
std::stringstream st;
st << socket.remote_endpoint();
std::string r_epn = st.str();
st.str() = "";
st << std::this_thread::get_id();
std::string thr_id = st.str();
_serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id);
try {
if constexpr (traits::is_serial_proto<sock_t>) {
@ -340,7 +379,14 @@ private:
}
// send buffer sequence
// initiate the second element by "stop-sequence" symbols
std::vector<asio::const_buffer> snd_buff_seq{
{}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}};
// main client request -- server respond cycle
for (;;) {
// receive message
if constexpr (traits::is_serial_proto<sock_t>) {
nbytes = 1024;
} else {
@ -351,21 +397,81 @@ private:
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
asio::socket_base::message_flags oflags;
nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable);
// nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable);
nbytes = co_await (socket.async_receive(buff, &oflags, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
if (!nbytes) { // EOF!
_serverLogger->info("It seems client ({}) closed the connection!", st.str());
co_return;
// _serverLogger->info("It seems client ({}) closed the connection!", st.str());
throw std::system_error(std::error_code(asio::error::misc_errors::eof));
// co_return;
}
} else {
nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable);
// nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1),
// asio::use_awaitable);
nbytes =
co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
}
sbuff.commit(nbytes);
auto start_ptr = static_cast<const char*>(sbuff.data().data());
auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size()));
if (msg.empty()) { // still not whole message
continue;
}
// extract command without stop sequence symbols
// std::string comm;
// std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()),
// std::back_inserter(comm));
std::string_view comm{msg.begin(), msg.end() - BM700::CONTROL_PROTO_STOP_SEQ.size()};
_serverLogger->debug("A command [{}] was received from client (remote endpoint <{}>, thread ID = {})",
comm, r_epn, thr_id);
auto resp = handleClientCommand(comm);
_serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
std::string_view(resp.begin(), resp.end()), r_epn, thr_id);
// send server respond to client
snd_buff_seq[0] = {resp.data(), resp.size()};
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
nbytes = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + snd_timeout));
} else {
nbytes = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) &&
watchdog(std::chrono::steady_clock::now() + snd_timeout));
}
if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!!
}
}
} catch (const std::system_error& ex) {
if (ex.code() == std::error_code(asio::error::misc_errors::eof)) {
_serverLogger->info(
"It seems client or server closed the connection (remote endpoint <{}>, thread ID = {})", r_epn,
thr_id);
} else {
_serverLogger->error("An error '{}' occured in client session (remote endpoint <{}>, thread ID = {})",
ex.what(), r_epn, thr_id);
}
} catch (const std::exception& ex) {
_serverLogger->error(
"An unhandled error '{}' occured in client sesssion (remote endpoint <{}>, thread ID = {})", ex.what(),
r_epn, thr_id);
} catch (...) {
_serverLogger->error("An unhandled error occured in client sesssion (remote endpoint <{}>, thread ID = {})",
r_epn, thr_id);
}
_serverLogger->info("Close client session: remote endpoint <{}> (thread ID = {})", r_epn, thr_id);
}
};

View File

@ -14,6 +14,13 @@ static constexpr std::string_view CONTROL_PROTO_STOP_SEQ = "\n";
static constexpr std::string_view CONTROL_PROTO_COMM_ARG_DELIM_SEQ = "=";
static constexpr std::string_view CONTROL_PROTO_ARG_DELIM_SEQ = ",";
/* CONTROL PROTOCOL SERVER RESPONDS */
static constexpr std::string_view CONTROL_PROTO_STR_RESP_ACK = "ACK";
static constexpr std::string_view CONTROL_PROTO_STR_RESP_INVALID_COMM = "INVDCOMM"; // invalid command
static constexpr std::string_view CONTROL_PROTO_STR_RESP_INVALID_PARS = "INVDPARS"; // invalid command parameters
/* CONTROL PROTOCOL COMMANDS */
// coordinates getter/setter
@ -26,6 +33,8 @@ static constexpr std::string_view CONTROL_PROTO_STR_TAG_RADEC = "tagRADEC";
static constexpr std::string_view CONTROL_PROTO_STR_TEL_RA = "telRA";
static constexpr std::string_view CONTROL_PROTO_STR_TEL_DEC = "telDEC";
static constexpr std::string_view CONTROL_PROTO_STR_TEL_RADEC = "telRADEC";
static constexpr std::string_view CONTROL_PROTO_STR_TEL_HA = "telHA";
static constexpr std::string_view CONTROL_PROTO_STR_TEL_HADEC = "telHADEC";
// time/date
static constexpr std::string_view CONTROL_PROTO_STR_UTC_DATE = "utcDate";