Files
ADC/tart
2026-05-30 16:21:45 +03:00

465 lines
94 KiB
Plaintext
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#pragma once
/*
ABSTRACT DEVICE COMPONENTS LIBRARY
*/
#include <cstdlib>
#include <filesystem>
#include <functional>
#include <iterator>
#include <list>
#include <memory>
#include <set>
#include <system_error>
#include <unordered_map>
#include <utility>
#include <vector>
#if __has_include(<unistd.h>) // POSIX
#define FORK_EXISTS 1
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cerrno>
#endif
#include "../common/adc_utils.h"
#include "adc_net_concepts.h"
namespace adc
{
/* SOME USEFUL PRIMITIVES */
// A generic implementation of POSIX OS daemon
class AdcPosixGenericDaemon
{
public:
 virtual ~AdcPosixGenericDaemon() = default;
 // run server as daemon (still only on POSIX OSes)
 void daemonize()
 {
 daemonizePrepare();
// reference implementation of forking for POSIX OSes
#ifdef FORK_EXISTS
 // get TEMP directory in OS
 auto tmp_path = std::filesystem::temp_directory_path();
 if (tmp_path.empty()) {
 tmp_path = std::filesystem::current_path().root_path();
 }
 if (pid_t pid = fork()) {
 if (pid > 0) {
 exit(0);
 } else {
 throw std::system_error(errno, std::generic_category(), "CANNOT FORK 1-STAGE");
 }
 }
 if (setsid() == -1) {
 throw std::system_error(errno, std::generic_category(), "CANNOT FORK SETSID");
 }
 std::filesystem::current_path(tmp_path);
 umask(0);
 if (pid_t pid = fork()) {
 if (pid > 0) {
 exit(0);
 } else {
 throw std::system_error(errno, std::generic_category(), "CANNOT FORK 2-STAGE");
 }
 }
 close(0);
 close(1);
 close(2);
#endif
 daemonizeFinalize();
 }
protected:
 virtual void daemonizePrepare() = 0;
 virtual void daemonizeFinalize() = 0;
};
// a basic network session manager (basic start and stop functionality)
class AdcNetSessionManager
{
public:
 virtual ~AdcNetSessionManager() = default;
protected:
 AdcNetSessionManager() = default;
 AdcNetSessionManager(const AdcNetSessionManager&) = delete;
 AdcNetSessionManager(AdcNetSessionManager&& other)
 {
 moveInstFunc(&other, this);
 }
 AdcNetSessionManager& operator=(const AdcNetSessionManager&) = delete;
 AdcNetSessionManager& operator=(AdcNetSessionManager&& other)
 {
 moveInstFunc(&other, this);
 return *this;
 };
 template <interfaces::adc_netsession_c SessionT>
 constexpr static bool anySessionPredicate(const typename SessionT::netsession_ident_t&)
 {
 return true;
 }
 // started sessions weak pointers
 template <interfaces::adc_netsession_c SessionT>
 static inline std::unordered_map<const AdcNetSessionManager*, std::list<std::weak_ptr<SessionT>>> _serverSessions{};
 std::vector<std::function<bool(const AdcNetSessionManager*)>> _stopSessionFunc;
 std::vector<std::function<void(const AdcNetSessionManager*, const AdcNetSessionManager*)>> _moveCtorFunc;
 template <interfaces::adc_netsession_c SessionT>
 void startSession(std::shared_ptr<SessionT>& sess_ptr)
 {
 auto it = _serverSessions<SessionT>[this].end();
 it = _serverSessions<SessionT>[this].emplace(it, sess_ptr);
 sess_ptr->start();
 _stopSessionFunc.emplace_back([it](const AdcNetSessionManager* inst) {
 if (!it->expired()) { // session is still existing
 auto sess = it->lock();
 sess->stop();
 _serverSessions<SessionT>[inst].erase(it);
 return true;
 } else {
 return false;
 }
 });
 // define move-function only once per SessionT!
 if (_serverSessions<SessionT>[this].size() == 1) {
 _moveCtorFunc.emplace_back(
 [](const AdcNetSessionManager* new_instance, const AdcNetSessionManager* from_inst) {
 _serverSessions<SessionT>[new_instance] = std::move(_serverSessions<SessionT>[from_inst]);
 });
 }
 }
 template <interfaces::adc_netsession_c SessionT,
 std::predicate<typename SessionT::netsession_ident_t> PredT = decltype(anySessionPredicate<SessionT>)>
 size_t stopSessions(PredT&& comp_func = anySessionPredicate<SessionT>())
 {
 size_t N = 0;
 std::set<std::weak_ptr<SessionT>> remove_wptr;
 for (auto& wptr : _serverSessions<SessionT>[this]) {
 if (std::shared_ptr<SessionT> sptr = wptr.lock()) {
 if constexpr (std::same_as<PredT, decltype(anySessionPredicate<SessionT>)>) {
 sptr->stop();
 remove_wptr.emplace(wptr);
 ++N;
 } else {
 if (std::forward<PredT>(comp_func)(sptr->ident())) {
 sptr->stop();
 remove_wptr.emplace(wptr);
 ++N;
 }
 }
 } else { // remove already stopped sessions?!!
 remove_wptr.emplace(wptr);
 }
 }
 for (auto& wptr : remove_wptr) {
 _serverSessions<SessionT>[this].erase(wptr);
 }
 return N;
 }
 size_t stopAllSessions()
 {
 size_t N = 0;
 for (auto& func : _stopSessionFunc) {
 func(this) ? ++N : 0;
 }
 _stopSessionFunc.clear();
 _moveCtorFunc.clear(); // there are nothing to move after stopping of all sessions
 return N;
 }
 void moveInstFunc(const AdcNetSessionManager* to, const AdcNetSessionManager* from)
 {
 if (from != to) {
 for (auto& func : _moveCtorFunc) {
 func(to, from);
 }
 _stopSessionFunc = std::move(from->_stopSessionFunc);
 _moveCtorFunc = std::move(from->_moveCtorFunc);
 }
 }
};
static_assert(interfaces::adc_logger_c<utils::AdcOstreamLogger<>>, "!!!!!");
/* very generic network server */
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
class AdcGenericNetServer : public AdcPosixGenericDaemon, public AdcNetSessionManager, public LoggerT
{
public:
 typedef IdentT server_ident_t;
 typedef LoggerT logger_t;
 template <typename... LoggerCtorArgTs>
 AdcGenericNetServer(const server_ident_t& id, LoggerCtorArgTs&&... ctor_args)
 : _serverIdent(id), LoggerT(std::forward<LoggerCtorArgTs>(ctor_args)...)
 {
 if constexpr (traits::formattable<IdentT>) {
 logInfo("Create ADC generic network server with ID: {} (addr = {}, thread = {})", id, (void*)this,
 utils::AdcThisThreadId());
 } else {
 logInfo("Create ADC generic network server (addr = {}, thread = {})", (void*)this,
 utils::AdcThisThreadId());
 }
 }
 AdcGenericNetServer(const AdcGenericNetServer&) = delete;
 AdcGenericNetServer(AdcGenericNetServer&& other)
 : AdcPosixGenericDaemon(std::move(other)), AdcNetSessionManager(std::move(other)), LoggerT(std::move(other))
 {
 logDebug("Move ADC server class: this = {}, target = {}", (void*)this, (void*)&other);
 if (this == &other) {
 return;
 }
 _serverIdent = std::move(other._serverIdent);
 _stopListenFunc = std::move(other._stopListenFunc);
 for (auto& func : other._moveCtorFunc) {
 func(&other, this);
 }
 _moveCtorFunc = std::move(other._moveCtorFunc);
 }
 virtual ~AdcGenericNetServer()
 {
 if constexpr (traits::formattable<IdentT>) {
 logInfo("Delete ADC generic network server with ID: {} (addr = {}, thread = {})", _serverIdent, (void*)this,
 utils::AdcThisThreadId());
 } else {
 logInfo("Delete ADC generic network server (addr = {}, thread = {})", (void*)this,
 utils::AdcThisThreadId());
 }
 };
 AdcGenericNetServer& operator=(const AdcGenericNetServer&) = delete;
 AdcGenericNetServer& operator=(AdcGenericNetServer&& other)
 {
 logDebug("Assign-move ADC server class: this = {}, target = {}", (void*)this, (void*)&other);
 if (this != &other) {
 AdcPosixGenericDaemon::operator=(std::move(other));
 AdcNetSessionManager::operator=(std::move(other));
 _serverIdent = std::move(other._serverIdent);
 _stopListenFunc = std::move(other._stopListenFunc);
 for (auto& func : other._moveCtorFunc) {
 func(&other, this);
 }
 _moveCtorFunc = std::move(other._moveCtorFunc);
 }
 return *this;
 }
 virtual server_ident_t ident() const
 {
 return _serverIdent;
 }
 // start accepting remote connections, create and start given network session
 // It must be assumed that this is asynchronous operation!!!
 template <interfaces::adc_netsession_c SessionT, typename... AcceptorCtorArgTs>
 void start(SessionT::netsession_ident_t id, SessionT::netsession_ctx_t sess_ctx, AcceptorCtorArgTs&&... ctor_args)
 requires traits::adc_hashable_c<typename SessionT::netsession_ident_t>
 {
 logDebug("Call {}", __PRETTY_FUNCTION__);
 if (!_isListening<SessionT>[this][id]) {
 auto acceptor = std::make_shared<typename SessionT::netservice_t::acceptor_t>(
 std::forward<AcceptorCtorArgTs>(ctor_args)...);
 _stopListenFunc.emplace_back([acceptor, id](const AdcGenericNetServer* inst) {
 std::error_code ec;
 acceptor->close(ec);
 _isListening<SessionT>[inst][id] = false;
 });
 if constexpr (traits::formattable<typename SessionT::netsession_ident_t>) {
 logInfo(
 "Start listening for client connections at <{}> endpoint (session ID: {}, server addr = {}, thread "
 "= {})",
 acceptor->localEndpoint(), id, (void*)this, utils::AdcThisThreadId());
 } else {
 logInfo("Start listening for client connections at <{}> endpoint (server addr = {}, thread = {})",
 acceptor->localEndpoint(), (void*)this, utils::AdcThisThreadId());
 }
 doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx));
 }
 // only once per SessionT
 if (_isListening<SessionT>[this].size() == 1) {
 _moveCtorFunc.emplace_back(
 [](const AdcGenericNetServer* new_instance, const AdcGenericNetServer* from_inst) {
 _isListening<SessionT>[new_instance] = std::move(_isListening<SessionT>[from_inst]);
 });
 }
 };
 template <interfaces::adc_netsession_c SessionT>
 bool isListening(const typename SessionT::netsession_ident_t& id) const
 {
 return _isListening<SessionT>[this][id];
 }
 virtual void start() = 0;
 virtual void stop()
 {
 for (auto& func : _stopListenFunc) {
 func(this);
 }
 _stopListenFunc.clear();
 stopAllSessions();
 };
 void setAcceptTimeout(const traits::adc_time_duration_c auto& timeout)
 {
 _acceptTimeout = std::chrono::duration_cast<decltype(_acceptTimeout)>(timeout);
 }
 auto getAcceptTimeout() const
 {
 return _acceptTimeout;
 }
 // helper methods for logging
 template <traits::formattable... Ts>
 void logMessage(LoggerT::loglevel_t level, std::format_string<Ts...> fmt, Ts&&... args)
 {
 LoggerT::logMessage(level, std::format(fmt, std::forward<Ts>(args)...));
 }
 template <traits::formattable... Ts>
 void logInfo(std::format_string<Ts...> fmt, Ts&&... args)
 {
 LoggerT::logInfo(std::format(fmt, std::forward<Ts>(args)...));
 }
 template <traits::formattable... Ts>
 void logDebug(std::format_string<Ts...> fmt, Ts&&... args)
 {
 LoggerT::logDebug(std::format(fmt, std::forward<Ts>(args)...));
 }
 template <traits::formattable... Ts>
 void logError(std::format_string<Ts...> fmt, Ts&&... args)
 {
 LoggerT::logError(std::format(fmt, std::forward<Ts>(args)...));
 }
