fix 100% load of CPU after client disconnection

(AdcBaseNetServiceASIO.asyncReceive)
add resolving domain name (AdcDeviceNetServerASIO)
This commit is contained in:
Timur A. Fatkhullin 2024-11-17 23:50:15 +03:00
parent 221f595bcb
commit 45b8d4a3c7
4 changed files with 108 additions and 38 deletions

View File

@ -143,7 +143,7 @@ public:
_host = std::string_view{found.end(), _endpoint.end()}; _host = std::string_view{found.end(), _endpoint.end()};
auto f1 = std::ranges::search(_host, portPathDelim); auto f1 = std::ranges::search(_host, portPathDelim);
std::string_view port_sv; // std::string_view port_sv;
if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'! if (f1.empty() && isLocal()) { // no path, but it is mandatory for 'local'!
return _isValid; return _isValid;
} else { } else {
@ -156,15 +156,15 @@ public:
return _isValid; return _isValid;
} }
port_sv = std::string_view(f1.end(), _host.end()); _portView = std::string_view(f1.end(), _host.end());
if (port_sv.size()) { if (_portView.size()) {
_host = std::string_view(_host.begin(), f1.begin()); _host = std::string_view(_host.begin(), f1.begin());
if (!isLocal()) { if (!isLocal()) {
// convert port string to int // convert port string to int
auto end_ptr = port_sv.data() + port_sv.size(); auto end_ptr = _portView.data() + _portView.size();
auto [ptr, ec] = std::from_chars(port_sv.data(), end_ptr, _port); auto [ptr, ec] = std::from_chars(_portView.data(), end_ptr, _port);
if (ec != std::errc() || ptr != end_ptr) { if (ec != std::errc() || ptr != end_ptr) {
return _isValid; return _isValid;
} }
@ -235,6 +235,17 @@ public:
return _port; return _port;
} }
template <traits::adc_view_or_output_char_range R>
R portView() const
{
return part<R>(PORT_PART);
}
std::string_view portView() const
{
return portView<std::string_view>();
}
template <traits::adc_view_or_output_char_range R> template <traits::adc_view_or_output_char_range R>
R path() const R path() const
{ {
@ -295,7 +306,7 @@ public:
protected: protected:
std::string _endpoint; std::string _endpoint;
std::string_view _proto, _host, _path; std::string_view _proto, _host, _path, _portView;
int _port; int _port;
bool _isValid; bool _isValid;
@ -318,7 +329,7 @@ protected:
return found ? idx : -1; return found ? idx : -1;
} }
enum EndpointPart { PROTO_PART, HOST_PART, PATH_PART }; enum EndpointPart { PROTO_PART, HOST_PART, PATH_PART, PORT_PART };
template <traits::adc_view_or_output_char_range R> template <traits::adc_view_or_output_char_range R>
R part(EndpointPart what) const R part(EndpointPart what) const
@ -341,6 +352,9 @@ protected:
case PATH_PART: case PATH_PART:
part = _path; part = _path;
break; break;
case PORT_PART:
part = _portView;
break;
default: default:
break; break;
} }

View File

