Compare commits

...

10 Commits

Author SHA1 Message Date
ad12ee1ad8 ... 2024-11-20 17:25:33 +03:00
319276845a ... 2024-11-20 12:23:50 +03:00
Timur A. Fatkhullin
ae6fbf18ca add AdcDeviceNetClient class 2024-11-19 00:28:52 +03:00
f3a6aa3571 ... 2024-11-18 18:03:07 +03:00
Timur A. Fatkhullin
45b8d4a3c7 fix 100% load of CPU after client disconnection
(AdcBaseNetServiceASIO.asyncReceive)
add resolving domain name (AdcDeviceNetServerASIO)
2024-11-17 23:50:15 +03:00
221f595bcb ... 2024-11-15 12:49:11 +03:00
Timur A. Fatkhullin
285f8de1f7 add AdcGenericNetClient class 2024-11-14 23:16:08 +03:00
78a9e53d18 Back to C++20 standard!
Logging is worked (AdcOstreamLogger and AdcSPDLOGLogger classes)
2024-11-14 18:33:07 +03:00
05e0055193 ... 2024-11-13 18:01:31 +03:00
7251f95459 add AdcOstreamLogger class (std::basic_stream based multithread-safe
simple logger)
2024-11-12 18:07:00 +03:00
13 changed files with 1330 additions and 264 deletions

View File

@ -4,7 +4,7 @@ project(ADC LANGUAGES CXX)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}") set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}")
set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_BUILD_TYPE Release) # set(CMAKE_BUILD_TYPE Release)
@ -12,21 +12,21 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
# check compiler version to ensure supporting of # check compiler version to ensure supporting of
# 'deducing this' C++23 feature # 'deducing this' C++23 feature
# #
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") # if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 14.0) # if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 14.0)
message(FATAL_ERROR "GCC version must be at least 14.0!") # message(FATAL_ERROR "GCC version must be at least 14.0!")
endif() # endif()
elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") # elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 18.0) # if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 18.0)
message(FATAL_ERROR "Clang version must be at least 18.0!") # message(FATAL_ERROR "Clang version must be at least 18.0!")
endif() # endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") # elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "19.32") # if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "19.32")
message(FATAL_ERROR "MSVC version must be at least 19.32") # message(FATAL_ERROR "MSVC version must be at least 19.32")
endif() # endif()
else() # else()
message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.") # message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.")
endif() # endif()
@ -53,9 +53,11 @@ set(ADC_NETWORK_HEADERS
# net/adc_netservice.h # net/adc_netservice.h
net/adc_endpoint.h net/adc_endpoint.h
net/adc_netserver.h net/adc_netserver.h
net/adc_netclient.h
net/adc_net_concepts.h net/adc_net_concepts.h
net/adc_device_netmsg.h net/adc_device_netmsg.h
net/adc_device_netserver.h net/adc_device_netserver.h
net/adc_device_netclient.h
) )
@ -85,6 +87,7 @@ option(SPDLOG_LIBRARY "Use of SPDLOG library for logging" ON)
if (SPDLOG_LIBRARY) if (SPDLOG_LIBRARY)
find_package(spdlog REQUIRED) find_package(spdlog REQUIRED)
find_package(fmt REQUIRED)
set(ADC_COMMON_HEADERS ${ADC_COMMON_HEADERS} set(ADC_COMMON_HEADERS ${ADC_COMMON_HEADERS}
common/adc_spdlog.h common/adc_spdlog.h
@ -198,7 +201,12 @@ if (BUILD_TESTS)
# add_test(VALUE_HOLDER ${VALUEHOLDER_TEST_APP}) # add_test(VALUE_HOLDER ${VALUEHOLDER_TEST_APP})
add_test(VALUE_HOLDER ${DEVATTR_TEST_APP}) add_test(VALUE_HOLDER ${DEVATTR_TEST_APP})
add_test(NETMSG_TEST ${NETMSG_TEST_APP}) add_test(NETMSG_TEST ${NETMSG_TEST_APP})
add_test(ASIO_NETSRV_TEST ${ASIO_NETSERVER_TEST_APP}) add_test(ASIO_NETSRV_TEST ${ASIO_NETSERVER_TEST_APP})
if (SPDLOG_LIBRARY)
target_link_libraries(${ASIO_NETSERVER_TEST_APP} PRIVATE fmt::fmt)
endif()
enable_testing() enable_testing()
endif(BUILD_TESTS) endif(BUILD_TESTS)

View File

@ -13,6 +13,139 @@
namespace adc namespace adc
{ {
/* SPDLOG-library based advanced single/multithreaded logger */
class AdcSpdlogLogger
{
public:
// [year-month-day time.millisecs][log-level]: log-message
constexpr static std::string_view LOGGER_DEFAULT_FORMAT = "[%Y-%m-%d %T.%e][%l]: %v";
typedef spdlog::level::level_enum loglevel_t;
template <traits::adc_input_char_range R = decltype(LOGGER_DEFAULT_FORMAT)>
AdcSpdlogLogger(std::shared_ptr<spdlog::logger> logger, const R& pattern = LOGGER_DEFAULT_FORMAT)
: _loggerSPtr(logger), _currentLogPattern()
{
std::ranges::copy(pattern, std::back_inserter(_currentLogPattern));
_loggerSPtr->set_pattern(_currentLogPattern);
}
virtual ~AdcSpdlogLogger() = default;
void setLogLevel(loglevel_t log_level)
{
_loggerSPtr->set_level(log_level);
}
loglevel_t getLogLevel() const
{
return _loggerSPtr->level();
}
void logMessage(loglevel_t level, const std::string& msg)
{
_loggerSPtr->log(level, msg);
}
// specialized for given level methods
void logCritical(const std::string& msg)
{
logMessage(spdlog::level::critical, msg);
}
void logError(const std::string& msg)
{
logMessage(spdlog::level::err, msg);
}
void logWarn(const std::string& msg)
{
logMessage(spdlog::level::warn, msg);
}
void logInfo(const std::string& msg)
{
logMessage(spdlog::level::info, msg);
}
void logDebug(const std::string& msg)
{
logMessage(spdlog::level::debug, msg);
}
void logTrace(const std::string& msg)
{
logMessage(spdlog::level::trace, msg);
}
template <traits::formattable... ArgTs>
void logCritical(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::critical, fmt, std::forward<ArgTs>(args)...);
}
template <traits::formattable... ArgTs>
void logError(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::err, fmt, std::forward<ArgTs>(args)...);
}
template <traits::formattable... ArgTs>
void logWarn(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::warn, fmt, std::forward<ArgTs>(args)...);
}
template <traits::formattable... ArgTs>
void logInfo(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::info, fmt, std::forward<ArgTs>(args)...);
}
template <traits::formattable... ArgTs>
void logDebug(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::debug, fmt, std::forward<ArgTs>(args)...);
}
template <traits::formattable... ArgTs>
void logTrace(std::format_string<ArgTs...> fmt, ArgTs&&... args)
{
_loggerSPtr->log(spdlog::level::trace, fmt, std::forward<ArgTs>(args)...);
}
protected:
static constexpr size_t LOGGER_DEFAULT_FORMAT_MARK_POS = 20;
std::string _currentLogPattern;
std::shared_ptr<spdlog::logger> _loggerSPtr;
// helper method
void addMarkToPattern(traits::adc_input_char_range auto& mark, size_t pos = LOGGER_DEFAULT_FORMAT_MARK_POS)
{
std::string ptrn = _currentLogPattern.substr(0, pos);
ptrn += " [";
std::ranges::copy(mark, std::back_inserter(ptrn));
ptrn += "] ";
std::ranges::copy(_currentLogPattern | std::views::drop(pos), std::back_inserter(ptrn));
_currentLogPattern = ptrn;
_loggerSPtr->set_pattern(_currentLogPattern);
}
};
/*
template <typename BaseT> template <typename BaseT>
class AdcSpdlogGenericDecorator : public BaseT class AdcSpdlogGenericDecorator : public BaseT
{ {
@ -77,7 +210,7 @@ public:
} }
template <traits::formattable... ArgTs> template <traits::formattable... ArgTs>
void logMsg(spdlog::level::level_enum level, std::string_view fmt, ArgTs&&... args) void logMessage(spdlog::level::level_enum level, std::string_view fmt, ArgTs&&... args)
{ {
_logger->log(level, fmt, std::forward<ArgTs>(args)...); _logger->log(level, fmt, std::forward<ArgTs>(args)...);
} }
@ -153,7 +286,6 @@ protected:
} }
}; };
template <typename BaseT> template <typename BaseT>
class AdcSpdlogGenericMarkDecorator : public AdcSpdlogGenericDecorator<BaseT> class AdcSpdlogGenericMarkDecorator : public AdcSpdlogGenericDecorator<BaseT>
{ {
@ -185,7 +317,7 @@ public:
} }
}; };
*/
} // namespace adc } // namespace adc
#endif #endif

View File

@ -2,9 +2,12 @@
#include <algorithm> #include <algorithm>
#include <charconv> #include <charconv>
#include <iostream>
#include <limits> #include <limits>
#include <mutex>
#include <ranges> #include <ranges>
#include <regex> #include <regex>
#include <thread>
#include <utility> #include <utility>
#include "../common/adc_traits.h" #include "../common/adc_traits.h"
@ -451,6 +454,7 @@ namespace constants
{ {
static constexpr char DEFAULT_CONVERTER_DELIMITER[] = " "; static constexpr char DEFAULT_CONVERTER_DELIMITER[] = " ";
static constexpr char DEFAULT_CONVERTER_DELIMITER_COMA[] = ", ";
} // namespace constants } // namespace constants
@ -675,4 +679,89 @@ static constexpr size_t AdcFNV1aHash(const R& r)
} }
/* current thread ID std::string representation */
static std::string AdcThisThreadId()
{
std::stringstream st;
st << std::this_thread::get_id();
return st.str();
}
/* std::basic_ostream based multithread-safe simple logger */
template <typename CharT = char, typename CharTraitsT = std::char_traits<CharT>>
class AdcOstreamLogger
{
public:
typedef CharT char_t;
typedef CharTraitsT char_traits_t;
enum loglevel_t { NULL_LEVEL, ERROR_LEVEL, INFO_LEVEL, DEBUG_LEVEL };
static constexpr std::array LOGLEVEL_MARK{"null", "error", "info", "debug"};
AdcOstreamLogger(std::basic_ostream<CharT, CharTraitsT>& stream = std::cout, loglevel_t log_level = INFO_LEVEL)
: _logStream(stream), _currentLogLevel(log_level)
{
}
AdcOstreamLogger(loglevel_t log_level) : _logStream(std::cout), _currentLogLevel(log_level) {}
virtual ~AdcOstreamLogger() = default;
void setLogLevel(loglevel_t log_level)
{
std::lock_guard<std::mutex> lock(_logMutex);
_currentLogLevel = log_level;
}
loglevel_t getLogLevel() const
{
return _currentLogLevel;
}
void logMessage(loglevel_t level, const std::string& msg)
{
std::lock_guard<std::mutex> lock(_logMutex);
if (_currentLogLevel < level)
return;
const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// format log-message in form:
// [YYYY-MM-DD HH:MM:SS][level] log-message
//
_logStream << std::put_time(std::localtime(&now), "[%F %T]") << "[" << LOGLEVEL_MARK[level] << "] " << msg
<< "\n"
<< std::flush;
}
void logError(const std::string& msg)
{
logMessage(ERROR_LEVEL, msg);
}
void logInfo(const std::string& msg)
{
logMessage(INFO_LEVEL, msg);
}
void logDebug(const std::string& msg)
{
logMessage(DEBUG_LEVEL, msg);
}
protected:
std::basic_ostream<CharT, CharTraitsT>& _logStream;
loglevel_t _currentLogLevel;
std::mutex _logMutex;
};
} // namespace adc::utils } // namespace adc::utils

