ADC/net/adc_netservice_asio.h
2024-06-13 18:24:52 +03:00

221 lines
8.4 KiB
C++

#pragma once
/*
ABSTRACT DEVICE COMPONENTS LIBRARY
ASIO-library implementation of network service
*/
#ifdef USE_ASIO_LIBRARY
#include <asio/awaitable.hpp>
#include <asio/basic_datagram_socket.hpp>
#include <asio/basic_seq_packet_socket.hpp>
#include <asio/basic_stream_socket.hpp>
#include <asio/compose.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/read_until.hpp>
#include <asio/steady_timer.hpp>>
#include <asio/streambuf.hpp>
#include <asio/write.hpp>
#include <concepts>
namespace adc::impl
{
template <typename NetMessageT, typename InetProtoT>
class AdcNetServiceASIO : public InetProtoT
{
public:
using socket_t = typename InetProtoT::socket;
using streambuff_iter_t = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
template <typename... CtorArgTs>
AdcNetServiceASIO(socket_t& sock, CtorArgTs&&... ctor_args)
: InetProtoT(std::forward<CtorArgTs>(ctor_args)...), _socket(sock)
{
}
virtual ~AdcNetServiceASIO() = default;
template <typename TimeoutT, asio::completion_token_for<void(std::error_code, size_t)> CompletionTokenT>
auto asynSend(const NetMessageT& msg, const TimeoutT& timeout, CompletionTokenT&& token)
{
// create buffer sequence
std::vector<asio::const_buffer> buff;
std::ranges::for_each(msg.template bytesView<std::vector<std::string_view>>(),
[&buff](const auto& el) { buff.emplace_back(el); });
auto timer = getDeadlineTimer(timeout);
// wrapper
return asio::async_compose<CompletionTokenT, void(std::error_code)>(
[buff = std::move(buff), timer = std::move(timer), this](auto& self, const std::error_code& ec = {},
size_t sz = 0) {
if (!ec) {
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_write(_socket, buff, std::move(self));
} else if constexpr (std::derived_from<
socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
return _socket.async_send(buff, std::move(self));
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) {
return _socket.async_send(buff, std::move(self));
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
timer->cancel();
self.complete(ec);
},
token, _socket);
}
template <typename TimeoutT, typename CompletionTokenT>
auto asyncReceive(const TimeoutT& timeout, CompletionTokenT&& token)
{
std::shared_ptr<asio::socket_base::message_flags> out_flags;
auto timer = getDeadlineTimer(timeout); // armed timer
return asio::async_compose<CompletionTokenT, void(const std::error_code&, const NetMessageT&)>(
[timer = std::move(timer), out_flags, this](auto& self, const std::error_code& ec = {}, size_t sz = 0) {
if (!ec) {
if constexpr (std::derived_from<socket_t,
asio::basic_stream_socket<typename socket_t::protocol_type>>) {
return asio::async_read_until(
_socket, _streamBuffer, [this](auto begin, auto end) { this->matchCondition(begin, end); },
std::move(self));
} else if constexpr (std::derived_from<
socket_t, asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
return _socket.receive(_streamBuffer,
std::move(self)); // datagram, so it should be received at once
} else if constexpr (std::derived_from<socket_t, asio::basic_seq_packet_socket<
typename socket_t::protocol_type>>) {
return _socket.receive(_streamBuffer, *out_flags,
std::move(self)); // datagram, so it should be received at once
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
}
timer->cancel();
if (ec) {
self.complete(ec, NetMessageT()); // return an empty message
} else {
auto begin_it = streambuff_iter_t::begin(_streamBuffer.data());
auto end_it = begin_it + _streamBuffer.data().size();
// check for byte sequence is valid byte sequence and find the limits
// (stream buffer may contain number of bytes more than requred by protocol)
auto res = this->matchCondition(begin_it, end_it);
if (!res.second) {
self.complete(std::make_error_code(std::errc::protocol_error),
NetMessageT()); // return an empty message
} else {
auto nbytes = std::distance(begin_it, res.first);
NetMessageT msg;
auto msg_it = this->fromLowLevel(begin_it, res.first);
msg.setFromBytes(msg_it.first, msg_it.second);
_streamBuffer.consume(nbytes);
self.complete(ec, msg);
}
}
},
token, _socket);
}
protected:
socket_t& _socket;
asio::streambuf _streamBuffer;
template <typename TimeoutT>
std::unique_ptr<asio::steady_timer> getDeadlineTimer(const TimeoutT& timeout, bool arm = true)
{
std::unique_ptr<asio::steady_timer> timer(_socket.get_executor());
if (arm) {
timer->expires_after(timeout);
timer->async_wait([this](const std::error_code& ec) {
if (!ec) {
_socket.cancel(std::make_error_code(std::errc::timed_out));
}
});
}
return timer;
}
std::vector<asio::const_buffer> createConstBufferSequence(const NetMessageT& msg)
{
std::vector<asio::const_buffer> buff;
for (const auto& el : msg.template bytesView<std::vector<std::string_view>>()) {
buff.emplace_back(asio::const_buffer(el));
}
return buff;
}
asio::awaitable<void> asyncSendImpl(const NetMessageT& msg)
{
// for (const auto& buff : msg.bytesView()) {
if constexpr (std::derived_from<socket_t, asio::basic_stream_socket<typename socket_t::protocol_type>>) {
// asio::async_write(_socket, buff, asio::use_awaitable);
co_await asio::async_write(_socket, msg.bytesView(), asio::use_awaitable);
} else if constexpr (std::derived_from<socket_t,
asio::basic_datagram_socket<typename socket_t::protocol_type>>) {
co_await _socket.async_send(msg.bytesView(), asio::use_awaitable);
} else if constexpr (std::derived_from<socket_t,
asio::basic_seq_packet_socket<typename socket_t::protocol_type>>) {
co_await _socket.async_send(msg.bytesView(), asio::use_awaitable);
} else {
static_assert(false, "UNKNOWN ASIO-LIBRARY SOCKET TYPE!!!");
}
// }
}
template <typename TimepointT>
asio::awaitable<void> watchdog(TimepointT& deadline, std::error_code& ec)
{
ec.clear();
typename TimepointT::clock timer(co_await asio::this_coro::executor);
auto now = TimepointT::clock::template now();
while (deadline > now) {
timer.expires_at(deadline);
co_await timer.async_wait(asio::use_awaitable);
now = TimepointT::clock::template now();
}
ec = std::make_error_code(std::errc::timed_out);
throw std::system_error(ec);
}
};
} // namespace adc::impl
#endif