AdcNetServiceASIOBase: delete move constructor (asio::streambuf is not

movable), acceptor_t.asyncAccept completion token has signature:
void(std::error_code, std::shared_ptr<netservice_t>)
This commit is contained in:
Timur A. Fatkhullin 2024-10-02 16:43:45 +03:00
parent f329bcecec
commit 3d89dd3715
5 changed files with 103 additions and 54 deletions

View File

@ -133,6 +133,10 @@ template <typename F>
struct adc_func_traits<F&> : adc_func_traits<F> {
};
template <typename F>
struct adc_func_traits<const F&> : adc_func_traits<F> {
};
template <typename F>
struct adc_func_traits<F&&> : adc_func_traits<F> {
};

View File

@ -52,13 +52,14 @@ using adc_common_duration_t = adc_duration_common_type_t<std::chrono::nanosecond
// a) true - asynchronous operation completed without errors
// b) false - an error occured
template <typename ERRT>
concept adc_async_callback_err_t = std::convertible_to<std::remove_cvref_t<ERRT>, bool>;
concept adc_async_callback_err_t = std::convertible_to<std::remove_cvref_t<ERRT>, bool> ||
requires(const std::remove_cvref_t<ERRT> err) { err.operator bool(); };
// concepts for asynchronous opereration callback callable
// 1) the type must be a callable with at least 1 input argument
// 2) the first argument type must satisfy the concept adc_async_callback_err_t
template <typename T>
concept adc_async_callback_t = traits::adc_is_callable<T> && traits::adc_func_traits<T>::arity &&
concept adc_async_callback_t = traits::adc_is_callable<T> && (traits::adc_func_traits<T>::arity >= 1) &&
adc_async_callback_err_t<traits::adc_func_arg1_t<T>>;
/*
@ -94,7 +95,8 @@ concept adc_async_callback_t = traits::adc_is_callable<T> && traits::adc_func_tr
}
*/
template <typename SRVT>
concept adc_netservice_c = std::movable<SRVT> && requires(SRVT srv, const SRVT srv_const) {
concept adc_netservice_c = requires(SRVT srv, const SRVT srv_const) {
// concept adc_netservice_c = std::movable<SRVT> && requires(SRVT srv, const SRVT srv_const) {
typename SRVT::netservice_ident_t; // service identificator type
typename SRVT::send_msg_t; // sending message type
@ -117,7 +119,8 @@ concept adc_netservice_c = std::movable<SRVT> && requires(SRVT srv, const SRVT s
acc.asyncAccept(std::declval<typename SRVT::acceptor_t::async_accept_callback_t>(),
std::declval<const typename SRVT::timeout_t&>());
{ acc.accept(std::declval<const typename SRVT::timeout_t&>()) } -> std::same_as<SRVT>;
// { acc.accept(std::declval<const typename SRVT::timeout_t&>()) } -> std::same_as<SRVT>;
acc.accept(std::declval<const typename SRVT::timeout_t&>());
};

View File

@ -109,8 +109,9 @@ struct AdcStopSeqSessionProto {
// the second one - a view of the stop sequence
if constexpr (std::ranges::viewable_range<R>) {
auto res = std::array{std::ranges::subrange(r.begin(), r.end()),
std::ranges::subrange(STOP_SEQ.begin(), STOP_SEQ.end())};
auto res = std::array{std::span(r.begin(), r.end()), std::span(STOP_SEQ.begin(), STOP_SEQ.end())};
// auto res = std::array{std::ranges::subrange(r.begin(), r.end()),
// std::ranges::subrange(STOP_SEQ.begin(), STOP_SEQ.end())};
return res;
} else { // return a copy of input range with appended stop sequence

View File

@ -36,6 +36,36 @@
#include "../adc_net_concepts.h"
#include "../adc_netproto.h"
namespace adc::traits
{
// special ASIO-related template specializations
template <>
struct adc_func_traits<asio::use_future_t<>> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
template <>
struct adc_func_traits<asio::use_awaitable_t<>> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
template <>
struct adc_func_traits<asio::deferred_t> {
using ret_t = std::nullptr_t;
using args_t = std::tuple<std::nullptr_t, std::nullptr_t>;
using arg1_t = std::nullptr_t;
static constexpr size_t arity = 0;
};
} // namespace adc::traits
namespace adc::impl
{
@ -136,9 +166,14 @@ public:
}
}
typedef std::function<void(std::error_code, AdcNetServiceASIOBase)> async_accept_callback_t;
typedef AdcNetServiceASIOBase netservice_t;
typedef std::shared_ptr<netservice_t> sptr_netservice_t;
template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
// typedef std::function<void(std::error_code, AdcNetServiceASIOBase)> async_accept_callback_t;
typedef std::function<void(std::error_code, sptr_netservice_t)> async_accept_callback_t;
// template <asio::completion_token_for<void(std::error_code, AdcNetServiceASIOBase)> TokenT,
template <asio::completion_token_for<void(std::error_code, sptr_netservice_t)> TokenT,
traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
auto asyncAccept(TokenT&& token, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
@ -150,7 +185,8 @@ public:
_socket = AdcNetServiceASIOBase::socket_t{_ioContext};
auto timer = getDeadlineTimer(_acceptor, timeout);
return asio::async_compose<TokenT, void(std::error_code, AdcNetServiceASIOBase)>(
// return asio::async_compose<TokenT, void(std::error_code, AdcNetServiceASIOBase)>(
return asio::async_compose<TokenT, void(std::error_code, sptr_netservice_t)>(
[timer = std::move(timer), start = true, this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
@ -161,7 +197,8 @@ public:
}
} catch (std::system_error err) {
timer->cancel();
self.complete(err.code(), AdcNetServiceASIOBase(_ioContext));
// self.complete(err.code(), AdcNetServiceASIOBase(_ioContext));
self.complete(err.code(), std::make_shared<netservice_t>(_ioContext));
return;
}
@ -175,7 +212,8 @@ public:
timer->cancel();
}
self.complete(ec, AdcNetServiceASIOBase(std::move(_socket)));
// self.complete(ec, AdcNetServiceASIOBase(std::move(_socket)));
self.complete(ec, std::make_shared<netservice_t>(std::move(_socket)));
},
token, _ioContext);
}
@ -191,7 +229,8 @@ public:
}
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
auto accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
// AdcNetServiceASIOBase accept(const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
auto f = asyncAccept(asio::use_future, timeout);
@ -199,7 +238,8 @@ public:
}
template <traits::adc_time_duration_c DT = decltype(DEFAULT_ACCEPT_TIMEOUT)>
AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
auto accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
// AdcNetServiceASIOBase accept(const endpoint_t& endpoint, const DT& timeout = DEFAULT_ACCEPT_TIMEOUT)
{
return accept(endpoint, timeout);
}
@ -239,19 +279,22 @@ public:
}
AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other)
: _ioContext(other._ioContext),
_receiveStrand(std::move(other._receiveStrand)),
_receiveQueue(std::move(_receiveQueue)),
_socket(std::move(other._socket)),
_streamBuffer()
{
if (this == &other)
return;
// NOTE: CANNOT MOVE asio::streambuf CORRECTLY?!!!
AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other) = delete;
// AdcNetServiceASIOBase(AdcNetServiceASIOBase&& other)
// : SESSION_PROTOT(std::move(other)),
// _ioContext(other._ioContext),
// _receiveStrand(std::move(other._receiveStrand)),
// _receiveQueue(std::move(_receiveQueue)),
// _socket(std::move(other._socket)),
// _streamBuffer()
// {
// if (this == &other)
// return;
auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()), other._streamBuffer.data());
_streamBuffer.commit(bytes);
};
// auto bytes = asio::buffer_copy(_streamBuffer.prepare(other._streamBuffer.size()),
// other._streamBuffer.data()); _streamBuffer.commit(bytes);
// };
AdcNetServiceASIOBase(const AdcNetServiceASIOBase&) = delete; // no copy constructor!
@ -295,8 +338,8 @@ public:
{
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(asio::error_code)>(
[start = true, endpoint, timer = std::move(timer), this](auto& self, asio::error_code ec = {}) mutable {
return asio::async_compose<TokenT, void(std::error_code)>(
[start = true, endpoint, timer = std::move(timer), this](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
@ -323,13 +366,15 @@ public:
{
// create buffer sequence of sending session protocol representation of the input message
std::vector<asio::const_buffer> buff_seq;
std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
// std::ranges::for_each(this->toProto(msg), [&buff_seq](const auto& el) { buff_seq.emplace_back(el); });
std::ranges::for_each(this->toProto(msg),
[&buff_seq](const auto& el) { buff_seq.emplace_back(el.data(), el.size()); });
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(asio::error_code)>(
return asio::async_compose<TokenT, void(std::error_code)>(
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
auto& self, asio::error_code ec = {}) mutable {
auto& self, std::error_code ec = {}, size_t = 0) mutable {
if (!ec) {
if (start) {
start = false;
@ -385,8 +430,8 @@ public:
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<TokenT, void(asio::error_code, msg_t)>(
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, asio::error_code ec = {},
return asio::async_compose<TokenT, void(std::error_code, msg_t)>(
[out_flags, do_read = true, timer = std::move(timer), this](auto& self, std::error_code ec = {},
size_t nbytes = 0) mutable {
msg_t msg;
@ -471,6 +516,12 @@ public:
} else {
self.complete(ec, {msg.begin(), msg.end()});
}
// if constexpr (adc_asio_special_comp_token_c<TokenT>) {
// self.complete(ec, std::move(msg));
// } else { // may be of non-RMSGT type
// self.complete(ec, {msg.begin(), msg.end()});
// }
},
token, _receiveStrand);
}
@ -487,14 +538,14 @@ public:
template <traits::adc_input_char_range R, traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_SEND_TIMEOUT)>
auto send(const R& msg, const TimeoutT& timeout = DEFAULT_SEND_TIMEOUT)
{
std::future<void> ftr = asyncSend(msg, timeout, asio::use_future);
std::future<void> ftr = asyncSend(msg, asio::use_future, timeout);
ftr.get();
}
template <traits::adc_time_duration_c TimeoutT = decltype(DEFAULT_RECEIVE_TIMEOUT)>
auto receive(const TimeoutT& timeout = DEFAULT_RECEIVE_TIMEOUT)
{
std::future<RMSGT> ftr = asyncReceive(timeout, asio::use_future);
std::future<RMSGT> ftr = asyncReceive(asio::use_future, timeout);
return ftr.get();
}
@ -589,8 +640,8 @@ protected:
};
// static_assert(adc::interfaces::adc_netservice_c<AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>>,
// "");
static_assert(adc::interfaces::adc_netservice_c<AdcNetServiceASIOBase<asio::ip::tcp, adc::AdcStopSeqSessionProto<>>>,
"");
static_assert(adc::interfaces::adc_netsession_proto_c<adc::AdcStopSeqSessionProto<>>, "");
@ -694,8 +745,8 @@ public:
auto timer = getDeadlineTimer(srv._socket, timeout);
asio::async_compose<decltype(token), void(asio::error_code)>(
[start = true, endpoint, timer = std::move(timer), &srv](auto& self, asio::error_code ec = {}) mutable {
asio::async_compose<decltype(token), void(std::error_code)>(
[start = true, endpoint, timer = std::move(timer), &srv](auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;
@ -744,9 +795,9 @@ public:
auto timer = getDeadlineTimer(_socket, timeout);
return asio::async_compose<decltype(token), void(asio::error_code)>(
return asio::async_compose<decltype(token), void(std::error_code)>(
[start = true, buff_seq = std::move(buff_seq), timer = std::move(timer), this](
auto& self, asio::error_code ec = {}) mutable {
auto& self, std::error_code ec = {}) mutable {
if (!ec) {
if (start) {
start = false;

View File

@ -7,8 +7,8 @@
template <typename T>
void receive(T srv)
{
srv.asyncReceive(
[&srv](std::error_code ec, std::string msg) {
srv->asyncReceive(
[srv](std::error_code ec, std::string msg) {
if (!ec) {
std::cout << "Received: [" << msg << "]\n";
receive(std::move(srv));
@ -34,19 +34,9 @@ int main()
typename srv_t::acceptor_t acc(ctx, ept_c);
acc.asyncAccept([](std::error_code ec, srv_t srv) {
acc.asyncAccept([](std::error_code ec, auto srv) {
if (!ec) {
// receive(std::move(srv));
srv.asyncReceive(
[](std::error_code ec, std::string msg) {
if (!ec) {
std::cout << "Received: [" << msg << "]\n";
// receive(std::move(srv));
} else {
std::cout << "Received: " << ec.message() << "\n";
}
},
std::chrono::minutes(1));
receive(std::move(srv));
} else {
std::cout << "ACCEPT ERR: " << ec.message() << "\n";
}