497
net/adc_device_netclient.h Normal file
View File

@ -0,0 +1,497 @@
#pragma once
#include <queue>
#include "../common/adc_utils.h"
#include "adc_device_netmsg.h"
#include "adc_netclient.h"
namespace adc
{
enum class AdcDeviceNetClientSessionError : int {
ERROR_OK,
ERROR_INVALID_SERVER_RESPOND,
ERROR_UNEXPECTED_SERVER_RESPOND,
ERROR_UNKNOWN_ERROR
};
}
// place here to allow clang compilation
namespace std
{
template <>
class is_error_code_enum<adc::AdcDeviceNetClientSessionError> : public true_type
{
};
} // namespace std
namespace adc
{
struct AdcDeviceNetClientSessionErrorCategory : std::error_category {
AdcDeviceNetClientSessionErrorCategory() : std::error_category() {}
const char* name() const noexcept
{
return "ADC_DEVICE_NESERVER_SESSION";
}
std::string message(int ec) const
{
AdcDeviceNetClientSessionError err = static_cast<AdcDeviceNetClientSessionError>(ec);
switch (err) {
case AdcDeviceNetClientSessionError::ERROR_OK:
return "OK";
case AdcDeviceNetClientSessionError::ERROR_INVALID_SERVER_RESPOND:
return "invalid server respond message";
case AdcDeviceNetClientSessionError::ERROR_UNEXPECTED_SERVER_RESPOND:
return "unexpected server respond message";
case AdcDeviceNetClientSessionError::ERROR_UNKNOWN_ERROR:
return "catch unhandled exception";
default:
return "UNKNOWN";
}
}
static const AdcDeviceNetClientSessionErrorCategory& get()
{
static const AdcDeviceNetClientSessionErrorCategory constInst;
return constInst;
}
};
} // namespace adc
namespace std
{
inline std::error_code make_error_code(adc::AdcDeviceNetClientSessionError ec)
{
return std::error_code(static_cast<int>(ec), adc::AdcDeviceNetClientSessionErrorCategory::get());
}
} // namespace std
namespace adc
{
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcDeviceNetClient : public AdcGenericNetClient<IdentT, LoggerT>
{
typedef AdcGenericNetClient<IdentT, LoggerT> base_t;
public:
template <interfaces::adc_netservice_c NetServiceT, traits::adc_hashable_c SessionIdentT = std::string>
class Session : public std::enable_shared_from_this<Session<NetServiceT, SessionIdentT>>
{
public:
enum ServerResponseType { RESP_INVALID, RESP_ACK, RESP_ERROR, RESP_UNEXPECTED };
typedef SessionIdentT netsession_ident_t;
typedef NetServiceT netservice_t;
// default server respond type
typedef std::vector<std::string> default_server_resp_t;
// asynchronous callback callable type for ADC device getter/setter/executor
typedef std::function<void(ServerResponseType, default_server_resp_t)> async_callback_func_t;
struct netsession_ctx_t {
AdcDeviceNetClient* clientPtr;
std::chrono::milliseconds recvTimeout;
std::chrono::milliseconds sendTimeout;
};
typedef std::vector<char> message_t;
// Session(const netsession_ident_t& id, netservice_t srv, AdcDeviceNetServer* srv_ptr)
Session(const netsession_ident_t& id, netservice_t srv, netsession_ctx_t ctx)
: _ident(id),
_netService(std::move(srv)),
_clientPtr(ctx.clientPtr),
_recvTimeout(ctx.recvTimeout),
_sendTimeout(ctx.sendTimeout)
{
_clientPtr->logInfo("Create client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
virtual ~Session()
{
_clientPtr->logInfo("Delete client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
netsession_ident_t ident() const
{
return _ident;
}
void start()
{
_clientPtr->logInfo("Start client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
}
void stop()
{
_clientPtr->logInfo("Stop client-to-server session with ID = {} (addr = {}, thread = {})", _ident,
(void*)this, utils::AdcThisThreadId());
_netService.close();
}
// ADC device helper methods (blocking)
// get names of devices
template <traits::adc_range_of_output_char_range R>
R deviceNames(ServerResponseType& rtype)
{
// expected respond: ACK NAMES DEV1 DEV2 ...
// return DEV1 DEV2 ... (or error description 'code category what')
return deviceFuncHelper<R>(constants::ADC_DEVICE_NETPROTO_KEY_NAMES, rtype);
}
default_server_resp_t deviceNames(ServerResponseType& rtype)
{
return deviceNames<default_server_resp_t>(rtype);
}
// bind device to the client
template <traits::adc_range_of_output_char_range R, traits::adc_input_char_range DevNameT>
R bindDevice(DevNameT&& dev_name, ServerResponseType& rtype)
{
// expected respond: ACK DEVICE DEV_NAME
// return DEV_NAME (or error description 'code category what')
return deviceFuncHelper<R>(constants::ADC_DEVICE_NETPROTO_KEY_DEVICE, rtype,
std::forward<DevNameT>(dev_name));
}
template <traits::adc_input_char_range DevNameT>
default_server_resp_t bindDevice(DevNameT&& dev_name, ServerResponseType& rtype)
{
return bindDevice<default_server_resp_t>(std::forward<DevNameT>(dev_name), rtype);
}
// execute a command
template <traits::adc_range_of_output_char_range R, traits::adc_input_char_range CmdNameT>
R exec(CmdNameT&& cmd_name, ServerResponseType& rtype)
{
// expected respond: ACK CMD CMD_NAME
// return CMD_NAME ... (or error description 'code category what')
return deviceFuncHelper<R>(constants::ADC_DEVICE_NETPROTO_KEY_CMD, rtype, std::forward<CmdNameT>(cmd_name));
}
template <traits::adc_input_char_range CmdNameT>
default_server_resp_t exec(CmdNameT&& cmd_name, ServerResponseType& rtype)
{
return exec<default_server_resp_t>(std::forward<CmdNameT>(cmd_name), rtype);
}
// get an attribute value
template <traits::adc_range_of_output_char_range R, traits::adc_input_char_range AttrNameT>
R getAttr(AttrNameT&& attr_name, ServerResponseType& rtype)
{
// expected respond: ACK GET ATTR_NAME ATTR_VALUE
// return ATTR_NAME ATTR_VALUE (or error description 'code category what')
return deviceFuncHelper<R>(constants::ADC_DEVICE_NETPROTO_KEY_GET, rtype,
std::forward<AttrNameT>(attr_name));
}
template <traits::adc_input_char_range AttrNameT>
default_server_resp_t getAttr(AttrNameT&& attr_name, ServerResponseType& rtype)
{
return getAttr<default_server_resp_t>(std::forward<AttrNameT>(attr_name), rtype);
}
// set an attribute value
template <traits::adc_range_of_output_char_range R,
traits::adc_input_char_range AttrNameT,
typename ValueT,
typename... ValueTs>
R setAttr(AttrNameT&& attr_name, ServerResponseType& rtype, ValueT&& value, ValueTs&&... values)
{
// expected respond: ACK SET ATTR_NAME ATTR_VALUE
// return ATTR_NAME ATTR_VALUE (or error description 'code category what')
return deviceFuncHelper<R>(constants::ADC_DEVICE_NETPROTO_KEY_SET, rtype, std::forward<ValueT>(value),
std::forward<ValueTs>(values)...);
}
template <traits::adc_input_char_range AttrNameT, typename ValueT, typename... ValueTs>
default_server_resp_t setAttr(AttrNameT&& attr_name,
ServerResponseType& rtype,
ValueT&& value,
ValueTs&&... values)
{
return setAttr<default_server_resp_t>(std::forward<AttrNameT>(attr_name), rtype,
std::forward<ValueT>(value), std::forward<ValueTs>(values)...);
}
// ADC device helper methods (asynchronous)
template <std::convertible_to<async_callback_func_t> CallbackT>
auto asyncDeviceNames(CallbackT&& callback_func)
{
return asyncDeviceFuncHelper(constants::ADC_DEVICE_NETPROTO_KEY_NAMES,
std::forward<CallbackT>(callback_func));
}
template <std::convertible_to<async_callback_func_t> CallbackT, traits::adc_input_char_range DevNameT>
auto asyncBindDevice(CallbackT&& callback_func, DevNameT&& dev_name)
{
return asyncDeviceFuncHelper(constants::ADC_DEVICE_NETPROTO_KEY_DEVICE,
std::forward<CallbackT>(callback_func), std::forward<DevNameT>(dev_name));
}
template <std::convertible_to<async_callback_func_t> CallbackT, traits::adc_input_char_range CmdNameT>
auto asyncExec(CallbackT&& callback_func, CmdNameT&& cmd_name)
{
return asyncDeviceFuncHelper(constants::ADC_DEVICE_NETPROTO_KEY_CMD, std::forward<CallbackT>(callback_func),
std::forward<CmdNameT>(cmd_name));
}
template <std::convertible_to<async_callback_func_t> CallbackT, traits::adc_input_char_range AttrNameT>
auto asyncGetAttr(CallbackT&& callback_func, AttrNameT&& attr_name)
{
return asyncDeviceFuncHelper(constants::ADC_DEVICE_NETPROTO_KEY_GET, std::forward<CallbackT>(callback_func),
std::forward<AttrNameT>(attr_name));
}
template <std::convertible_to<async_callback_func_t> CallbackT,
traits::adc_input_char_range AttrNameT,
typename ValueT,
typename... ValueTs>
auto asyncSetAttr(CallbackT&& callback_func, AttrNameT&& attr_name, ValueT&& value, ValueTs&&... values)
{
return asyncDeviceFuncHelper(constants::ADC_DEVICE_NETPROTO_KEY_SET, std::forward<CallbackT>(callback_func),
std::forward<AttrNameT>(attr_name), std::forward<ValueT>(value),
std::forward<ValueTs>(values)...);
}
protected:
netsession_ident_t _ident;
netservice_t _netService;
AdcDeviceNetClient* _clientPtr;
std::chrono::milliseconds _recvTimeout = std::chrono::seconds(5);
std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5);
// main 'run' method
virtual void run() = 0;
// helper methods
template <traits::adc_range_of_output_char_range R>
R checkServerRespond(std::string_view key, const auto& bytes, ServerResponseType& type) const
{
AdcDeviceProtoMessage dev_msg(bytes);
if (!dev_msg.isValid()) {
throw std::system_error(AdcDeviceNetClientSessionError::ERROR_INVALID_SERVER_RESPOND);
}
if (dev_msg.isACK(key)) {
type = RESP_ACK;
return dev_msg.template attrs<R>(1);
} else if (dev_msg.isERROR()) {
type = RESP_ERROR;
return dev_msg.template attrs<R>();
} else {
throw std::system_error(AdcDeviceNetClientSessionError::ERROR_UNEXPECTED_SERVER_RESPOND);
}
}
template <traits::adc_range_of_output_char_range R>
R getFromServer(std::string_view key, const typename netservice_t::send_msg_t& bytes, ServerResponseType& type)
{
auto rbytes = sendRecv(bytes);
return checkServerRespond<R>(key, rbytes, type);
}
template <traits::adc_range_of_output_char_range R, typename... ArgTs>
R deviceFuncHelper(std::string_view key, ServerResponseType& rtype, ArgTs&&... args)
{
typename netservice_t::send_msg_t bytes;
AdcDeviceProtoMessage msg(bytes);
msg.setKeyValue(key, std::forward<ArgTs>(args)...);
return getFromServer<R>(key, bytes, rtype);
}
template <std::convertible_to<async_callback_func_t> CallbackT, typename... ArgTs>
auto asyncDeviceFuncHelper(std::string_view key, CallbackT&& callback_func, ArgTs&&... args)
{
auto bytes = std::shared_ptr<typename netservice_t::send_msg_t>();
AdcDeviceProtoMessage msg(*bytes);
msg.setKeyValue(key, std::forward<ArgTs>(args)...);
asyncSendRecv(*bytes, [bytes, key, wrapper = traits::adc_pf_wrapper(std::forward<CallbackT>(callback_func)),
this](auto err, auto rmsg) mutable {
if (err) {
_clientPtr->logError("An error occured while receiving server respond: {}",
netservice_t::formattableError(err));
} else {
try {
ServerResponseType type;
auto attrs = checkServerRespond<default_server_resp_t>(key, rmsg, type);
std::forward<CallbackT>(std::get<0>(wrapper))(type, attrs);
} catch (const std::system_error& err) {
_clientPtr->logError("An error occured while getting server respond: {}", err.what());
}
}
});
}
template <traits::adc_input_char_range SendMsgT>
auto sendRecv(const SendMsgT& send_msg)
{
_netService.send(send_msg, _sendTimeout);
return _netService.receive(_recvTimeout);
}
template <traits::adc_input_char_range SendMsgT, typename TokenT>
auto asyncSendRecv(const SendMsgT& send_msg, TokenT&& token)
{
return _netService.asyncSend(
send_msg,
[wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)), this](auto err) {
if (err) {
this->logError("An error occured while sending the message: {}",
netservice_t::formattableError(err));
} else {
_netService.asyncReceive(std::get<0>(wrapper), _recvTimeout);
}
},
_sendTimeout);
}
}; // end of 'Session' class declaration
using base_t::base_t;
virtual ~AdcDeviceNetClient() {}
};
template <traits::adc_range_of_input_char_range ArgRangeT = std::vector<std::string>>
class AdcNetClientSendQueue
{
public:
// <netproto-key, args ...>
typedef std::tuple<std::string_view, ArgRangeT> queue_elem_t;
AdcNetClientSendQueue() = default;
size_t queueSize() const
{
return _queue.size();
}
AdcNetClientSendQueue& addToQueue(std::string_view key, const ArgRangeT& args)
{
_queue.push({key, args});
return *this;
}
template <traits::adc_input_char_range... ElemTs>
AdcNetClientSendQueue& addToQueue(std::string key, ElemTs&&... elems)
{
if constexpr (sizeof...(ElemTs)) {
_queue.push({key, ArgRangeT()});
addToQueueHelper(std::get<1>(_queue.back()), std::forward<ElemTs>(elems)...);
}
return *this;
}
template <traits::adc_input_char_range CmdNameT>
AdcNetClientSendQueue& addCmdToQueue(CmdNameT&& cmd_name)
{
return addToQueue(constants::ADC_DEVICE_NETPROTO_KEY_CMD, std::forward<CmdNameT>(cmd_name));
}
template <traits::adc_input_char_range AttrNameT>
AdcNetClientSendQueue& addGetAttrToQueue(AttrNameT&& attr_name)
{
return addToQueue(constants::ADC_DEVICE_NETPROTO_KEY_GET, std::forward<AttrNameT>(attr_name));
}
template <traits::adc_input_char_range AttrNameT,
traits::adc_input_char_range ValueT,
traits::adc_input_char_range... ValueTs>
AdcNetClientSendQueue& addSetAttrToQueue(AttrNameT&& attr_name, ValueT&& value, ValueTs&&... values)
{
return addToQueue(constants::ADC_DEVICE_NETPROTO_KEY_SET, std::forward<AttrNameT>(attr_name),
std::forward<ValueT>(value), std::forward<ValueTs>(values)...);
}
AdcNetClientSendQueue& addGetNamesToQueue()
{
return addToQueue(constants::ADC_DEVICE_NETPROTO_KEY_NAMES);
}
template <traits::adc_input_char_range DeviceNameT>
AdcNetClientSendQueue& addBindDevToQueue(DeviceNameT&& dev_name)
{
return addToQueue(constants::ADC_DEVICE_NETPROTO_KEY_DEVICE, std::forward<DeviceNameT>(dev_name));
}
protected:
std::queue<queue_elem_t> _queue;
template <traits::adc_input_char_range ElemT, traits::adc_input_char_range... ElemTs>
void addToQueueHelper(ArgRangeT& args, ElemT&& elem, ElemTs&&... elems)
{
using el_t = std::ranges::range_value_t<ArgRangeT>;
if constexpr (std::same_as<el_t, std::remove_cvref_t<ElemT>>) {
std::ranges::copy(std::ranges::single_view(elem), std::back_inserter(args));
} else {
std::span<const char> sp;
if constexpr (std::is_array_v<std::remove_cvref_t<ElemT>>) {
sp = std::string_view(elem);
} else {
sp = std::span<const char>(elem);
}
std::ranges::copy(std::views::transform(std::ranges::single_view(sp),
[](const auto& val) {
el_t el;
std::ranges::copy(val, std::back_inserter(el));
return el;
}),
std::back_inserter(args));
}
if constexpr (sizeof...(ElemTs)) {
addToQueue(args, std::forward<ElemTs>(elems)...);
}
}
};
} // namespace adc

View File

@ -291,6 +291,27 @@ protected:
}; };
/* ADC client-server network protocol definitions */
namespace constants
{
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_ACK{"ACK"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_SET{"SET"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_GET{"GET"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_CMD{"CMD"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_ERR{"ERR"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_HELLO{"HELLO"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_DEVICE{"DEVICE"};
static constexpr std::string_view ADC_DEVICE_NETPROTO_KEY_NAMES{"NAMES"};
static constexpr std::array ADC_DEVICE_NETPROTO_VALID_KEY{
ADC_DEVICE_NETPROTO_KEY_ACK, ADC_DEVICE_NETPROTO_KEY_SET, ADC_DEVICE_NETPROTO_KEY_GET,
ADC_DEVICE_NETPROTO_KEY_CMD, ADC_DEVICE_NETPROTO_KEY_ERR, ADC_DEVICE_NETPROTO_KEY_HELLO,
ADC_DEVICE_NETPROTO_KEY_DEVICE, ADC_DEVICE_NETPROTO_KEY_NAMES};
} // namespace constants
template <traits::adc_char_range ByteSeqT, template <traits::adc_char_range ByteSeqT,
const char KEY_VALUE_DELIM[] = constants::ADC_DEFAULT_KEY_VALUE_DELIMITER1, const char KEY_VALUE_DELIM[] = constants::ADC_DEFAULT_KEY_VALUE_DELIMITER1,
const char VALUE_DELIM[] = constants::ADC_DEFAULT_VALUE_DELIMITER, const char VALUE_DELIM[] = constants::ADC_DEFAULT_VALUE_DELIMITER,
@ -301,20 +322,7 @@ class AdcDeviceProtoMessage
using base_t = AdcKeyValueMessage<ByteSeqT, KEY_VALUE_DELIM, VALUE_DELIM, COMPOSITE_VALUE_DELIM>; using base_t = AdcKeyValueMessage<ByteSeqT, KEY_VALUE_DELIM, VALUE_DELIM, COMPOSITE_VALUE_DELIM>;
public: public:
static constexpr std::string_view ACK_KEY{"ACK"}; typedef std::array<size_t, constants::ADC_DEVICE_NETPROTO_VALID_KEY.size()> keyword_hash_array_t;
static constexpr std::string_view SET_KEY{"SET"};
static constexpr std::string_view GET_KEY{"GET"};
static constexpr std::string_view CMD_KEY{"CMD"};
static constexpr std::string_view ERR_KEY{"ERR"};
static constexpr std::string_view HELLO_KEY{"HELLO"};
static constexpr std::string_view DEVICE_KEY{"DEVICE"};
static constexpr std::string_view NAMES_KEY{"NAMES"};
static constexpr std::array VALID_KEY{ACK_KEY, SET_KEY, GET_KEY, CMD_KEY,
ERR_KEY, HELLO_KEY, DEVICE_KEY, NAMES_KEY};
typedef std::array<size_t, VALID_KEY.size()> keyword_hash_array_t;
enum KEY_IDX : size_t { enum KEY_IDX : size_t {
ACK_KEY_IDX, ACK_KEY_IDX,
@ -324,7 +332,8 @@ public:
ERR_KEY_IDX, ERR_KEY_IDX,
HELLO_KEY_IDX, HELLO_KEY_IDX,
DEVICE_KEY_IDX, DEVICE_KEY_IDX,
NAMES_KEY_IDX NAMES_KEY_IDX,
INVALID_KEY_IDX
}; };
private: // include here to allow clang compilation private: // include here to allow clang compilation
@ -333,9 +342,9 @@ private: // include here to allow clang compilation
template <size_t... I> template <size_t... I>
static constexpr auto computeKeywordHashesImpl(std::index_sequence<I...>) static constexpr auto computeKeywordHashesImpl(std::index_sequence<I...>)
{ {
return keyword_hash_array_t{utils::AdcFNV1aHash(VALID_KEY[I])...}; return keyword_hash_array_t{utils::AdcFNV1aHash(constants::ADC_DEVICE_NETPROTO_VALID_KEY[I])...};
} }
template <typename Indices = std::make_index_sequence<VALID_KEY.size()>> template <typename Indices = std::make_index_sequence<constants::ADC_DEVICE_NETPROTO_VALID_KEY.size()>>
static constexpr auto computeKeywordHashes() static constexpr auto computeKeywordHashes()
{ {
return computeKeywordHashesImpl(Indices{}); return computeKeywordHashesImpl(Indices{});
@ -350,6 +359,9 @@ private: // include here to allow clang compilation
bool isKey(size_t idx) const bool isKey(size_t idx) const
{ {
if (idx >= INVALID_KEY_IDX)
return false;
return _keyHash == KEY_HASHES[idx]; return _keyHash == KEY_HASHES[idx];
} }
@ -369,6 +381,17 @@ public:
return isKey(ACK_KEY_IDX); return isKey(ACK_KEY_IDX);
} }
// ACK KEY_NAME ...
bool isACK(std::string_view key) const
{
auto ats = attrs();
if (std::ranges::size(ats)) {
return isKey(ACK_KEY_IDX) && (key == *ats.begin());
} else {
return false;
}
}
bool isSET() const bool isSET() const
{ {
return isKey(SET_KEY_IDX); return isKey(SET_KEY_IDX);
@ -441,9 +464,11 @@ public:
} }
using base_t::setKeyValue; // import to public area
void ack() void ack()
{ {
base_t::setKey(ACK_KEY); base_t::setKey(constants::ADC_DEVICE_NETPROTO_KEY_ACK);
keyHash(); keyHash();
} }
@ -452,7 +477,7 @@ public:
template <typename ParT, typename... ParTs> template <typename ParT, typename... ParTs>
void ack(const ParT& param, const ParTs&... params) void ack(const ParT& param, const ParTs&... params)
{ {
base_t::setKeyValue(ACK_KEY, param, params...); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_ACK, param, params...);
keyHash(); keyHash();
} }
@ -463,7 +488,7 @@ public:
requires(!traits::adc_input_char_range<ParT>) requires(!traits::adc_input_char_range<ParT>)
void ack(const ParT& param) void ack(const ParT& param)
{ {
base_t::setKeyValue(ACK_KEY, param); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_ACK, param);
keyHash(); keyHash();
} }
@ -472,7 +497,7 @@ public:
template <traits::adc_tuple_like ParT> template <traits::adc_tuple_like ParT>
void ack(const ParT& param) void ack(const ParT& param)
{ {
base_t::setKeyValue(ACK_KEY, param); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_ACK, param);
keyHash(); keyHash();
} }
@ -481,8 +506,8 @@ public:
template <traits::adc_input_char_range AttrNameT, typename ValueT, typename... ValueTs> template <traits::adc_input_char_range AttrNameT, typename ValueT, typename... ValueTs>
void set(AttrNameT&& attr_name, ValueT&& value, ValueTs&&... values) void set(AttrNameT&& attr_name, ValueT&& value, ValueTs&&... values)
{ {
base_t::setKeyValue(SET_KEY, std::forward<AttrNameT>(attr_name), std::forward<ValueT>(value), base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_SET, std::forward<AttrNameT>(attr_name),
std::forward<ValueTs>(values)...); std::forward<ValueT>(value), std::forward<ValueTs>(values)...);
keyHash(); keyHash();
} }
@ -490,7 +515,7 @@ public:
template <traits::adc_input_char_range AttrNameT> template <traits::adc_input_char_range AttrNameT>
void get(AttrNameT&& attr_name) void get(AttrNameT&& attr_name)
{ {
base_t::setKeyValue(GET_KEY, std::forward<AttrNameT>(attr_name)); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_GET, std::forward<AttrNameT>(attr_name));
keyHash(); keyHash();
} }
@ -498,14 +523,14 @@ public:
template <traits::adc_input_char_range CmdNameT> template <traits::adc_input_char_range CmdNameT>
void cmd(CmdNameT&& cmd_name) void cmd(CmdNameT&& cmd_name)
{ {
base_t::setKeyValue(CMD_KEY, std::forward<CmdNameT>(cmd_name)); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_CMD, std::forward<CmdNameT>(cmd_name));
keyHash(); keyHash();
} }
void err(const std::error_code& ec) void err(const std::error_code& ec)
{ {
base_t::setKeyValue(ERR_KEY, ec.value(), ec.category().name(), ec.message()); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_ERR, ec.value(), ec.category().name(), ec.message());
keyHash(); keyHash();
} }
@ -513,7 +538,8 @@ public:
template <traits::adc_input_char_range SenderNameT, typename... ParamTs> template <traits::adc_input_char_range SenderNameT, typename... ParamTs>
void hello(SenderNameT&& name, ParamTs&&... params) void hello(SenderNameT&& name, ParamTs&&... params)
{ {
base_t::setKeyValue(HELLO_KEY, std::forward<SenderNameT>(name), std::forward<ParamTs>(params)...); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_HELLO, std::forward<SenderNameT>(name),
std::forward<ParamTs>(params)...);
keyHash(); keyHash();
} }
@ -521,14 +547,14 @@ public:
template <traits::adc_input_char_range DevNameT> template <traits::adc_input_char_range DevNameT>
void device(DevNameT&& dev_name) void device(DevNameT&& dev_name)
{ {
base_t::setKeyValue(DEVICE_KEY, std::forward<DevNameT>(dev_name)); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_DEVICE, std::forward<DevNameT>(dev_name));
keyHash(); keyHash();
} }
void names() void names()
{ {
base_t::setKey(NAMES_KEY); base_t::setKey(constants::ADC_DEVICE_NETPROTO_KEY_NAMES);
keyHash(); keyHash();
} }
@ -536,7 +562,7 @@ public:
template <typename DevNameT, typename... DevNameTs> template <typename DevNameT, typename... DevNameTs>
void names(const DevNameT& dev_name, const DevNameTs&... dev_names) void names(const DevNameT& dev_name, const DevNameTs&... dev_names)
{ {
base_t::setKeyValue(NAMES_KEY, dev_name, dev_names...); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_NAMES, dev_name, dev_names...);
keyHash(); keyHash();
} }
@ -545,7 +571,7 @@ public:
requires(!traits::adc_input_char_range<std::ranges::range_value_t<R>>) requires(!traits::adc_input_char_range<std::ranges::range_value_t<R>>)
void names(const R& dev_names) void names(const R& dev_names)
{ {
base_t::setKeyValue(NAMES_KEY, dev_names); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_NAMES, dev_names);
keyHash(); keyHash();
} }
@ -553,7 +579,7 @@ public:
template <traits::adc_tuple_like T> template <traits::adc_tuple_like T>
void names(const T& dev_names) void names(const T& dev_names)
{ {
base_t::setKeyValue(NAMES_KEY, dev_names); base_t::setKeyValue(constants::ADC_DEVICE_NETPROTO_KEY_NAMES, dev_names);
keyHash(); keyHash();
} }

