This commit is contained in:
Timur A. Fatkhullin 2024-11-18 18:03:07 +03:00
parent 45b8d4a3c7
commit f3a6aa3571
3 changed files with 173 additions and 24 deletions

126
net/adc_device_netclient.h Normal file
View File

@ -0,0 +1,126 @@
#pragma once
#include "../common/adc_utils.h"
#include "adc_device_netmsg.h"
#include "adc_netclient.h"
namespace adc
{
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcDeviceNetClient : public AdcGenericNetClient<IdentT, LoggerT>
{
typedef AdcGenericNetClient<IdentT, LoggerT> base_t;
public:
template <interfaces::adc_netservice_c NetServiceT, traits::adc_hashable_c SessionIdentT = std::string>
class Session : public std::enable_shared_from_this<Session<NetServiceT, SessionIdentT>>
{
public:
typedef SessionIdentT netsession_ident_t;
typedef NetServiceT netservice_t;
struct netsession_ctx_t {
AdcDeviceNetClient* clientPtr;
std::chrono::milliseconds recvTimeout;
std::chrono::milliseconds sendTimeout;
};
typedef std::vector<char> message_t;
// Session(const netsession_ident_t& id, netservice_t srv, AdcDeviceNetServer* srv_ptr)
Session(const netsession_ident_t& id, netservice_t srv, netsession_ctx_t ctx)
: _ident(id),
_netService(std::move(srv)),
_clientPtr(ctx.clientPtr),
_recvTimeout(ctx.recvTimeout),
_sendTimeout(ctx.sendTimeout)
{
_clientPtr->logInfo("Create client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
virtual ~Session()
{
_clientPtr->logInfo("Delete client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
netsession_ident_t ident() const
{
return _ident;
}
void start()
{
_clientPtr->logInfo("Start client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
void stop()
{
_clientPtr->logInfo("Stop client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
_netService.close();
}
protected:
netsession_ident_t _ident;
netservice_t _netService;
AdcDeviceNetClient* _clientPtr;
std::chrono::milliseconds _recvTimeout = std::chrono::seconds(5);
std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5);
// helper methods
std::function<void(typename netservice_t::async_callback_err_t,
typename netservice_t::async_receive_callback_t)>
_defaultRecvCallback = [this](auto err, auto msg) {
if (err) {
_clientPtr->logError("An error occured while receiving server respond: {}",
netservice_t::formattableError(err));
} else {
AdcDeviceProtoMessage dev_msg(msg);
if (!dev_msg.isValid()) {
_clientPtr->logError("Invalid server respond");
return;
}
if (dev_msg.isACK()) {
} else if (dev_msg.isERROR()) {
} else {
_clientPtr->logError("Unexpectable server respond");
}
}
};
template <traits::adc_input_char_range SendMsgT, typename TokenT>
auto asyncSendRecv(const SendMsgT& send_msg, TokenT&& token)
{
return _netService.asyncSend(
send_msg,
[wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)), this](auto err) {
if (err) {
this->logError("An error occured while sending the message: {}",
netservice_t::formattableError(err));
} else {
_netService.asyncReceive(std::get<0>(wrapper), _recvTimeout);
}
},
_sendTimeout);
}
}; // end of 'Session' class declaration
using base_t::base_t;
virtual ~AdcDeviceNetClient() {}
};
} // namespace adc

View File

@ -252,14 +252,32 @@ public:
void start()
{
static bool first_time = true;
if (first_time) {
_serverPtr->logInfo("Start client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
first_time = false;
}
_serverPtr->logInfo("Start client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
do_start();
}
void stop()
{
_serverPtr->logInfo("Stop client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
_netService.close();
}
protected:
netsession_ident_t _ident;
netservice_t _netService;
AdcDeviceNetServer* _serverPtr;
AdcDeviceNetServer::DeviceWrapper* _bindDevice;
std::chrono::milliseconds _recvTimeout = std::chrono::hours(12);
std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5);
void do_start()
{
auto self(this->shared_from_this());
_netService.asyncReceive(
@ -285,7 +303,7 @@ public:
netservice_t::formattableError(ec), (void*)this, utils::AdcThisThreadId());
stop();
} else {
start();
do_start();
}
},
_sendTimeout);
@ -294,23 +312,6 @@ public:
_recvTimeout);
}
void stop()
{
_serverPtr->logInfo("Stop client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
_netService.close();
}
protected:
netsession_ident_t _ident;
netservice_t _netService;
AdcDeviceNetServer* _serverPtr;
AdcDeviceNetServer::DeviceWrapper* _bindDevice;
std::chrono::milliseconds _recvTimeout = std::chrono::hours(12);
std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5);
void processMessage(auto& msg)
{
typedef std::decay_t<decltype(msg)> msg_t;

View File

@ -55,6 +55,28 @@ public:
}
template <interfaces::adc_netsession_c SessionT, typename TokenT, typename... NetServiceCtorArgTs>
void start(const SessionT::netservice_t::endpoint_t& endpoint,
SessionT::netsession_ident_t id,
SessionT::netsession_ctx_t sess_ctx,
TokenT&& token,
NetServiceCtorArgTs&&... ctor_args)
{
auto srv_sptr =
std::make_shared<typename SessionT::netservice_t>(std::forward<NetServiceCtorArgTs>(ctor_args)...);
// try to connect to server and create session
srv_sptr->asyncConnect(
endpoint,
[id = std::move(id), sess_ctx = std::move(sess_ctx), srv_sptr,
wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token))](auto ec) mutable {
// create session
std::get<0>(wrapper)(ec, {std::move(id), std::move(*srv_sptr), std::move(sess_ctx)});
},
_connectTimeout);
}
template <traits::adc_time_duration_c DT>
void setConnectTimeout(const DT& timeout)
{