protected:
 std::chrono::seconds _acceptTimeout = std::chrono::seconds::max();
 // template <interfaces::adc_netsession_c SessionT>
 // inline static std::unordered_map<const AdcGenericNetServer*, bool> _isListening{};
 template <interfaces::adc_netsession_c SessionT>
 inline static std::unordered_map<const AdcGenericNetServer*,
 std::unordered_map<typename SessionT::netsession_ident_t, bool>>
 _isListening{};
 std::vector<std::function<void(const AdcGenericNetServer*)>> _stopListenFunc;
 std::vector<std::function<void(const AdcGenericNetServer*, const AdcGenericNetServer*)>> _moveCtorFunc;
 server_ident_t _serverIdent;
 template <typename SessionT, typename AT, typename IDT, typename CTXT>
 void doAccept(std::shared_ptr<AT> acceptor, IDT id, CTXT sess_ctx)
 {
 acceptor->asyncAccept(
 [acceptor, id = std::move(id), sess_ctx = std::move(sess_ctx), this](
 auto ec, typename SessionT::netservice_t srv) mutable {
 if (!ec) {
 logInfo(
 "Client connection is succesfully accepted! Client endpoint: {} (server addr = {}, thread = "
 "{})",
 srv.remoteEndpoint(), (void*)this, utils::AdcThisThreadId());
 auto sess = std::make_shared<SessionT>(id, std::move(srv), sess_ctx);
 startSession(sess);
 _isListening<SessionT>[this][id] = true;
 doAccept<SessionT>(acceptor, std::move(id), std::move(sess_ctx));
 } else {
 this->logError("Cannot start accepting connection: {}",
 SessionT::netservice_t::formattableError(ec));
 _isListening<SessionT>[this][id] = false;
 }
 },
 _acceptTimeout);
 }
};
namespace interfaces
{
template <typename T>
concept adc_generic_netserver_c = requires {
 typename T::server_ident_t;
 requires std::derived_from<T, adc::AdcGenericNetServer<typename T::server_ident_t>>;
};
} // namespace interfaces
} // namespace adc