View File

@ -2,6 +2,7 @@
#include <memory> #include <memory>
#include <system_error> #include <system_error>
#include <thread>
#include "adc_device_netmsg.h" #include "adc_device_netmsg.h"
#include "adc_netserver.h" #include "adc_netserver.h"
@ -105,11 +106,12 @@ namespace adc
{ {
template <typename IdentT = std::string> template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcDeviceNetServer : public AdcGenericNetServer<IdentT> class AdcDeviceNetServer : public AdcGenericNetServer<IdentT, LoggerT>
{ {
public: public:
using typename AdcGenericNetServer<IdentT>::server_ident_t; using typename AdcGenericNetServer<IdentT, LoggerT>::server_ident_t;
using typename AdcGenericNetServer<IdentT, LoggerT>::logger_t;
// type for serialized data (attr/command ID, attr values etc...) // type for serialized data (attr/command ID, attr values etc...)
typedef std::vector<char> serialized_t; typedef std::vector<char> serialized_t;
@ -232,13 +234,15 @@ public:
_recvTimeout(ctx.recvTimeout), _recvTimeout(ctx.recvTimeout),
_sendTimeout(ctx.sendTimeout) _sendTimeout(ctx.sendTimeout)
{ {
_serverPtr->logInfo("Create client session with ID = {} (addr = {})", _ident, (void*)this); _serverPtr->logInfo("Create client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
} }
virtual ~Session() virtual ~Session()
{ {
_serverPtr->logInfo("Delete client session with ID = {} (addr = {})", _ident, (void*)this); _serverPtr->logInfo("Delete client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
} }
netsession_ident_t ident() const netsession_ident_t ident() const
@ -248,49 +252,16 @@ public:
void start() void start()
{ {
_serverPtr->logInfo("Start client session with ID = {} (addr = {})", _ident, (void*)this); _serverPtr->logInfo("Start client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
auto self(this->shared_from_this()); do_start();
_netService.asyncReceive(
[self, this](netservice_t::async_callback_err_t ec, message_t msg) {
if (ec) {
// std::string str("asyncReceive operation completed with error: ");
// netservice_t::formatError(ec, str);
// _serverPtr->errorMessage(str);
_serverPtr->logError("asyncReceive operation completed with error: {}",
netservice_t::formattableError(ec));
stop();
} else {
auto msg_sptr = std::make_shared<message_t>(std::move(msg));
AdcDeviceProtoMessage dev_msg(*msg_sptr);
processMessage(dev_msg);
_netService.asyncSend(
*msg_sptr,
[self, msg_sptr, this](netservice_t::async_callback_err_t ec) {
if (ec) {
// std::string str("asyncSend operation completed with error: ");
// netservice_t::formatError(ec, str);
// _serverPtr->errorMessage(str);
_serverPtr->logError("asyncSend operation completed with error: {}",
netservice_t::formattableError(ec));
stop();
} else {
start();
}
},
_sendTimeout);
}
},
_recvTimeout);
} }
void stop() void stop()
{ {
_serverPtr->logInfo("Stop client session with ID = {} (addr = {})", _ident, (void*)this); _serverPtr->logInfo("Stop client session with ID = {} (addr = {}, thread = {})", _ident, (void*)this,
utils::AdcThisThreadId());
_netService.close(); _netService.close();
} }
@ -304,6 +275,43 @@ public:
std::chrono::milliseconds _recvTimeout = std::chrono::hours(12); std::chrono::milliseconds _recvTimeout = std::chrono::hours(12);
std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5); std::chrono::milliseconds _sendTimeout = std::chrono::seconds(5);
void do_start()
{
auto self(this->shared_from_this());
_netService.asyncReceive(
[self, this](netservice_t::async_callback_err_t ec, message_t msg) {
if (ec) {
_serverPtr->logError(
"asyncReceive operation completed with error: {} (session addr = {}, thread = {})",
netservice_t::formattableError(ec), (void*)this, utils::AdcThisThreadId());
stop();
} else {
auto msg_sptr = std::make_shared<message_t>(std::move(msg));
AdcDeviceProtoMessage dev_msg(*msg_sptr);
processMessage(dev_msg);
_netService.asyncSend(
*msg_sptr,
[self, msg_sptr, this](netservice_t::async_callback_err_t ec) {
if (ec) {
_serverPtr->logError(
"asyncSend operation completed with error: {} (session addr = {}, thread = {})",
netservice_t::formattableError(ec), (void*)this, utils::AdcThisThreadId());
stop();
} else {
do_start();
}
},
_sendTimeout);
}
},
_recvTimeout);
}
void processMessage(auto& msg) void processMessage(auto& msg)
{ {
typedef std::decay_t<decltype(msg)> msg_t; typedef std::decay_t<decltype(msg)> msg_t;
@ -391,8 +399,8 @@ public:
}; };
// using AdcGenericNetServer::AdcGenericNetServer; using AdcGenericNetServer<IdentT, LoggerT>::AdcGenericNetServer;
AdcDeviceNetServer(const server_ident_t& id) : AdcGenericNetServer<IdentT>(id), _devices() {} // AdcDeviceNetServer(const server_ident_t& id) : AdcGenericNetServer<IdentT>(id), _devices() {}
virtual ~AdcDeviceNetServer() = default; virtual ~AdcDeviceNetServer() = default;
@ -406,6 +414,18 @@ public:
CmdIdDeserialT&& cmd_id_deser_func = {}) // deserializer of command ID CmdIdDeserialT&& cmd_id_deser_func = {}) // deserializer of command ID
{ {
auto id = std::forward<IdSerialT>(id_ser_func)(dev_ptr->ident()); auto id = std::forward<IdSerialT>(id_ser_func)(dev_ptr->ident());
if constexpr (traits::formattable<serialized_t>) {
this->logInfo("Add ADC device with ID: {} (addr = {}, thread = {})", id, (void*)dev_ptr,
utils::AdcThisThreadId());
} else {
std::string s;
std::ranges::copy(id, std::back_inserter(s));
this->logInfo("Add ADC device with ID: {} (addr = {}, thread = {})", s, (void*)dev_ptr,
utils::AdcThisThreadId());
}
_devices.try_emplace(dev_ptr, dev_ptr, id, std::forward<AttrIdDeserialT>(attr_id_deser_func), _devices.try_emplace(dev_ptr, dev_ptr, id, std::forward<AttrIdDeserialT>(attr_id_deser_func),
std::forward<CmdIdDeserialT>(cmd_id_deser_func)); std::forward<CmdIdDeserialT>(cmd_id_deser_func));
@ -415,7 +435,25 @@ public:
template <interfaces::adc_device_c DeviceT> template <interfaces::adc_device_c DeviceT>
AdcDeviceNetServer& delDevice(DeviceT* dev_ptr) AdcDeviceNetServer& delDevice(DeviceT* dev_ptr)
{ {
_devices.erase(dev_ptr); auto it = _devices.find(dev_ptr);
if (it == _devices.end()) {
this->logError("Invalid ADC device pointer ({})! It seems the device was not added!", (void*)dev_ptr);
return *this;
}
if constexpr (traits::formattable<serialized_t>) {
this->logInfo("Delete ADC device with ID: {} (addr = {}, thread = {})", it->ident(), (void*)dev_ptr,
utils::AdcThisThreadId());
} else {
std::string s;
std::ranges::copy(it->ident(), std::back_inserter(s));
this->logInfo("Delete ADC device with ID: {} (addr = {}, thread = {})", s, (void*)dev_ptr,
utils::AdcThisThreadId());
}
_devices.erase(it);
// _devices.erase(dev_ptr);
return *this; return *this;
} }

View File

@ -143,7 +143,7 @@ public:
_host = std::string_view{found.end(), _endpoint.end()}; _host = std::string_view{found.end(), _endpoint.end()};
auto f1 = std::ranges::search(_host, portPathDelim); auto f1 = std::ranges::search(_host, portPathDelim);
std::string_view port_sv; // std::string_view port_sv;
if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'! if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'!
return _isValid; return _isValid;
} else { } else {
@ -156,15 +156,15 @@ public:
return _isValid; return _isValid;
} }
port_sv = std::string_view(f1.end(), _host.end()); _portView = std::string_view(f1.end(), _host.end());
if (port_sv.size()) { if (_portView.size()) {
_host = std::string_view(_host.begin(), f1.begin()); _host = std::string_view(_host.begin(), f1.begin());
if (!isLocal()) { if (!isLocal()) {
// convert port string to int // convert port string to int
auto end_ptr = port_sv.data() + port_sv.size(); auto end_ptr = _portView.data() + _portView.size();
auto [ptr, ec] = std::from_chars(port_sv.data(), end_ptr, _port); auto [ptr, ec] = std::from_chars(_portView.data(), end_ptr, _port);
if (ec != std::errc() || ptr != end_ptr) { if (ec != std::errc() || ptr != end_ptr) {
return _isValid; return _isValid;
} }
@ -202,6 +202,12 @@ public:
return _isValid; return _isValid;
} }
auto endpoint() const
{
return _endpoint;
}
template <traits::adc_view_or_output_char_range R> template <traits::adc_view_or_output_char_range R>
R proto() const R proto() const
{ {
@ -229,6 +235,17 @@ public:
return _port; return _port;
} }
template <traits::adc_view_or_output_char_range R>
R portView() const
{
return part<R>(PORT_PART);
}
std::string_view portView() const
{
return portView<std::string_view>();
}
template <traits::adc_view_or_output_char_range R> template <traits::adc_view_or_output_char_range R>
R path() const R path() const
{ {
@ -289,7 +306,7 @@ public:
protected: protected:
std::string _endpoint; std::string _endpoint;
std::string_view _proto, _host, _path; std::string_view _proto, _host, _path, _portView;
int _port; int _port;
bool _isValid; bool _isValid;
@ -312,7 +329,7 @@ protected:
return found ? idx : -1; return found ? idx : -1;
} }
enum EndpointPart { PROTO_PART, HOST_PART, PATH_PART }; enum EndpointPart { PROTO_PART, HOST_PART, PATH_PART, PORT_PART };
template <traits::adc_view_or_output_char_range R> template <traits::adc_view_or_output_char_range R>
R part(EndpointPart what) const R part(EndpointPart what) const
@ -335,6 +352,9 @@ protected:
case PATH_PART: case PATH_PART:
part = _path; part = _path;
break; break;
case PORT_PART:
part = _portView;
break;
default: default:
break; break;
} }

View File

@ -52,15 +52,15 @@ using adc_common_duration_t = adc_duration_common_type_t<std::chrono::nanosecond
// a) true - asynchronous operation completed without errors // a) true - asynchronous operation completed without errors
// b) false - an error occured // b) false - an error occured
template <typename ERRT> template <typename ERRT>
concept adc_async_callback_err_t = std::convertible_to<std::remove_cvref_t<ERRT>, bool> || concept adc_async_callback_err_c = std::convertible_to<std::remove_cvref_t<ERRT>, bool> ||
requires(const std::remove_cvref_t<ERRT> err) { err.operator bool(); }; requires(const std::remove_cvref_t<ERRT> err) { err.operator bool(); };
// concepts for asynchronous opereration callback callable // concepts for asynchronous opereration callback callable
// 1) the type must be a callable with at least 1 input argument // 1) the type must be a callable with at least 1 input argument
// 2) the first argument type must satisfy the concept adc_async_callback_err_t // 2) the first argument type must satisfy the concept adc_async_callback_err_c
template <typename T> template <typename T>
concept adc_async_callback_t = traits::adc_is_callable<T> && (traits::adc_func_traits<T>::arity >= 1) && concept adc_async_callback_c = traits::adc_is_callable<T> && (traits::adc_func_traits<T>::arity >= 1) &&
adc_async_callback_err_t<traits::adc_func_arg1_t<T>>; adc_async_callback_err_c<traits::adc_func_arg1_t<T>>;
/* /*
struct NetService { struct NetService {
@ -107,23 +107,25 @@ concept adc_netservice_c = requires(SRVT srv, const SRVT srv_const) {
// underlying protocol // underlying protocol
// asynchronous operation error // asynchronous operation error
requires adc_async_callback_err_t<typename SRVT::async_callback_err_t>; requires adc_async_callback_err_c<typename SRVT::async_callback_err_t>;
// callback callables for asynchronous operations // callback callables for asynchronous operations
requires adc_async_callback_t<typename SRVT::async_connect_callback_t>; requires adc_async_callback_c<typename SRVT::async_connect_callback_t>;
requires adc_async_callback_t<typename SRVT::async_send_callback_t>; requires adc_async_callback_c<typename SRVT::async_send_callback_t>;
requires adc_async_callback_t<typename SRVT::async_receive_callback_t>; requires adc_async_callback_c<typename SRVT::async_receive_callback_t>;
// acceptor type // acceptor type
requires std::is_class_v<typename SRVT::acceptor_t>; requires std::is_class_v<typename SRVT::acceptor_t>;
requires adc_async_callback_t<typename SRVT::acceptor_t::async_accept_callback_t>; requires adc_async_callback_c<typename SRVT::acceptor_t::async_accept_callback_t>;
requires requires(typename SRVT::acceptor_t acc) { requires requires(typename SRVT::acceptor_t acc, const typename SRVT::acceptor_t acc_const) {
acc.asyncAccept(std::declval<typename SRVT::acceptor_t::async_accept_callback_t>(), acc.asyncAccept(std::declval<typename SRVT::acceptor_t::async_accept_callback_t>(),
std::declval<const typename SRVT::timeout_t&>()); std::declval<const typename SRVT::timeout_t&>());
// { acc.accept(std::declval<const typename SRVT::timeout_t&>()) } -> std::same_as<SRVT>; // { acc.accept(std::declval<const typename SRVT::timeout_t&>()) } -> std::same_as<SRVT>;
acc.accept(std::declval<const typename SRVT::timeout_t&>()); acc.accept(std::declval<const typename SRVT::timeout_t&>());
{ acc_const.localEndpoint() } -> traits::formattable;
}; };
@ -134,9 +136,6 @@ concept adc_netservice_c = requires(SRVT srv, const SRVT srv_const) {
// asynchronous (non-blocking) operations // asynchronous (non-blocking) operations
// srv.asyncAccept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<typename
// SRVT::async_call_ctx_t&>(),
// std::declval<const typename SRVT::timeout_t&>());
srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(), srv.asyncConnect(std::declval<const typename SRVT::endpoint_t&>(),
std::declval<typename SRVT::async_connect_callback_t>(), std::declval<typename SRVT::async_connect_callback_t>(),
@ -150,7 +149,7 @@ concept adc_netservice_c = requires(SRVT srv, const SRVT srv_const) {
std::declval<const typename SRVT::timeout_t&>()); std::declval<const typename SRVT::timeout_t&>());
// synchronous (blocking) operations // synchronous (blocking) operations
// srv.accept(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>()); // it is assumed these methods throw an exception if error occures
srv.connect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>()); srv.connect(std::declval<const typename SRVT::endpoint_t&>(), std::declval<const typename SRVT::timeout_t&>());
@ -160,6 +159,8 @@ concept adc_netservice_c = requires(SRVT srv, const SRVT srv_const) {
srv.close(); srv.close();
{ srv_const.remoteEndpoint() } -> traits::formattable;
// // static method // // static method
// SRVT::formatError(std::declval<typename SRVT::async_callback_err_t>(), std::declval<std::string&>()); // SRVT::formatError(std::declval<typename SRVT::async_callback_err_t>(), std::declval<std::string&>());
@ -177,7 +178,7 @@ concept adc_netsession_c =
requires adc_netservice_c<typename SESST::netservice_t>; requires adc_netservice_c<typename SESST::netservice_t>;
typename SESST::netsession_ctx_t; typename SESST::netsession_ctx_t;
requires std::constructible_from<SESST, const typename SESST::netsession_ident_t, requires std::constructible_from<SESST, typename SESST::netsession_ident_t,
// std::shared_ptr<typename SESST::netservice_t>, // std::shared_ptr<typename SESST::netservice_t>,
typename SESST::netservice_t, typename SESST::netsession_ctx_t>; typename SESST::netservice_t, typename SESST::netsession_ctx_t>;
@ -224,16 +225,28 @@ concept adc_netsession_proto_c =
/* LOGGER */ /* LOGGER:
*
*
*/
template <typename LOGGERT> template <typename LOGGERT>
concept adc_logger_c = requires(LOGGERT log) { concept adc_logger_c = requires(LOGGERT log, const LOGGERT log_const) {
// logging method must accept at least the single argument - formating string typename LOGGERT::loglevel_t;
log.logInfo(std::declval<std::string_view>());
log.logInfo(std::declval<std::string_view>(), std::declval<std::string>());
log.logWarn(std::declval<std::string_view>(), std::declval<std::string>()); log.setLogLevel(std::declval<typename LOGGERT::loglevel_t>());
log.logError(std::declval<std::string_view>(), std::declval<std::string>()); { log_const.getLogLevel() } -> std::same_as<typename LOGGERT::loglevel_t>;
// logging method signature:
// void method(loglevel_t level, traits::formattable auto&& args...)
log.logMessage(std::declval<typename LOGGERT::loglevel_t>(), std::declval<std::string>());
// specialized logging methods signature:
log.logInfo(std::declval<std::string>());
log.logDebug(std::declval<std::string>());
log.logError(std::declval<std::string>());
}; };

120
net/adc_netclient.h Normal file
View File

@ -0,0 +1,120 @@
#pragma once
#include "../common/adc_utils.h"
#include "adc_net_concepts.h"
#include "adc_netserver.h"
namespace adc
{
/* A VERY GENERIC NETWORK CLIENT */
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcGenericNetClient : public LoggerT, public AdcNetSessionManager
{
public:
typedef IdentT client_ident_t;
typedef LoggerT logger_t;
template <typename... LoggerCtorArgTs>
AdcGenericNetClient(const IdentT& ident, LoggerCtorArgTs&&... args)
: _clientIdent(ident), LoggerT(std::forward<LoggerCtorArgTs>(args)...)
{
}
virtual ~AdcGenericNetClient()
{
stopAllSessions();
}
// start the client: connect to server and start session
template <interfaces::adc_netsession_c SessionT, typename... NetServiceCtorArgTs>
void start(const SessionT::netservice_t::endpoint_t& endpoint,
SessionT::netsession_ident_t id,
SessionT::netsession_ctx_t sess_ctx,
NetServiceCtorArgTs&&... ctor_args)
{
auto srv_sptr =
std::make_shared<typename SessionT::netservice_t>(std::forward<NetServiceCtorArgTs>(ctor_args)...);
srv_sptr->asyncConnect(
endpoint,
[id = std::move(id), sess_ctx = std::move(sess_ctx), srv_sptr, this](auto ec) {
if (ec) {
this->logError("Cannot connect to server: {}", SessionT::netservice_t::formattableError(ec));
} else {
auto sess = std::make_shared<SessionT>(id, std::move(*srv_sptr), sess_ctx);
startSession(sess);
}
},
_connectTimeout);
}
template <interfaces::adc_netsession_c SessionT, typename TokenT, typename... NetServiceCtorArgTs>
void start(const SessionT::netservice_t::endpoint_t& endpoint,
SessionT::netsession_ident_t id,
SessionT::netsession_ctx_t sess_ctx,
TokenT&& token,
NetServiceCtorArgTs&&... ctor_args)
{
auto srv_sptr =
std::make_shared<typename SessionT::netservice_t>(std::forward<NetServiceCtorArgTs>(ctor_args)...);
// try to connect to server and create session
srv_sptr->asyncConnect(
endpoint,
[id = std::move(id), sess_ctx = std::move(sess_ctx), srv_sptr,
wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token))](auto ec) mutable {
// create session
std::get<0>(wrapper)(ec, {std::move(id), std::move(*srv_sptr), std::move(sess_ctx)});
},
_connectTimeout);
}
template <traits::adc_time_duration_c DT>
void setConnectTimeout(const DT& timeout)
{
_connectTimeout = std::chrono::duration_cast<decltype(_connectTimeout)>(timeout);
}
auto getConnectionTimeout() const
{
return _connectTimeout;
}
protected:
client_ident_t _clientIdent;
std::chrono::milliseconds _connectTimeout = std::chrono::milliseconds(5000);
template <interfaces::adc_netservice_c ServiceT,
traits::adc_input_char_range SendMsgT,
typename TokenT,
traits::adc_time_duration_c SendTimeoutT = std::chrono::milliseconds,
traits::adc_time_duration_c RecvTimeoutT = std::chrono::milliseconds>
auto asyncSendRecv(ServiceT& netservice,
const SendMsgT& send_msg,
TokenT&& token,
const SendTimeoutT& send_timeout = std::chrono::milliseconds(5000),
const RecvTimeoutT& recv_timeout = std::chrono::milliseconds(5000))
{
return netservice.asyncSend(
send_msg,
[&netservice, recv_timeout, wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)), this](auto err) {
if (err) {
this->logError("An error occured while sending the message: {}", ServiceT::formattableError(err));
} else {
netservice.asyncReceive(std::get<0>(wrapper), recv_timeout);
}
},
send_timeout);
}
};
} // namespace adc

View File

@ -8,7 +8,6 @@ ABSTRACT DEVICE COMPONENTS LIBRARY
#include <filesystem> #include <filesystem>
#include <functional> #include <functional>
#include <iostream>
#include <list> #include <list>
#include <set> #include <set>
#include <unordered_map> #include <unordered_map>
@ -21,8 +20,10 @@ ABSTRACT DEVICE COMPONENTS LIBRARY
#endif #endif
#include "../common/adc_utils.h"
#include "adc_net_concepts.h" #include "adc_net_concepts.h"
namespace adc namespace adc
{ {
@ -30,87 +31,6 @@ namespace adc
/* SOME USEFULL PRIVITIVES */ /* SOME USEFULL PRIVITIVES */
// A simple std::ostream logger
class AdcTrivialLogger
{
public:
virtual ~AdcTrivialLogger() = default;
// protected:
static constexpr std::string_view errorLevelMark{"error"};
static constexpr std::string_view warnLevelMark{"warning"};
static constexpr std::string_view infoLevelMark{"info"};
std::basic_ostream<char>& _stream;
AdcTrivialLogger(std::basic_ostream<char>& stream = std::cout) : _stream(stream) {}
template <traits::formattable... ArgTs>
std::string logMsgFormat(std::string_view level_mark, std::string_view fmt, ArgTs&&... args)
{
std::string s;
std::format_to(std::back_inserter(s), fmt, std::forward<ArgTs>(args)...);
std::stringstream st;
const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// format log-message in form:
// [YYYY-MM-DD HH:MM:SS][level] log-message
//
st << std::put_time(std::localtime(&now), "[%F %T]") << "[" << level_mark << "] " << s;
return st.str();
}
// define default logging function of ERROR, WARNING and INFO levels
template <traits::formattable... ArgTs>
void logError(this auto&& self, std::string_view fmt, ArgTs&&... args)
{
using obj_t = decltype(self);
if constexpr (std::same_as<std::remove_cvref_t<obj_t>, AdcTrivialLogger>) {
std::forward<obj_t>(self)._stream
<< std::forward<obj_t>(self).logMsgFormat(errorLevelMark, fmt, std::forward<ArgTs>(args)...) << "\n"
<< std::flush;
} else {
std::forward<obj_t>(self).logError(fmt, std::forward<ArgTs>(args)...);
}
}
template <traits::formattable... ArgTs>
void logWarn(this auto&& self, std::string_view fmt, ArgTs&&... args)
{
using obj_t = decltype(self);
if constexpr (std::same_as<obj_t, AdcTrivialLogger>) {
std::forward<obj_t>(self)._stream
<< std::forward<obj_t>(self).logMsgFormat(warnLevelMark, fmt, std::forward<ArgTs>(args)...) << "\n"
<< std::flush;
} else {
std::forward<obj_t>(self).logWarn(fmt, std::forward<ArgTs>(args)...);
}
}
template <traits::formattable... ArgTs>
void logInfo(this auto&& self, std::string_view fmt, ArgTs&&... args)
{
using obj_t = decltype(self);
if constexpr (std::same_as<obj_t, AdcTrivialLogger>) {
std::forward<obj_t>(self)._stream
<< std::forward<obj_t>(self).logMsgFormat(infoLevelMark, fmt, std::forward<ArgTs>(args)...) << "\n"
<< std::flush;
} else {
std::forward<obj_t>(self).logInfo(fmt, std::forward<ArgTs>(args)...);
}
}
};
static_assert(interfaces::adc_logger_c<AdcTrivialLogger>, "!!!!!!!!!!!");
// A generic implementation of POSIX OS daemon // A generic implementation of POSIX OS daemon
class AdcPosixGenericDaemon class AdcPosixGenericDaemon
{ {
@ -131,8 +51,6 @@ public:
tmp_path = std::filesystem::current_path().root_path(); tmp_path = std::filesystem::current_path().root_path();
} }
// _ioContext.notify_fork(asio::execution_context::fork_prepare);
if (pid_t pid = fork()) { if (pid_t pid = fork()) {
if (pid > 0) { if (pid > 0) {
exit(0); exit(0);
@ -161,7 +79,6 @@ public:
close(1); close(1);
close(2); close(2);
// _ioContext.notify_fork(asio::io_context::fork_child);
#endif #endif
daemonizeFinalize(); daemonizeFinalize();
} }
@ -303,25 +220,39 @@ protected:
}; };
static_assert(interfaces::adc_logger_c<utils::AdcOstreamLogger<>>, "!!!!!");
/* very generic network server */ /* very generic network server */
template <typename IdentT = std::string> template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcGenericNetServer : public AdcPosixGenericDaemon, public AdcNetSessionManager, public AdcTrivialLogger class AdcGenericNetServer : public AdcPosixGenericDaemon, public AdcNetSessionManager, public LoggerT
{ {
public: public:
typedef IdentT server_ident_t; typedef IdentT server_ident_t;
typedef LoggerT logger_t;
AdcGenericNetServer(const server_ident_t& id, std::basic_ostream<char>& log_stream = std::cout) template <typename... LoggerCtorArgTs>
: AdcTrivialLogger(log_stream), _serverIdent(id) AdcGenericNetServer(const server_ident_t& id, LoggerCtorArgTs&&... ctor_args)
: _serverIdent(id), LoggerT(std::forward<LoggerCtorArgTs>(ctor_args)...)
{ {
if constexpr (traits::formattable<IdentT>) {
logInfo("Create ADC generic network server with ID: {} (addr = {}, thread = {})", id, (void*)this,
utils::AdcThisThreadId());
} else {
logInfo("Create ADC generic network server (addr = {}, thread = {})", (void*)this,
utils::AdcThisThreadId());
}
} }
AdcGenericNetServer(const AdcGenericNetServer&) = delete; AdcGenericNetServer(const AdcGenericNetServer&) = delete;
AdcGenericNetServer(AdcGenericNetServer&& other) AdcGenericNetServer(AdcGenericNetServer&& other)
: AdcPosixGenericDaemon(std::move(other)), AdcNetSessionManager(std::move(other)) : AdcPosixGenericDaemon(std::move(other)), AdcNetSessionManager(std::move(other)), LoggerT(std::move(other))
{ {
logDebug("Move ADC server class: this = {}, target = {}", (void*)this, (void*)&other);
if (this == &other) { if (this == &other) {
return; return;
} }
@ -336,11 +267,22 @@ public:
_moveCtorFunc = std::move(other._moveCtorFunc); _moveCtorFunc = std::move(other._moveCtorFunc);
} }
virtual ~AdcGenericNetServer() = default; virtual ~AdcGenericNetServer()
{
if constexpr (traits::formattable<IdentT>) {
logInfo("Delete ADC generic network server with ID: {} (addr = {}, thread = {})", _serverIdent, (void*)this,
utils::AdcThisThreadId());
} else {
logInfo("Delete ADC generic network server (addr = {}, thread = {})", (void*)this,
utils::AdcThisThreadId());
}
};
AdcGenericNetServer& operator=(const AdcGenericNetServer&) = delete; AdcGenericNetServer& operator=(const AdcGenericNetServer&) = delete;
AdcGenericNetServer& operator=(AdcGenericNetServer&& other) AdcGenericNetServer& operator=(AdcGenericNetServer&& other)
{ {
logDebug("Assign-move ADC server class: this = {}, target = {}", (void*)this, (void*)&other);
if (this != &other) { if (this != &other) {
AdcPosixGenericDaemon::operator=(std::move(other)); AdcPosixGenericDaemon::operator=(std::move(other));
AdcNetSessionManager::operator=(std::move(other)); AdcNetSessionManager::operator=(std::move(other));
@ -369,6 +311,8 @@ public:
void start(SessionT::netsession_ident_t id, SessionT::netsession_ctx_t sess_ctx, AcceptorCtorArgTs&&... ctor_args) void start(SessionT::netsession_ident_t id, SessionT::netsession_ctx_t sess_ctx, AcceptorCtorArgTs&&... ctor_args)
requires traits::adc_hashable_c<typename SessionT::netsession_ident_t> requires traits::adc_hashable_c<typename SessionT::netsession_ident_t>
{ {
logDebug("Call {}", __PRETTY_FUNCTION__);
if (!_isListening<SessionT>[this][id]) { if (!_isListening<SessionT>[this][id]) {
auto acceptor = std::make_shared<typename SessionT::netservice_t::acceptor_t>( auto acceptor = std::make_shared<typename SessionT::netservice_t::acceptor_t>(
std::forward<AcceptorCtorArgTs>(ctor_args)...); std::forward<AcceptorCtorArgTs>(ctor_args)...);
@ -380,6 +324,16 @@ public:
_isListening<SessionT>[inst][id] = false; _isListening<SessionT>[inst][id] = false;
}); });
if constexpr (traits::formattable<typename SessionT::netsession_ident_t>) {
logInfo(
"Start listening for client connections at <{}> endpoint (session ID: {}, server addr = {}, thread "
"= {})",
acceptor->localEndpoint(), id, (void*)this, utils::AdcThisThreadId());
} else {
logInfo("Start listening for client connections at <{}> endpoint (server addr = {}, thread = {})",
acceptor->localEndpoint(), (void*)this, utils::AdcThisThreadId());
}
doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx)); doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx));
} }
@ -413,8 +367,47 @@ public:
}; };
void setAcceptTimeout(const traits::adc_time_duration_c auto& timeout)
{
_acceptTimeout = std::chrono::duration_cast<decltype(_acceptTimeout)>(timeout);
}
auto getAcceptTimeout() const
{
return _acceptTimeout;
}
// helper methods for logging
template <traits::formattable... Ts>
void logMessage(LoggerT::loglevel_t level, std::format_string<Ts...> fmt, Ts&&... args)
{
LoggerT::logMessage(level, std::format(fmt, std::forward<Ts>(args)...));
}
template <traits::formattable... Ts>
void logInfo(std::format_string<Ts...> fmt, Ts&&... args)
{
LoggerT::logInfo(std::format(fmt, std::forward<Ts>(args)...));
}
template <traits::formattable... Ts>
void logDebug(std::format_string<Ts...> fmt, Ts&&... args)
{
LoggerT::logDebug(std::format(fmt, std::forward<Ts>(args)...));
}
template <traits::formattable... Ts>
void logError(std::format_string<Ts...> fmt, Ts&&... args)
{
LoggerT::logError(std::format(fmt, std::forward<Ts>(args)...));
}
protected: protected:
std::chrono::seconds _acceptTimeout = std::chrono::seconds::max();
// template <interfaces::adc_netsession_c SessionT> // template <interfaces::adc_netsession_c SessionT>
// inline static std::unordered_map<const AdcGenericNetServer*, bool> _isListening{}; // inline static std::unordered_map<const AdcGenericNetServer*, bool> _isListening{};
template <interfaces::adc_netsession_c SessionT> template <interfaces::adc_netsession_c SessionT>
@ -430,27 +423,29 @@ protected:
template <typename SessionT, typename AT, typename IDT, typename CTXT> template <typename SessionT, typename AT, typename IDT, typename CTXT>
void doAccept(std::shared_ptr<AT> acceptor, IDT id, CTXT sess_ctx) void doAccept(std::shared_ptr<AT> acceptor, IDT id, CTXT sess_ctx)
{ {
acceptor->asyncAccept([acceptor, id = std::move(id), sess_ctx = std::move(sess_ctx), this]( acceptor->asyncAccept(
[acceptor, id = std::move(id), sess_ctx = std::move(sess_ctx), this](
auto ec, typename SessionT::netservice_t srv) mutable { auto ec, typename SessionT::netservice_t srv) mutable {
if (!ec) { if (!ec) {
logInfo(
"Client connection is succesfully accepted! Client endpoint: {} (server addr = {}, thread = "
"{})",
srv.remoteEndpoint(), (void*)this, utils::AdcThisThreadId());
auto sess = std::make_shared<SessionT>(id, std::move(srv), sess_ctx); auto sess = std::make_shared<SessionT>(id, std::move(srv), sess_ctx);
startSession(sess); startSession(sess);
_isListening<SessionT>[this][id] = true; _isListening<SessionT>[this][id] = true;
doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx)); doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx));
} else { } else {
// std::string str{"Cannot start accepting connection: "}; this->logError("Cannot start accepting connection: {}",
// SessionT::netservice_t::formatError(ec, str); SessionT::netservice_t::formattableError(ec));
// errorMessage(str);
this->logError("Cannot start accepting connection: {}", SessionT::netservice_t::formattableError(ec));
_isListening<SessionT>[this][id] = false; _isListening<SessionT>[this][id] = false;
} }
}); },
_acceptTimeout);
} }
// virtual void errorMessage(const std::string&) {};
}; };

