From d1d822415b2ffad00cf745e7c354df08e470f569 Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Thu, 10 Oct 2024 17:57:06 +0300 Subject: [PATCH] ... --- net/asio/adc_netservice_asio.h | 509 ++++++--------------------------- tests/adc_netservice_test.cpp | 8 +- 2 files changed, 95 insertions(+), 422 deletions(-) diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index f1b01c6..a9381d2 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -22,7 +22,6 @@ #include #include -#include #ifdef USE_OPENSSL_WITH_ASIO @@ -83,6 +82,11 @@ concept adc_asio_transport_proto_c = std::derived_from; +template +concept adc_asio_tls_transport_proto_c = + std::derived_from || std::derived_from || + std::derived_from; + template concept adc_asio_stream_transport_proto_c = std::derived_from || std::derived_from; @@ -107,20 +111,6 @@ concept adc_asio_special_comp_token_c = adc_asio_is_future || adc_asio_is_awaitable || std::same_as, asio::deferred_t>; -// class adc_asio_async_call_ctx_t -// { -// }; - - -// template -// concept adc_completion_token_c = -// std::same_as || -// (traits::adc_is_callable && -// std::conditional_t, -// std::true_type, -// std::bool_constant>>::value); - - template SESSION_PROTOT, @@ -138,7 +128,6 @@ public: using endpoint_t = typename TRANSPORT_PROTOT::endpoint; // typedefs for completion tokens (callbacks, required by 'adc_netservice_c' concept) - // typedef std::function async_accept_callback_t; typedef std::function async_connect_callback_t; typedef std::function async_send_callback_t; typedef std::function async_receive_callback_t; @@ -146,7 +135,7 @@ public: // typedefs from transport protocol using socket_t = typename TRANSPORT_PROTOT::socket; - + using sock_stream_t = socket_t&; // acceptor class acceptor_t @@ -172,10 +161,8 @@ public: typedef AdcNetServiceASIOBase netservice_t; typedef std::shared_ptr sptr_netservice_t; - // typedef std::function async_accept_callback_t; typedef std::function async_accept_callback_t; - // template TokenT, template TokenT, traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)> auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) @@ -188,7 +175,6 @@ public: _socket = AdcNetServiceASIOBase::socket_t{_ioContext}; auto timer = getDeadlineTimer(_acceptor, timeout); - // return asio::async_compose( return asio::async_compose( [timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable { if (!ec) { @@ -200,7 +186,6 @@ public: } } catch (std::system_error err) { timer->cancel(); - // self.complete(err.code(), AdcNetServiceASIOBase(_ioContext)); self.complete(err.code(), std::make_shared(_ioContext)); return; } @@ -233,7 +218,6 @@ public: template auto accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) - // AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { auto f = asyncAccept(asio::use_future, timeout); @@ -242,7 +226,6 @@ public: template auto accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) - // AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT) { return accept(endpoint, timeout); } @@ -284,21 +267,6 @@ public: // NOTE: CANNOT MOVE asio::streambuf CORRECTLY?!!! AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = delete; - // AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) - // : SESSION_PROTOT(std::move(other)), - // _ioContext(other._ioContext), - // _receiveStrand(std::move(other._receiveStrand)), - // _receiveQueue(std::move(_receiveQueue)), - // _socket(std::move(other._socket)), - // _streamBuffer() - // { - // if (this == &other) - // return; - - // auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), - // other._streamBuffer.data()); _streamBuffer.commit(bytes); - // }; - AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor! virtual ~AdcNetServiceASIOBase() {} @@ -307,23 +275,6 @@ public: AdcNetServiceASIOBase& operator=(const AdcNetServiceASIOBase&) = delete; AdcNetServiceASIOBase& operator=(AdcNetServiceASIOBase&& other) = delete; - // { - // if (this != &other) { - // close(); - // _streamBuffer.consume(_streamBuffer.size()); - - // auto bytes = - // asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data()); - // _streamBuffer.commit(bytes); - - // _ioContext = other._ioContext; - // _receiveStrand = std::move(other._receiveStrand), _socket = std::move(other._socket); - - // _receiveQueue = other._receiveQueue; - // } - - // return *this; - // } constexpr netservice_ident_t ident() const @@ -411,8 +362,6 @@ public: template auto asyncReceive(TokenT&& token, const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT) { - // constexpr auto is_async_ctx_t = std::same_as, async_call_ctx_t>; - // check completion token signature and deduce message type // if constexpr (!adc_asio_special_comp_token_c && !is_async_ctx_t) { if constexpr (!adc_asio_special_comp_token_c) { @@ -453,7 +402,8 @@ public: return; } - auto buff = _streamBuffer.prepare(2880); + auto n_avail = _socket.available(); + auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1); if constexpr (std::derived_from>) { @@ -470,22 +420,6 @@ public: } else { static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); } - // if constexpr (std::derived_from>) - // { - // return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), - // std::move(self)); - // } else if constexpr (std::derived_from>) { - // // datagram, so it should be received at once - // return _socket.async_receive(_streamBuffer, std::move(self)); - // } else if constexpr (std::derived_from>) { - // // datagram, so it should be received at once - // return _socket.async_receive(_streamBuffer, *out_flags, std::move(self)); - // } else { - // static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - // } } // zero-length message for SEQ_PACK sockets is EOF @@ -493,7 +427,7 @@ public: asio::basic_seq_packet_socket>) { if (!nbytes) { timer->cancel(); - self.complete(std::make_error_code(std::errc::connection_aborted), std::move(msg)); + self.complete(std::error_code(asio::error::misc_errors::eof), std::move(msg)); return; } } @@ -676,27 +610,30 @@ protected: #ifdef USE_OPENSSL_WITH_ASIO -template SESSION_PROTOT, traits::adc_output_char_range RMSGT = std::vector> // used only for inner storing of message byte sequence -class AdcNetServiceASIOTLS : public AdcNetServiceASIOBase +class AdcNetServiceASIOTLS : public TRANSPORT_PROTOT { - typedef AdcNetServiceASIOBase base_t; - public: - using typename base_t::socket_t; + using socket_t = typename TRANSPORT_PROTOT::socket; + typedef asio::ssl::stream tls_stream_t; - using base_t::connect; - using base_t::getShutdownType; - using base_t::receive; - using base_t::send; - using base_t::setShutdownType; + // TLS certificate attributes comparison function: + // 'serial' - as returned by OpenSSL BN_bn2hex + // 'fingerprint' - as returned by OpenSSL X509_digest + // 'depth' - depth in chain + // the function must return 0 - if comparison failed; otherwise - something != 0 + typedef std::function& fingerprint, int depth)> + cert_comp_func_t; // reimplement acceptor class - class acceptor_t : public base_t::acceptor_t + class acceptor_t : public AdcNetServiceASIOBase::acceptor_t { + using base_t = AdcNetServiceASIOBase; + public: typedef AdcNetServiceASIOTLS netservice_t; typedef std::shared_ptr sptr_netservice_t; @@ -711,6 +648,7 @@ public: { enum { starting, handshaking, finishing }; + this->_socket = base_t::socket_t(this->_ioContext); auto timer = getDeadlineTimer(this->_acceptor, timeout); } @@ -732,11 +670,11 @@ public: { std::error_code ec; - _tlsStream.shutdown(ec); // shutdown OpenSSL stream + this->_sock_stream.shutdown(ec); // shutdown OpenSSL stream if (!ec) { - _tlsStream.lowest_layer().shutdown(this->_shutdownType, ec); + this->_sock_stream.lowest_layer().shutdown(this->_shutdownType, ec); if (!ec) { - _tlsStream.lowest_layer().close(ec); + this->_sock_stream.lowest_layer().close(ec); } } @@ -744,7 +682,70 @@ public: } protected: - asio::ssl::stream _tlsStream; + asio::ssl::context _tlsContext; + asio::ssl::verify_mode _tlsPeerVerifyMode; + std::string _tlsCertFingerprintDigest; + cert_comp_func_t _tlsCertCompFunc; + + asio::streambuf _streamBuffer; + + + // reference implementation + virtual bool verifyCertificate(int preverified_ok, X509_STORE_CTX* store) + { + if (preverified_ok == 0) { + int err = X509_STORE_CTX_get_error(store); + auto err_str = X509_verify_cert_error_string(err); + // log_error("TLS certificate verification error: {}", err_str); + + return preverified_ok; + } + + char subject_name[256]; + + int depth; + ASN1_INTEGER* serial; + BIGNUM* bnser; + + X509* cert = X509_STORE_CTX_get_current_cert(store); + + if (cert != NULL) { + depth = X509_STORE_CTX_get_error_depth(store); + X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256); + serial = X509_get_serialNumber(cert); // IT IS INTERNAL POINTER SO IT MUST NOT BE FREED UP!!! + bnser = ASN1_INTEGER_to_BN(serial, NULL); + auto serial_hex = BN_bn2hex(bnser); + + // log_debug("Received TLS certificate: SUBJECT = {}, SERIAL = {}, DEPTH = {}", subject_name, serial_hex, + // depth); + + // if no compare function then do not compute fingerprint + if (_tlsCertCompFunc) { + // compute certificate fingerprint + unsigned char digest_buff[EVP_MAX_MD_SIZE]; + const EVP_MD* digest = EVP_get_digestbyname(_tlsCertFingerprintDigest.c_str()); + unsigned int N; + + if (X509_digest(cert, digest, digest_buff, &N)) { + preverified_ok = _tlsCertCompFunc(std::string(serial_hex), + std::vector(digest_buff, digest_buff + N), depth); + + } else { + // log_error("Cannot compute client certificate fingerprint! Cannot verify the certificate!"); + preverified_ok = 0; + } + } + + BN_free(bnser); + OPENSSL_free(serial_hex); + + } else { + // log_error("OpenSSL error: cannot get current certificate"); + preverified_ok = 0; + } + + return preverified_ok; + } }; #endif @@ -757,332 +758,4 @@ static_assert(adc::interfaces::adc_netsession_proto_c SESSION_PROTOT> -class AsioNetService : public SESSION_PROTOT -{ -public: - // typedefs from transport protocol - using endpoint_t = typename TRANSPORT_PROTOT::endpoint; - using socket_t = typename TRANSPORT_PROTOT::socket; - using acceptor_t = - std::conditional_t>, - std::nullptr_t, // there is no acceptor - typename TRANSPORT_PROTOT::acceptor>; - - typedef std::function connect_event_hndl_t; - typedef std::function send_event_hndl_t; - typedef std::function close_event_hndl_t; - typedef std::function)> message_event_hndl_t; - - struct Events { - bool listening = false; // true - server role, false - client role - connect_event_hndl_t onConnect = [](auto...) {}; - send_event_hndl_t onSend = [](auto...) {}; - close_event_hndl_t onClose = [](auto...) {}; - message_event_hndl_t onMessage = [](auto...) {}; - }; - - AsioNetService(asio::io_context& ctx, Events events) - : SESSION_PROTOT(), - _ioContext(ctx), - _receiveStrand(_ioContext), - _receiveQueue(), - _acceptor(_ioContext), - _socket(_ioContext), - _events(events), - _stopReceiving(false), - _waitTimer(_ioContext) - { - } - - - template - static void start(AsioNetService& srv, const endpoint_t& endpoint, const DT& timeout) - { - // no acceptor for UDP-sockets - if constexpr (std::is_null_pointer_v) { - srv.startReceiving(); - srv._events.onConnect(&srv, std::error_code{}, endpoint_t()); - return; - } - - auto token = [&endpoint, &timeout, &srv](std::error_code ec) { - if (!ec) { - srv.startReceiving(); - } - - // post event - srv._events.onConnect(&srv, ec, srv._socket.remote_endpoint()); - }; - - if (srv._events.listening) { // server role (accept connections to given endpoint) - - auto timer = getDeadlineTimer(srv._acceptor, timeout); - - asio::async_compose( - [timer = std::move(timer), start = true, &endpoint, &srv](auto& self, std::error_code ec = {}) mutable { - if (!ec) { - if (start) { - start = false; - try { - if (!srv._acceptor.is_open() || (srv._acceptor.local_endpoint() != endpoint)) { - srv._acceptor = acceptor_t(srv._ioContext, endpoint); - } - } catch (std::system_error err) { - timer->cancel(); - self.complete(err.code()); - return; - } - - // return acc.async_accept(_socket, std::move(self)); - return srv._acceptor.async_accept(srv._socket, std::move(self)); - } - } - - if (isTimeout(timer, ec)) { - ec = std::make_error_code(std::errc::timed_out); - } else { // an error occured in async_connect - timer->cancel(); - } - - self.complete(ec); - }, - std::move(token), srv._ioContext); - - } else { // client role (connect to remote host) - - auto timer = getDeadlineTimer(srv._socket, timeout); - - asio::async_compose( - [start = true, endpoint, timer = std::move(timer), &srv](auto& self, std::error_code ec = {}) mutable { - if (!ec) { - if (start) { - start = false; - return srv._socket.async_connect(endpoint, std::move(self)); - } - } - - if (isTimeout(timer, ec)) { - ec = std::make_error_code(std::errc::timed_out); - } else { // an error occured in async_connect - timer->cancel(); - } - - self.complete(ec); - }, - std::move(token), srv._socket); - } - } - - - void stop() - { - std::error_code ec; - - _stopReceiving = true; - - _socket.shutdown(_shutdownType, ec); - if (!ec) { - _socket.close(ec); - } - - _events.onClose(ec); - } - - template - auto send(const R& msg, const DT& timeout) - { - auto token = [this](std::error_code ec, size_t) { - // - _events.onSend(this, ec); - }; - - // create buffer sequence of sending session protocol representation of the input message - std::vector buff_seq; - std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); }); - - auto timer = getDeadlineTimer(_socket, timeout); - - return asio::async_compose( - [start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this]( - auto& self, std::error_code ec = {}) mutable { - if (!ec) { - if (start) { - start = false; - if constexpr (std::derived_from>) { - return asio::async_write(_socket, buff_seq, std::move(self)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, std::move(self)); - } else if constexpr (std::derived_from>) { - return _socket.async_send(buff_seq, std::move(self)); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - } - } - - if (isTimeout(timer, ec)) { - ec = std::make_error_code(std::errc::timed_out); - } else { // an error occured in async_write/async_send - timer->cancel(); - } - - self.complete(ec); - }, - std::move(token), _socket); - } - - template - bool wait(const DT& timeout) - { - if (_receiveQueue.size()) { - return true; - } - - std::error_code ec; - - std::chrono::seconds max_d = std::chrono::duration_cast( - std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - std::chrono::seconds(1)); - - auto res = _waitTimers.emplace(_ioContext); - if (res.second) { - (*(res.first))->expires_after(timeout < max_d ? timeout : max_d); - - // _waitTimer.expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! - - // auto f = _waitTimer.async_wait(asio::use_future); - - auto f = (*(res.first))->async_wait(asio::use_future); - try { - f.get(); - _waitTimers.erase(res.first); - } catch (std::system_error& ex) { - if (ex.code() == asio::error::operation_aborted) { // canceled in startReceiving (message was received) - return true; - } else { - return false; - } - } - } - } - -protected: - asio::io_context& _ioContext; - asio::io_context::strand _receiveStrand; - - socket_t _socket; - - acceptor_t _acceptor; - - asio::streambuf _streamBuffer; - - std::queue> _receiveQueue; - - asio::socket_base::shutdown_type _shutdownType = asio::socket_base::shutdown_both; - - std::atomic_bool _stopReceiving; - - Events _events; - - asio::steady_timer _waitTimer; - std::set> _waitTimers; - - void startReceiving() - { - auto get_msg = [this](std::error_code ec, size_t) { - if (!ec) { - auto start_ptr = static_cast(_streamBuffer.data().data()); - - auto net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); - if (!net_pack.empty()) { - _receiveQueue.emplace(); - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); - _streamBuffer.consume(net_pack.size()); - - - while (_streamBuffer.size()) { // search for possible additional session protocol packets - start_ptr = static_cast(_streamBuffer.data().data()); - - net_pack = this->search(std::span(start_ptr, _streamBuffer.size())); - - if (!net_pack.empty()) { - _receiveQueue.emplace(); - std::ranges::copy(this->fromProto(net_pack), std::back_inserter(_receiveQueue.back())); - _streamBuffer.consume(net_pack.size()); - } else { - break; // exit and hold remaining bytes in stream buffer - } - } - - auto msg = _receiveQueue.front(); - _events.onMessage(this, ec, std::span(msg.begin(), msg.end())); - - _receiveQueue.pop(); - } - - if (!_stopReceiving) { - startReceiving(); // initiate consequence socket's read operation - } - } else { - _events.onMessage(this, ec, std::span()); - } - }; - - - auto out_flags = std::make_shared(); - - if constexpr (std::derived_from>) { - return asio::async_read(_socket, _streamBuffer, asio::transfer_at_least(1), std::move(get_msg)); - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - return _socket.receive(_streamBuffer, std::move(get_msg)); - } else if constexpr (std::derived_from>) { - // datagram, so it should be received at once - return _socket.receive(_streamBuffer, *out_flags, std::move(get_msg)); - } else { - static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!"); - } - } - - - template - static std::unique_ptr getDeadlineTimer(CancelableT& obj, - const TimeoutT& timeout, - bool arm = true) - { - auto timer = std::make_unique(obj.get_executor()); - - if (arm) { - std::chrono::seconds max_d = std::chrono::duration_cast( - std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now() - - std::chrono::seconds(1)); - - timer->expires_after(timeout < max_d ? timeout : max_d); // to avoid overflow! - // timer->expires_after(timeout); - - timer->async_wait([&obj](const std::error_code& ec) mutable { - if (!ec) { - obj.cancel(); - } - }); - } - - return timer; - } - - template - static bool isTimeout(const std::unique_ptr& timer, const std::error_code& ec) - { - auto exp_time = timer->expiry(); - return (exp_time < std::chrono::steady_clock::now()) && (ec == asio::error::operation_aborted); - } -}; - } // namespace adc::impl diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp index 3e2b284..17abff1 100644 --- a/tests/adc_netservice_test.cpp +++ b/tests/adc_netservice_test.cpp @@ -21,14 +21,14 @@ void receive(T srv) int main() { - // using tr_p_t = asio ::ip::tcp; + using tr_p_t = asio ::ip::tcp; // using tr_p_t = asio::local::stream_protocol; - using tr_p_t = asio::local::seq_packet_protocol; + // using tr_p_t = asio::local::seq_packet_protocol; - tr_p_t::endpoint ept_c(std::string("/tmp/AAA").insert(0, 1, '\0')); + // tr_p_t::endpoint ept_c(std::string("/tmp/AAA").insert(0, 1, '\0')); // tr_p_t::endpoint ept_c("/tmp/AAA"); - // tr_p_t::endpoint ept_c(asio::ip::make_address_v4("0.0.0.0"), 9999); + tr_p_t::endpoint ept_c(asio::ip::make_address_v4("0.0.0.0"), 9999); std::cout << "ADDR: " << ept_c << "\n"; asio::io_context ctx;