This commit is contained in:
Timur A. Fatkhullin 2024-10-29 01:21:24 +03:00
parent 1047b57013
commit 4e3e3ec60e
7 changed files with 109 additions and 87 deletions

View File

@ -105,7 +105,7 @@ public:
} else { } else {
T res; T res;
auto bs = _byteSequence | std::views::drop_while([](const auto& ch) { return ch = ' '; }); auto bs = _byteSequence | std::views::drop_while([](const auto& ch) { return ch == ' '; });
auto found = std::ranges::search(bs, keyValueDelimiter); auto found = std::ranges::search(bs, keyValueDelimiter);
if (!found.empty()) { if (!found.empty()) {

View File

@ -299,7 +299,7 @@ public:
auto dev_name = get_elem(0); auto dev_name = get_elem(0);
bool found = false; bool found = false;
for (auto& [ptr, dev_wr] : _serverPtr->_devices) { for (auto& [ptr, dev_wr] : _serverPtr->_devices) {
if (dev_wr.ident() == dev_name) { if (std::ranges::equal(dev_wr.ident(), dev_name)) {
_bindDevice = dev_wr; _bindDevice = dev_wr;
found = true; found = true;
break; break;

View File

@ -40,8 +40,6 @@ namespace adc
class AdcEndpointParser class AdcEndpointParser
{ {
typedef std::span<char> host_part_t;
public: public:
static constexpr std::string_view protoHostDelim = "://"; static constexpr std::string_view protoHostDelim = "://";
static constexpr std::string_view hostPortDelim = ":"; static constexpr std::string_view hostPortDelim = ":";
@ -117,19 +115,16 @@ public:
_proto = validProtoMarks[idx]; _proto = validProtoMarks[idx];
// _host = std::string_view{found.end(), _endpoint.end()}; _host = std::string_view{found.end(), _endpoint.end()};
_host = host_part_t{found.end(), _endpoint.end()};
auto f1 = std::ranges::search(_host, portPathDelim); auto f1 = std::ranges::search(_host, portPathDelim);
std::string_view port_sv; std::string_view port_sv;
if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'! if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'!
return _isValid; return _isValid;
} else { } else {
// _host = std::string_view(_host.begin(), f1.begin()); _host = std::string_view(_host.begin(), f1.begin());
_host = host_part_t{found.end(), _endpoint.end()};
// _path = std::string_view(f1.end(), &*_endpoint.end()); _path = std::string_view(f1.end(), &*_endpoint.end());
_path = std::string_view(&*f1.end(), &*_endpoint.end());
f1 = std::ranges::search(_host, hostPortDelim); f1 = std::ranges::search(_host, hostPortDelim);
if (f1.empty() && !isLocal()) { // no port, but it is mandatory for non-local! if (f1.empty() && !isLocal()) { // no port, but it is mandatory for non-local!
@ -138,8 +133,7 @@ public:
port_sv = std::string_view(f1.end(), _host.end()); port_sv = std::string_view(f1.end(), _host.end());
if (port_sv.size()) { if (port_sv.size()) {
// _host = std::string_view(_host.begin(), f1.begin()); _host = std::string_view(_host.begin(), f1.begin());
_host = host_part_t{found.end(), _endpoint.end()};
if (!isLocal()) { if (!isLocal()) {
// convert port string to int // convert port string to int
@ -165,7 +159,7 @@ public:
} }
return ok; return ok;
})) { })) {
// _host = validLocalProtoTypes[idx]; _host = validLocalProtoTypes[idx];
} else { } else {
return _isValid; return _isValid;
} }
@ -229,20 +223,17 @@ public:
bool isLocalStream() const bool isLocalStream() const
{ {
// return host() == localProtoTypeStream; return host() == localProtoTypeStream;
return utils::AdcCharRangeCompare(host(), localProtoTypeStream, true);
} }
bool isLocalDatagram() const bool isLocalDatagram() const
{ {
// return host() == localProtoTypeDatagram; return host() == localProtoTypeDatagram;
return utils::AdcCharRangeCompare(host(), localProtoTypeDatagram, true);
} }
bool isLocalSeqpacket() const bool isLocalSeqpacket() const
{ {
// return host() == localProtoTypeSeqpacket; return host() == localProtoTypeSeqpacket;
return utils::AdcCharRangeCompare(host(), localProtoTypeSeqpacket, true);
} }
@ -273,9 +264,7 @@ public:
protected: protected:
std::string _endpoint; std::string _endpoint;
// std::string_view _proto, _host, _path; std::string_view _proto, _host, _path;
std::string_view _proto, _path;
host_part_t _host;
int _port; int _port;
bool _isValid; bool _isValid;
@ -309,54 +298,28 @@ protected:
// return res; // return res;
// } // }
// auto part = _proto; auto part = _proto;
// switch (what) {
// case PROTO_PART:
// part = _proto;
// break;
// case HOST_PART:
// part = _host;
// break;
// case PATH_PART:
// part = _path;
// break;
// default:
// break;
// }
// if constexpr (std::ranges::view<R>) {
// return R(part.begin(), part.end());
// } else {
// std::ranges::copy(part, std::back_inserter(res));
// }
switch (what) { switch (what) {
case PROTO_PART: case PROTO_PART:
if constexpr (std::ranges::view<R>) { part = _proto;
res = R(_proto.begin(), _proto.size());
} else {
std::ranges::copy(_proto, std::back_inserter(res));
}
break; break;
case HOST_PART: case HOST_PART:
if constexpr (std::ranges::view<R>) { part = _host;
res = R(_host.begin(), _host.end());
} else {
std::ranges::copy(_host, std::back_inserter(res));
}
break; break;
case PATH_PART: case PATH_PART:
if constexpr (std::ranges::view<R>) { part = _path;
res = R(_path.begin(), _path.end());
} else {
std::ranges::copy(_path, std::back_inserter(res));
}
break; break;
default: default:
break; break;
} }
if constexpr (std::ranges::view<R>) {
return {part.begin(), part.end()};
} else {
std::ranges::copy(part, std::back_inserter(res));
}
return res; return res;
} }
}; };

View File

@ -11,6 +11,7 @@ ABSTRACT DEVICE COMPONENTS LIBRARY
#include <map> #include <map>
#include <set> #include <set>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#if __has_include(<unistd.h>) // POSIX #if __has_include(<unistd.h>) // POSIX
#define FORK_EXISTS 1 #define FORK_EXISTS 1
@ -138,7 +139,7 @@ protected:
// started sessions weak pointers // started sessions weak pointers
template <interfaces::adc_netsession_c SessionT> template <interfaces::adc_netsession_c SessionT>
static std::unordered_map<const AdcNetSessionManager*, std::set<std::weak_ptr<SessionT>>> _serverSessions; static std::unordered_map<const AdcNetSessionManager*, std::unordered_set<std::weak_ptr<SessionT>>> _serverSessions;
std::vector<std::function<bool()>> _stopSessionFunc; std::vector<std::function<bool()>> _stopSessionFunc;
std::vector<std::function<void(const AdcNetSessionManager*)>> _moveCtorFunc; std::vector<std::function<void(const AdcNetSessionManager*)>> _moveCtorFunc;
@ -147,11 +148,11 @@ protected:
{ {
auto res = _serverSessions<SessionT>[this].emplace(sess_ptr); auto res = _serverSessions<SessionT>[this].emplace(sess_ptr);
if (res.second) { if (res.second) {
sess_ptr.start(); sess_ptr->start();
_stopSessionFunc.emplace_back([res, this]() { _stopSessionFunc.emplace_back([res, this]() {
if (!res.first.expired()) { // session is still existing if (!res.first->expired()) { // session is still existing
auto sess = res.first.lock(); auto sess = res.first->lock();
sess->stop(); sess->stop();
_serverSessions<SessionT>[this].erase(res.first); _serverSessions<SessionT>[this].erase(res.first);
return true; return true;
@ -308,9 +309,9 @@ public:
// only once per SessionT // only once per SessionT
if (_isListening<SessionT>[this].size() == 1) { if (_isListening<SessionT>[this].size() == 1) {
_moveCtorFunc = [this](const AdcGenericNetServer* new_instance) { _moveCtorFunc.emplace_back([this](const AdcGenericNetServer* new_instance) {
_isListening<SessionT>[new_instance] = std::move(_isListening<SessionT>[this]); _isListening<SessionT>[new_instance] = std::move(_isListening<SessionT>[this]);
}; });
} }
}; };
@ -358,7 +359,7 @@ protected:
startSession(sess); startSession(sess);
_isListening<SessionT>[this][id] = true; _isListening<SessionT>[this][id] = true;
doAccept(acceptor, id, sess_ctx); doAccept<SessionT>(acceptor, id, sess_ctx);
} else { } else {
_isListening<SessionT>[this][id] = false; _isListening<SessionT>[this][id] = false;
} }

View File

@ -25,7 +25,13 @@ public:
template <interfaces::adc_netsession_proto_c SessProtoT, std::derived_from<AdcEndpointParser> EptT> template <interfaces::adc_netsession_proto_c SessProtoT, std::derived_from<AdcEndpointParser> EptT>
#ifdef USE_OPENSSL_WITH_ASIO
void start(const EptT& endpoint,
asio::ssl::context tls_context = asio::ssl::context(asio::ssl::context::tlsv13_server),
asio::ssl::verify_mode tls_verify_mode = asio::ssl::context_base::verify_peer)
#else
void start(const EptT& endpoint) void start(const EptT& endpoint)
#endif
{ {
if (!endpoint.isValid()) { if (!endpoint.isValid()) {
return; return;
@ -38,10 +44,11 @@ public:
asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port()); asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
if (endpoint.isTCP()) { if (endpoint.isTCP()) {
using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>;
AdcDeviceNetServer::start<Session<srv_t>>("TCP", this, _ioContext, ept); AdcGenericNetServer::start<Session<srv_t>>("TCP", this, _ioContext, ept);
} else { } else {
using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>;
AdcDeviceNetServer::start<Session<srv_t>>("TLS", this, _ioContext, ept); AdcGenericNetServer::start<Session<srv_t>>("TLS", this, _ioContext, ept, tls_context, tls_verify_mode);
} }
#else #else
if (endpoint.isTCP()) { if (endpoint.isTCP()) {
@ -53,7 +60,7 @@ public:
if (endpoint.isLocalStream()) { if (endpoint.isLocalStream()) {
asio::local::stream_protocol::endpoint ept(endpoint.template path<std::string>()); asio::local::stream_protocol::endpoint ept(endpoint.template path<std::string>());
using srv_t = AdcNetServiceASIO<asio::local::stream_protocol, SessProtoT>; using srv_t = AdcNetServiceASIO<asio::local::stream_protocol, SessProtoT>;
AdcDeviceNetServer::start<Session<srv_t>>("LOCAL STREAM", this, _ioContext, ept); AdcGenericNetServer::start<Session<srv_t>>("LOCAL STREAM", this, _ioContext, ept);
// } else if (endpoint.isLocalDatagram()) { // } else if (endpoint.isLocalDatagram()) {
// asio::local::datagram_protocol::endpoint ept(endpoint.template path<std::string>()); // asio::local::datagram_protocol::endpoint ept(endpoint.template path<std::string>());
// using srv_t = AdcNetServiceASIO<asio::local::datagram_protocol, SessProtoT>; // using srv_t = AdcNetServiceASIO<asio::local::datagram_protocol, SessProtoT>;
@ -61,7 +68,7 @@ public:
} else if (endpoint.isLocalSeqpacket()) { } else if (endpoint.isLocalSeqpacket()) {
asio::local::seq_packet_protocol::endpoint ept(endpoint.template path<std::string>()); asio::local::seq_packet_protocol::endpoint ept(endpoint.template path<std::string>());
using srv_t = AdcNetServiceASIO<asio::local::seq_packet_protocol, SessProtoT>; using srv_t = AdcNetServiceASIO<asio::local::seq_packet_protocol, SessProtoT>;
AdcDeviceNetServer::start<Session<srv_t>>("LOCAL SEQPACK", this, _ioContext, ept); AdcGenericNetServer::start<Session<srv_t>>("LOCAL SEQPACK", this, _ioContext, ept);
} }
} else { } else {
throw std::system_error(std::make_error_code(std::errc::protocol_not_supported)); throw std::system_error(std::make_error_code(std::errc::protocol_not_supported));

View File

@ -183,7 +183,9 @@ public:
auto timer = netservice_t::getDeadlineTimer(_acceptor, timeout); auto timer = netservice_t::getDeadlineTimer(_acceptor, timeout);
auto srv = std::make_unique<netservice_t>(_ioContext); // auto srv = std::make_unique<netservice_t>(_ioContext);
auto srv = netservice_t::isTLS ? std::make_unique<netservice_t>(_ioContext, srv->_tlsContext)
: std::make_unique<netservice_t>(_ioContext);
return asio::async_compose<TokenT, void(std::error_code, netservice_t)>( return asio::async_compose<TokenT, void(std::error_code, netservice_t)>(
[timer = std::move(timer), srv = std::move(srv), state = sock_accept, this]( [timer = std::move(timer), srv = std::move(srv), state = sock_accept, this](
@ -254,6 +256,11 @@ public:
return accept(timeout); return accept(timeout);
} }
void close(std::error_code& ec)
{
_acceptor.close(ec);
}
private: private:
asio::io_context& _ioContext; asio::io_context& _ioContext;
srv_acceptor_t _acceptor; srv_acceptor_t _acceptor;
@ -273,14 +280,14 @@ public:
} }
AdcBaseNetServiceASIO(socket_t socket) // AdcBaseNetServiceASIO(socket_t socket)
: SESSION_PROTOT(), // : SESSION_PROTOT(),
_ioContext(static_cast<asio::io_context&>(socket.get_executor().context())), // _ioContext(static_cast<asio::io_context&>(socket.get_executor().context())),
_socket(std::move(socket)), // _socket(std::move(socket)),
_receiveStrand(_ioContext), // _receiveStrand(_ioContext),
_receiveQueue() // _receiveQueue()
{ // {
} // }
#ifdef USE_OPENSSL_WITH_ASIO #ifdef USE_OPENSSL_WITH_ASIO
AdcBaseNetServiceASIO(asio::io_context& ctx, AdcBaseNetServiceASIO(asio::io_context& ctx,
@ -300,6 +307,7 @@ public:
AdcBaseNetServiceASIO(AdcBaseNetServiceASIO&& other) AdcBaseNetServiceASIO(AdcBaseNetServiceASIO&& other)
requires(!isTLS)
: _ioContext(other._ioContext), : _ioContext(other._ioContext),
_receiveStrand(std::move(other._receiveStrand)), _receiveStrand(std::move(other._receiveStrand)),
_socket(std::move(other._socket)), _socket(std::move(other._socket)),
@ -312,6 +320,23 @@ public:
_streamBuffer.commit(bytes); _streamBuffer.commit(bytes);
} }
#ifdef USE_OPENSSL_WITH_ASIO
AdcBaseNetServiceASIO(AdcBaseNetServiceASIO&& other)
requires isTLS
: _ioContext(other._ioContext),
_receiveStrand(std::move(other._receiveStrand)),
_socket(std::move(other._socket)),
_sessSocket(std::move(other._sessSocket)),
_streamBuffer(),
_receiveQueue(std::move(other._receiveQueue)),
_tlsContext(std::move(other._tlsContext)),
_tlsPeerVerifyMode(std::move(other._tlsPeerVerifyMode))
{
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
_streamBuffer.commit(bytes);
}
#endif
AdcBaseNetServiceASIO(const AdcBaseNetServiceASIO&) = delete; // no copy constructor! AdcBaseNetServiceASIO(const AdcBaseNetServiceASIO&) = delete; // no copy constructor!
@ -321,6 +346,7 @@ public:
AdcBaseNetServiceASIO& operator=(const AdcBaseNetServiceASIO&) = delete; AdcBaseNetServiceASIO& operator=(const AdcBaseNetServiceASIO&) = delete;
AdcBaseNetServiceASIO& operator=(AdcBaseNetServiceASIO&& other) AdcBaseNetServiceASIO& operator=(AdcBaseNetServiceASIO&& other)
requires(!isTLS)
{ {
_ioContext = other._ioContext; _ioContext = other._ioContext;
_receiveStrand = std::move(other._receiveStrand); _receiveStrand = std::move(other._receiveStrand);
@ -335,7 +361,29 @@ public:
return *this; return *this;
}; };
#ifdef USE_OPENSSL_WITH_ASIO
AdcBaseNetServiceASIO& operator=(AdcBaseNetServiceASIO&& other)
requires isTLS
{
_ioContext = other._ioContext;
_receiveStrand = std::move(other._receiveStrand);
_receiveQueue = std::move(other._receiveQueue);
_socket = std::move(other._socket);
_sessSocket = std::move(other._sessSocket);
_tlsContext = std::move(other._tlsContext);
_tlsPeerVerifyMode = std::move(other._tlsPeerVerifyMode);
_streamBuffer.consume(_streamBuffer.size());
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
_streamBuffer.commit(bytes);
return *this;
};
#endif
constexpr netservice_ident_t ident() const constexpr netservice_ident_t ident() const
{ {
return _ident; return _ident;
@ -424,7 +472,7 @@ public:
return _socket.async_send(buff_seq, std::move(self)); return _socket.async_send(buff_seq, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket< } else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) { typename socket_t::protocol_type>>) {
return _socket.async_send(buff_seq, std::move(self)); return _socket.async_send(buff_seq, 0, std::move(self));
} else { } else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
} }
@ -772,8 +820,12 @@ protected:
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() -
std::chrono::seconds(1)); std::chrono::seconds(1));
timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! if (timeout < max_d) {
// timer->expires_after(timeout); timer->expires_after(timeout);
} else {
timer->expires_after(max_d); // to avoid overflow!
}
timer->async_wait([&obj](const std::error_code& ec) mutable { timer->async_wait([&obj](const std::error_code& ec) mutable {
if (!ec) { if (!ec) {

View File

@ -15,7 +15,7 @@ int main(int argc, char* argv[])
options.add_options()("h,help", "Print usage"); options.add_options()("h,help", "Print usage");
options.add_options()( options.add_options()(
"endpoints", "endpoints server will be listening for", "endpoints", "endpoints server will be listening for",
cxxopts::value<std::vector<std::string>>()->default_value("local://stream/tmp/ADC_ASIO_TEST_SERVER")); cxxopts::value<std::vector<std::string>>()->default_value("local://stream/@ADC_ASIO_TEST_SERVER"));
options.parse_positional({"endpoints"}); options.parse_positional({"endpoints"});
@ -37,11 +37,10 @@ int main(int argc, char* argv[])
adc::AdcEndpointParser epn(ep); adc::AdcEndpointParser epn(ep);
if (epn.isValid()) { if (epn.isValid()) {
if (epn.isLocalSeqpacket() || epn.isLocalStream()) { if (epn.isLocalSeqpacket() || epn.isLocalStream()) {
if (opt_result["abstract"].as<bool>()) { if (epn.path()[0] == '@') { // replace '@' to '\0' (use of UNIX abstract namespace)
auto s = epn.path<std::span<char>>(); auto it = std::ranges::find(ep, '@');
if (s[0] == '@') { // replace '@' to '\0' (use of UNIX abstract namespace) *it = '\0';
s[0] = '\0'; epn = adc::AdcEndpointParser(ep);
}
} }
} }