View File

@ -14,20 +14,26 @@
namespace adc::impl namespace adc::impl
{ {
template <typename IdentT = std::string> template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcDeviceNetServerASIO : public AdcDeviceNetServer<IdentT> class AdcDeviceNetServerASIO : public AdcDeviceNetServer<IdentT, LoggerT>
{ {
typedef AdcDeviceNetServer<IdentT> base_t; typedef AdcDeviceNetServer<IdentT, LoggerT> base_t;
public: public:
using typename base_t::logger_t;
using typename base_t::server_ident_t; using typename base_t::server_ident_t;
typedef std::string session_ident_t; typedef std::string session_ident_t;
template <typename ServiceT> template <typename ServiceT>
using Session = typename base_t::template Session<ServiceT, session_ident_t>; using Session = typename base_t::template Session<ServiceT, session_ident_t>;
AdcDeviceNetServerASIO(const server_ident_t& id, asio::io_context& io_context) template <typename... LoggerCtorArgTs>
: base_t(id), _ioContext(io_context), _stopSignal(io_context), _restartSignal(io_context) AdcDeviceNetServerASIO(const server_ident_t& id, asio::io_context& io_context, LoggerCtorArgTs&&... ctor_args)
: base_t(id, std::forward<LoggerCtorArgTs>(ctor_args)...),
_ioContext(io_context),
_stopSignal(io_context),
_restartSignal(io_context)
{ {
} }
@ -42,25 +48,71 @@ public:
#endif #endif
{ {
if (!endpoint.isValid()) { if (!endpoint.isValid()) {
this->logError("Invalid given endpoint: <{}>", endpoint.endpoint());
return; return;
} }
// may throw here! // may throw here!
if (endpoint.isTCP()) { if (endpoint.isTCP()) {
asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port()); // asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TCP", this, _ioContext, ept);
base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout}, _ioContext, // base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
ept); // _ioContext,
// ept);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(), [endpoint, res, this](std::error_code ec, auto results) {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin());
}
});
#ifdef USE_OPENSSL_WITH_ASIO #ifdef USE_OPENSSL_WITH_ASIO
} else if (endpoint.isTLS()) { } else if (endpoint.isTLS()) {
asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port()); // asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TLS", this, _ioContext, ept, std::move(tls_context), // base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
// tls_verify_mode); // _ioContext,
base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout}, _ioContext, // ept, std::move(tls_context), tls_verify_mode);
ept, std::move(tls_context), tls_verify_mode);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(),
[endpoint, res, tls_context = std::move(tls_context), tls_verify_mode, this](std::error_code ec,
auto results) mutable {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin(), std::move(tls_context),
tls_verify_mode);
}
});
#endif #endif
} else if (endpoint.isLocal()) { } else if (endpoint.isLocal()) {
if (endpoint.isLocalStream()) { if (endpoint.isLocalStream()) {
@ -93,6 +145,26 @@ public:
requires(std::convertible_to<std::ranges::range_value_t<RST>, int> && requires(std::convertible_to<std::ranges::range_value_t<RST>, int> &&
std::convertible_to<std::ranges::range_value_t<RRT>, int>) std::convertible_to<std::ranges::range_value_t<RRT>, int>)
{ {
auto sig_list = [](const auto& sig_range) {
std::string sgs;
#ifdef _GNU_SOURCE
std::vector<std::string> vsg;
std::ranges::transform(sig_range, std::back_inserter(vsg),
[](auto s) { return std::format("'{}' (No = {})", sigdescr_np(s), s); });
utils::AdcJoinRange(vsg, std::string_view(", "), sgs);
#else
sgs = utils::AdcDefaultValueConverter<utils::constants::DEFAULT_CONVERTER_DELIMITER_COMA>::serialize<
std::string>(sig_range);
#endif
return sgs;
};
this->logDebug("Setup 'stop-server' signal to: {}", sig_list(stop_sig_num));
this->logDebug("Setup 'restart-server' signal to: {}", sig_list(restart_sig_num));
for (const int sig : stop_sig_num) { for (const int sig : stop_sig_num) {
_stopSignal.add(sig); _stopSignal.add(sig);
} }
@ -119,6 +191,9 @@ public:
{ {
_sessionRecvTimeout = recv_timeout; _sessionRecvTimeout = recv_timeout;
_sessionSendTimeout = send_timeout; _sessionSendTimeout = send_timeout;
this->logDebug("Set session timeouts: recv = {} msec, send = {} msec", _sessionRecvTimeout.count(),
_sessionSendTimeout.count());
} }
protected: protected:
@ -129,25 +204,42 @@ protected:
std::chrono::milliseconds _sessionRecvTimeout = std::chrono::hours(12); std::chrono::milliseconds _sessionRecvTimeout = std::chrono::hours(12);
std::chrono::milliseconds _sessionSendTimeout = std::chrono::seconds(5); std::chrono::milliseconds _sessionSendTimeout = std::chrono::seconds(5);
void daemonize()
{
this->logInfo("Daemonize server process (server addr: {})", (void*)this);
base_t::daemonize();
}
// demonizing ASIO-related methods // demonizing ASIO-related methods
virtual void daemonizePrepare() virtual void daemonizePrepare()
{ {
this->logDebug("ASIO-related call of daemonizePrepare()");
_ioContext.notify_fork(asio::execution_context::fork_prepare); _ioContext.notify_fork(asio::execution_context::fork_prepare);
} }
virtual void daemonizeFinalize() virtual void daemonizeFinalize()
{ {
this->logDebug("ASIO-related call of daemonizeFinalize()");
_ioContext.notify_fork(asio::io_context::fork_child); _ioContext.notify_fork(asio::io_context::fork_child);
} }
virtual void signalReceived(std::error_code, int signo) virtual void signalReceived(std::error_code ec, int signo)
{ {
std::cout << "SIGNAL: " << signo << "\n"; #ifdef _GNU_SOURCE
this->logInfo("The server received the signal: '{}' (No = {}, ec = {})", sigdescr_np(signo), signo,
ec.message());
#else
this->logInfo("The server received the signal: {} (ec = {})", signo, ec.message());
#endif
}; };
virtual void restart() virtual void restart()
{ {
this->logInfo("Restart server (server addr: {})", (void*)this);
this->stopAllSessions(); this->stopAllSessions();
_restartSignal.async_wait([this](std::error_code ec, int signo) { _restartSignal.async_wait([this](std::error_code ec, int signo) {

View File

@ -259,6 +259,8 @@ public:
timer->cancel(); timer->cancel();
} }
srv->_socket.set_option(asio::socket_base::keep_alive(true));
self.complete(ec, std::move(*srv)); self.complete(ec, std::move(*srv));
srv.reset(); srv.reset();
@ -296,6 +298,15 @@ public:
_acceptor.close(ec); _acceptor.close(ec);
} }
std::string localEndpoint() const
{
std::stringstream st;
st << _acceptor.local_endpoint();
return st.str();
}
private: private:
asio::io_context& _ioContext; asio::io_context& _ioContext;
srv_acceptor_t _acceptor; srv_acceptor_t _acceptor;
@ -565,7 +576,7 @@ public:
if (!ec) { if (!ec) {
if (do_read) { if (do_read) {
do_read = false; // do_read = false;
if (_receiveQueue.size()) { // return message from queue if (_receiveQueue.size()) { // return message from queue
timer->cancel(); timer->cancel();
auto imsg = _receiveQueue.front(); auto imsg = _receiveQueue.front();
@ -579,11 +590,13 @@ public:
} }
auto n_avail = _socket.available(); auto n_avail = _socket.available();
if (!n_avail) { // if (!n_avail) {
return _socket.async_wait(asio::ip::tcp::socket::wait_read, std::move(self)); // return _socket.async_wait(asio::ip::tcp::socket::wait_read, std::move(self));
} // }
auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1); auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1);
do_read = false;
if constexpr (isTLS) { if constexpr (isTLS) {
return asio::async_read(_sessSocket, std::move(buff), asio::transfer_at_least(1), return asio::async_read(_sessSocket, std::move(buff), asio::transfer_at_least(1),
std::move(self)); std::move(self));
@ -629,8 +642,9 @@ public:
auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size()));
if (net_pack.empty()) { if (net_pack.empty()) {
do_read = true; do_read = true;
asio::post(std::move(self)); // initiate consequence socket's read operation return _socket.async_wait(asio::ip::tcp::socket::wait_read, std::move(self));
return; // asio::post(std::move(self)); // initiate consequence socket's read operation
// return;
} }
timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer timer->cancel(); // there were no errors in the asynchronous read-operation, so stop timer
@ -758,11 +772,17 @@ public:
} }
// static void formatError(std::error_code err, std::string& result_str) std::string remoteEndpoint() const
// { {
// std::format_to(std::back_inserter(result_str), "{} (Err category: {}) (Err msg: {})", err.value(), std::stringstream st;
// err.category().name(), err.message()); st << _socket.remote_endpoint();
// }
if (st.str().empty()) {
return "<local>";
}
return st.str();
}
static std::string formattableError(std::error_code ec) static std::string formattableError(std::error_code ec)
{ {

View File

@ -8,6 +8,11 @@
#include "../net/adc_netproto.h" #include "../net/adc_netproto.h"
#include "../net/asio/adc_device_netserver_asio.h" #include "../net/asio/adc_device_netserver_asio.h"
#ifdef USE_SPDLOG_LIBRARY
#include <spdlog/sinks/stdout_color_sinks.h>
#include "../common/adc_spdlog.h"
#endif
typedef adc::impl::AdcDeviceNetServerASIO<std::string_view> server_t; typedef adc::impl::AdcDeviceNetServerASIO<std::string_view> server_t;
typedef adc::AdcDeviceAttribute<std::string, server_t::serialized_t> attr_t; typedef adc::AdcDeviceAttribute<std::string, server_t::serialized_t> attr_t;
@ -132,7 +137,18 @@ int main(int argc, char* argv[])
asio::signal_set signals(io_ctx, SIGINT, SIGTERM); asio::signal_set signals(io_ctx, SIGINT, SIGTERM);
signals.async_wait([&](std::error_code, int) { io_ctx.stop(); }); signals.async_wait([&](std::error_code, int) { io_ctx.stop(); });
adc::impl::AdcDeviceNetServerASIO server("TEST SRV", io_ctx); using server_t = adc::impl::AdcDeviceNetServerASIO<std::string, adc::AdcSpdlogLogger>;
std::shared_ptr<spdlog::logger> logger = spdlog::stdout_color_mt("console");
logger->set_level(spdlog::level::debug);
// server_t server("TEST SRV", io_ctx, logger, "[%Y-%m-%d %T.%e][%l]: %v");
server_t server("TEST SRV", io_ctx, logger);
// server.setAcceptTimeout(std::chrono::seconds(5));
// using server_t = adc::impl::AdcDeviceNetServerASIO<>;
// server_t server("TEST SRV", io_ctx);
// server.setLogLevel(server_t::logger_t::DEBUG_LEVEL);
server.setupSignals(); server.setupSignals();
server.addDevice(&dev1); server.addDevice(&dev1);
@ -154,11 +170,11 @@ int main(int argc, char* argv[])
} }
} }
std::cout << "try to start listenning at '" << ep << "' ..."; // std::cout << "try to start listenning at '" << ep << "' ...";
server.start<adc::AdcStopSeqSessionProto<>>(epn); server.start<adc::AdcStopSeqSessionProto<>>(epn);
std::cout << "\tOK\n"; // std::cout << "\tOK\n";
} else { } else {
std::cerr << "Unrecognized endpoint: '" << ep << "'! Ignore!\n"; std::cerr << "Unrecognized endpoint: '" << ep << "'! Ignore!\n";
} }