@ -8,7 +8,7 @@ namespace adc
{ {
/* */ /* A VERY GENERIC NETWORK CLIENT */
template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>> template <typename IdentT = std::string, interfaces::adc_logger_c LoggerT = utils::AdcOstreamLogger<char>>
@ -30,11 +30,13 @@ public:
stopAllSessions(); stopAllSessions();
} }
// start the client: connect to server and start session
template <interfaces::adc_netsession_c SessionT, typename... NetServiceCtorArgTs> template <interfaces::adc_netsession_c SessionT, typename... NetServiceCtorArgTs>
void asyncConnect(const SessionT::netservice_t::endpoint_t& endpoint, void start(const SessionT::netservice_t::endpoint_t& endpoint,
SessionT::netsession_ident_t id, SessionT::netsession_ident_t id,
SessionT::netsession_ctx_t sess_ctx, SessionT::netsession_ctx_t sess_ctx,
NetServiceCtorArgTs&&... ctor_args) NetServiceCtorArgTs&&... ctor_args)
{ {
auto srv_sptr = auto srv_sptr =
std::make_shared<typename SessionT::netservice_t>(std::forward<NetServiceCtorArgTs>(ctor_args)...); std::make_shared<typename SessionT::netservice_t>(std::forward<NetServiceCtorArgTs>(ctor_args)...);
@ -53,20 +55,6 @@ public:
} }
template <interfaces::adc_netsession_c SessionT, typename... NetServiceCtorArgTs>
void connect(const SessionT::netservice_t::endpoint_t& endpoint,
SessionT::netsession_ident_t id,
SessionT::netsession_ctx_t sess_ctx,
NetServiceCtorArgTs&&... ctor_args)
{
typename SessionT::netservice_t srv(std::forward<NetServiceCtorArgTs>(ctor_args)...);
srv.connect(endpoint, _connectTimeout);
auto sess = std::make_shared<SessionT>(std::move(id), std::move(srv), std::move(sess_ctx));
startSession(sess);
}
template <traits::adc_time_duration_c DT> template <traits::adc_time_duration_c DT>
void setConnectTimeout(const DT& timeout) void setConnectTimeout(const DT& timeout)
{ {
@ -82,6 +70,29 @@ protected:
client_ident_t _clientIdent; client_ident_t _clientIdent;
std::chrono::milliseconds _connectTimeout = std::chrono::milliseconds(5000); std::chrono::milliseconds _connectTimeout = std::chrono::milliseconds(5000);
template <interfaces::adc_netservice_c ServiceT,
traits::adc_input_char_range SendMsgT,
typename TokenT,
traits::adc_time_duration_c SendTimeoutT = std::chrono::milliseconds,
traits::adc_time_duration_c RecvTimeoutT = std::chrono::milliseconds>
auto asyncSendRecv(ServiceT& netservice,
const SendMsgT& send_msg,
TokenT&& token,
const SendTimeoutT& send_timeout = std::chrono::milliseconds(5000),
const RecvTimeoutT& recv_timeout = std::chrono::milliseconds(5000))
{
return netservice.asyncSend(
send_msg,
[&netservice, recv_timeout, wrapper = traits::adc_pf_wrapper(std::forward<TokenT>(token)), this](auto err) {
if (err) {
this->logError("An error occured while sending the message: {}", ServiceT::formattableError(err));
} else {
netservice.asyncReceive(std::get<0>(wrapper), recv_timeout);
}
},
send_timeout);
}
}; };
} // namespace adc } // namespace adc

View File

@ -53,21 +53,66 @@ public:
} }
// may throw here! // may throw here!
if (endpoint.isTCP()) { if (endpoint.isTCP()) {
asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port()); // asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIO<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TCP", this, _ioContext, ept);
base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout}, _ioContext, // base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
ept); // _ioContext,
// ept);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(), [endpoint, res, this](std::error_code ec, auto results) {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TCP", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin());
}
});
#ifdef USE_OPENSSL_WITH_ASIO #ifdef USE_OPENSSL_WITH_ASIO
} else if (endpoint.isTLS()) { } else if (endpoint.isTLS()) {
asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port()); // asio::ip::tcp::endpoint ept(asio::ip::make_address(endpoint.host()), endpoint.port());
using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>; using srv_t = AdcNetServiceASIOTLS<asio::ip::tcp, SessProtoT>;
// base_t::template start<Session<srv_t>>("TLS", this, _ioContext, ept, std::move(tls_context), // base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
// tls_verify_mode); // _ioContext,
base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout}, _ioContext, // ept, std::move(tls_context), tls_verify_mode);
ept, std::move(tls_context), tls_verify_mode);
auto res = std::make_shared<asio::ip::tcp::resolver>(_ioContext);
res->async_resolve(
endpoint.host(), endpoint.portView(),
[endpoint, res, tls_context = std::move(tls_context), tls_verify_mode, this](std::error_code ec,
auto results) mutable {
if (ec) {
this->logError(
"An error occured while resolve hostname ('{}') of the given endpoint! (ec = {})",
endpoint.host(), ec.message());
this->logError("Cannot start listening at endpoint '{}'!", endpoint.endpoint());
} else {
if (results.size() == 1) {
this->logDebug("Resolved the single IP-address for the hostname '{}'", endpoint.host());
} else {
this->logDebug("Resolved {} IP-addresses for the hostname '{}'! Use of the first one!",
results.size(), endpoint.host());
}
base_t::template start<Session<srv_t>>("TLS", {this, _sessionRecvTimeout, _sessionSendTimeout},
_ioContext, *results.begin(), std::move(tls_context),
tls_verify_mode);
}
});
#endif #endif
} else if (endpoint.isLocal()) { } else if (endpoint.isLocal()) {
if (endpoint.isLocalStream()) { if (endpoint.isLocalStream()) {

View File

@ -590,9 +590,9 @@ public:
} }
auto n_avail = _socket.available(); auto n_avail = _socket.available();
if (!n_avail) { // if (!n_avail) {
return _socket.async_wait(asio::ip::tcp::socket::wait_read, std::move(self)); // return _socket.async_wait(asio::ip::tcp::socket::wait_read, std::move(self));
} // }
auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1); auto buff = _streamBuffer.prepare(n_avail ? n_avail : 1);
do_read = false; do_read = false;