...
This commit is contained in:
parent
aca8a6523e
commit
b860900682
@ -440,7 +440,8 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable<void> {
|
auto watchdog = [this](const std::chrono::steady_clock::time_point& deadline) -> asio::awaitable<void> {
|
||||||
asio::steady_timer timer(_asioContext);
|
// asio::steady_timer timer(_asioContext);
|
||||||
|
asio::steady_timer timer(co_await asio::this_coro::executor);
|
||||||
|
|
||||||
auto now = std::chrono::steady_clock::now();
|
auto now = std::chrono::steady_clock::now();
|
||||||
while (deadline > now) {
|
while (deadline > now) {
|
||||||
@ -476,10 +477,13 @@ private:
|
|||||||
_serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id);
|
_serverLogger->info("Start client session: remote endpoint <{}> (session thread ID = {})", r_epn, thr_id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if constexpr (!traits::is_serial_proto<sock_t>) {
|
||||||
|
_serverLogger->trace("Set socket option KEEP_ALIVE to TRUE");
|
||||||
|
socket.set_option(asio::socket_base::keep_alive(true));
|
||||||
|
}
|
||||||
|
|
||||||
if constexpr (traits::is_serial_proto<sock_t>) {
|
if constexpr (traits::is_serial_proto<sock_t>) {
|
||||||
_serialPorts.emplace_back(&socket);
|
_serialPorts.emplace_back(&socket);
|
||||||
|
|
||||||
|
|
||||||
} else if constexpr (traits::is_tcp_proto<sock_t>) {
|
} else if constexpr (traits::is_tcp_proto<sock_t>) {
|
||||||
_tcpSockets.emplace_back(&socket);
|
_tcpSockets.emplace_back(&socket);
|
||||||
} else if constexpr (traits::is_local_stream_proto<sock_t>) {
|
} else if constexpr (traits::is_local_stream_proto<sock_t>) {
|
||||||
@ -496,45 +500,69 @@ private:
|
|||||||
std::vector<asio::const_buffer> snd_buff_seq{
|
std::vector<asio::const_buffer> snd_buff_seq{
|
||||||
{}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}};
|
{}, {BM700::CONTROL_PROTO_STOP_SEQ.data(), BM700::CONTROL_PROTO_STOP_SEQ.size()}};
|
||||||
|
|
||||||
|
asio::steady_timer timeout_timer(_asioContext);
|
||||||
|
std::variant<size_t, std::monostate> op_res;
|
||||||
|
|
||||||
|
bool do_read = true;
|
||||||
|
|
||||||
// main client request -- server respond cycle
|
// main client request -- server respond cycle
|
||||||
for (;;) {
|
for (;;) {
|
||||||
// receive message
|
// receive message
|
||||||
if constexpr (traits::is_serial_proto<sock_t>) {
|
|
||||||
nbytes = 1024;
|
|
||||||
} else {
|
|
||||||
nbytes = socket.available();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto buff = sbuff.prepare(nbytes ? nbytes : 1);
|
if (do_read) {
|
||||||
|
_serverLogger->trace("Start socket/port reading operation with timeout {} ...", rcv_timeout);
|
||||||
|
|
||||||
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
|
if constexpr (traits::is_serial_proto<sock_t>) {
|
||||||
asio::socket_base::message_flags oflags;
|
nbytes = 1024;
|
||||||
// nbytes = co_await socket.async_receive(buff, &oflags, asio::use_awaitable);
|
} else {
|
||||||
nbytes = co_await (socket.async_receive(buff, oflags, asio::use_awaitable) &&
|
nbytes = socket.available();
|
||||||
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
|
|
||||||
|
|
||||||
if (!nbytes) { // EOF!
|
|
||||||
// _serverLogger->info("It seems client ({}) closed the connection!", st.str());
|
|
||||||
throw std::system_error(std::error_code(asio::error::misc_errors::eof));
|
|
||||||
// co_return;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// nbytes = co_await asio::async_read(socket, buff, asio::transfer_at_least(1),
|
|
||||||
// asio::use_awaitable);
|
|
||||||
nbytes =
|
|
||||||
co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) &&
|
|
||||||
watchdog(std::chrono::steady_clock::now() + rcv_timeout));
|
|
||||||
}
|
|
||||||
|
|
||||||
sbuff.commit(nbytes);
|
auto buff = sbuff.prepare(nbytes ? nbytes : 1);
|
||||||
|
|
||||||
|
// timeout_timer.expires_after(std::chrono::seconds(5));
|
||||||
|
timeout_timer.expires_after(rcv_timeout);
|
||||||
|
|
||||||
|
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
|
||||||
|
asio::socket_base::message_flags oflags;
|
||||||
|
|
||||||
|
op_res = co_await (socket.async_receive(buff, oflags, asio::use_awaitable) ||
|
||||||
|
timeout_timer.async_wait(asio::use_awaitable));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
op_res =
|
||||||
|
co_await (asio::async_read(socket, buff, asio::transfer_at_least(1), asio::use_awaitable) ||
|
||||||
|
timeout_timer.async_wait(asio::use_awaitable));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (op_res.index()) {
|
||||||
|
throw std::system_error(std::make_error_code(std::errc::timed_out));
|
||||||
|
} else {
|
||||||
|
nbytes = std::get<0>(op_res);
|
||||||
|
|
||||||
|
_serverLogger->trace("{} bytes were received", nbytes);
|
||||||
|
|
||||||
|
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
|
||||||
|
if (!nbytes) { // EOF!
|
||||||
|
throw std::system_error(std::error_code(asio::error::misc_errors::eof));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sbuff.commit(nbytes);
|
||||||
|
} // here, the input stream buffer still contains remaining bytes. try to handle its
|
||||||
|
|
||||||
auto start_ptr = static_cast<const char*>(sbuff.data().data());
|
auto start_ptr = static_cast<const char*>(sbuff.data().data());
|
||||||
|
|
||||||
auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size()));
|
auto msg = look_for_whole_msg(std::span(start_ptr, sbuff.size()));
|
||||||
if (msg.empty()) { // still not whole message
|
if (msg.empty()) { // still not whole message
|
||||||
|
_serverLogger->trace(
|
||||||
|
"It seems a partial command message was received, so waiting for remaining part ...");
|
||||||
|
do_read = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// extract command without stop sequence symbols
|
// extract command without stop sequence symbols
|
||||||
// std::string comm;
|
// std::string comm;
|
||||||
// std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()),
|
// std::ranges::copy(msg | std::views::take(msg.size() - BM700::CONTROL_PROTO_STOP_SEQ.size()),
|
||||||
@ -547,18 +575,32 @@ private:
|
|||||||
|
|
||||||
auto resp = handleClientCommand(comm);
|
auto resp = handleClientCommand(comm);
|
||||||
|
|
||||||
|
// remove received message from the input stream buffer. NOTE: 'msg' is now invalidated!!!
|
||||||
|
sbuff.consume(msg.size());
|
||||||
|
do_read = sbuff.size() == 0;
|
||||||
|
|
||||||
|
|
||||||
_serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
|
_serverLogger->debug("Send respond [{}] to client (remote endpoint <{}>, thread ID = {})",
|
||||||
std::string_view(resp.begin(), resp.end()), r_epn, thr_id);
|
std::string_view(resp.begin(), resp.end()), r_epn, thr_id);
|
||||||
|
|
||||||
// send server respond to client
|
// send server respond to client
|
||||||
snd_buff_seq[0] = {resp.data(), resp.size()};
|
snd_buff_seq[0] = {resp.data(), resp.size()};
|
||||||
|
|
||||||
|
timeout_timer.expires_after(snd_timeout);
|
||||||
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
|
if constexpr (traits::is_local_seqpack_proto<sock_t>) {
|
||||||
nbytes = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) &&
|
op_res = co_await (socket.async_send(snd_buff_seq, 0, asio::use_awaitable) ||
|
||||||
watchdog(std::chrono::steady_clock::now() + snd_timeout));
|
timeout_timer.async_wait(asio::use_awaitable));
|
||||||
} else {
|
} else {
|
||||||
nbytes = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) &&
|
// nbytes = co_await asio::async_write(socket, snd_buff_seq, asio::use_awaitable);
|
||||||
watchdog(std::chrono::steady_clock::now() + snd_timeout));
|
op_res = co_await (asio::async_write(socket, snd_buff_seq, asio::use_awaitable) ||
|
||||||
|
timeout_timer.async_wait(asio::use_awaitable));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (op_res.index()) {
|
||||||
|
throw std::system_error(std::make_error_code(std::errc::timed_out));
|
||||||
|
} else {
|
||||||
|
nbytes = std::get<0>(op_res);
|
||||||
|
_serverLogger->trace("{} bytes were sent", nbytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!!
|
if (nbytes != (resp.size() + BM700::CONTROL_PROTO_STOP_SEQ.size())) { // !!!!!!!!!!
|
||||||
|
|||||||
@ -432,8 +432,13 @@ protected:
|
|||||||
_endpoint = other._endpoint;
|
_endpoint = other._endpoint;
|
||||||
_proto = other._proto;
|
_proto = other._proto;
|
||||||
|
|
||||||
auto idx = std::distance(other._endpoint.c_str(), other._host.data());
|
std::iterator_traits<const char*>::difference_type idx;
|
||||||
_host = std::string_view(_endpoint.c_str() + idx, other._host.size());
|
if (other.isLocal()) { // for 'local' host is one of static class constants
|
||||||
|
_host = other._host;
|
||||||
|
} else {
|
||||||
|
idx = std::distance(other._endpoint.c_str(), other._host.data());
|
||||||
|
_host = std::string_view(_endpoint.c_str() + idx, other._host.size());
|
||||||
|
}
|
||||||
|
|
||||||
idx = std::distance(other._endpoint.c_str(), other._path.data());
|
idx = std::distance(other._endpoint.c_str(), other._path.data());
|
||||||
_path = std::string_view(_endpoint.c_str() + idx, other._path.size());
|
_path = std::string_view(_endpoint.c_str() + idx, other._path.size());
|
||||||
|
|||||||
@ -9,11 +9,12 @@ int main()
|
|||||||
|
|
||||||
auto logger = spdlog::stdout_color_mt("STDOUT_LOGGER");
|
auto logger = spdlog::stdout_color_mt("STDOUT_LOGGER");
|
||||||
logger->set_level(spdlog::level::debug);
|
logger->set_level(spdlog::level::debug);
|
||||||
|
logger->set_level(spdlog::level::trace);
|
||||||
logger->flush_on(spdlog::level::debug);
|
logger->flush_on(spdlog::level::debug);
|
||||||
|
|
||||||
mcc::MccMountServer server(ctx, logger);
|
mcc::MccMountServer server(ctx, logger);
|
||||||
|
|
||||||
mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/tmp/BM700_SERVER_SOCK"));
|
mcc::MccServerEndpoint epn(std::string_view("local://seqpacket/@tmp/BM700_SERVER_SOCK"));
|
||||||
// mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345/tmp/BM700_SERVER_SOCK"));
|
// mcc::MccServerEndpoint epn(std::string_view("tcp://localhost:12345/tmp/BM700_SERVER_SOCK"));
|
||||||
|
|
||||||
asio::co_spawn(ctx, server.listen(epn), asio::detached);
|
asio::co_spawn(ctx, server.listen(epn), asio::detached);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user