diff --git a/net/adc_netproto.h b/net/adc_netproto.h index bb01e24..8398181 100644 --- a/net/adc_netproto.h +++ b/net/adc_netproto.h @@ -36,16 +36,27 @@ struct AdcStopSeqSessionProto { typedef std::string proto_ident_t; - proto_ident_t ident() const { return "STOP SEQUENCE PROTO"; } + proto_ident_t ident() const + { + return "STOP SEQUENCE PROTO"; + } template auto search(const R& r) { std::tuple, std::ranges::iterator_t, bool> res{r.begin(), r.end(), false}; - std::get<1>(res) = std::search(r.begin(), r.end(), STOP_SEQ.begin(), STOP_SEQ.end()); - if (std::get<1>(res) != r.end()) { // move iterator to the one-past-the-end position - std::advance(std::get<1>(res), STOP_SEQ_SIZE); + if (!r.size()) { + return res; + } + + // std::get<1>(res) = std::search(r.begin(), r.end(), STOP_SEQ.begin(), STOP_SEQ.end()); + auto found = std::ranges::search(r, STOP_SEQ); + // if (std::get<1>(res) != r.end()) { // move iterator to the one-past-the-end position + if (!found.empty()) { // move iterator to the one-past-the-end position + // std::advance(std::get<1>(res), STOP_SEQ_SIZE); + // std::get<1>(res) = found.begin() + STOP_SEQ_SIZE; + std::get<1>(res) = found.end(); std::get<2>(res) = true; } @@ -53,6 +64,28 @@ struct AdcStopSeqSessionProto { } + template + std::pair search(IT begin, IT end) + { + auto res = std::make_pair(begin, false); + + if (begin == end) { + return res; + } + + res.first = std::search(begin, end, STOP_SEQ.begin(), STOP_SEQ.end()); + if (res.first != end) { + std::advance(res.first, STOP_SEQ_SIZE); // move iterator to the one-past-the-end position + res.second = true; + } else { + // may be only a part of valid byte sequence was received, + // so start next matching from previous begin-iterator + res.first = begin; + } + + return res; + } + template auto toProto(const R& r) { diff --git a/net/asio/adc_netservice_asio.h b/net/asio/adc_netservice_asio.h index 0e7faee..a71e9e9 100644 --- a/net/asio/adc_netservice_asio.h +++ b/net/asio/adc_netservice_asio.h @@ -191,7 +191,12 @@ public: static constexpr std::chrono::duration DEFAULT_RECEIVE_TIMEOUT = std::chrono::seconds(5); AdcNetServiceASIOBase(asio::io_context& ctx) - : SESSION_PROTOT(), _ioContext(ctx), _receiveStrand(_ioContext), _receiveQueue(), _socket(_ioContext) + : SESSION_PROTOT(), + _ioContext(ctx), + _receiveStrand(_ioContext), + _receiveQueue(), + _acceptor(_ioContext), + _socket(_ioContext) { } @@ -228,26 +233,30 @@ public: static_assert(false, "INVALID TRANSPORT PROTOCOL TYPE!"); } - auto acc = acceptor_t(_ioContext); + // auto acc = acceptor_t(_ioContext); - auto timer = getDeadlineTimer(acc, timeout); + // auto timer = getDeadlineTimer(acc, timeout); + auto timer = getDeadlineTimer(_acceptor, timeout); return asio::async_compose( - [acc = std::move(acc), timer = std::move(timer), start = true, &endpoint, this]( - auto& self, std::error_code ec = {}) mutable { + // [acc = std::move(acc), timer = std::move(timer), start = true, &endpoint, this]( + [timer = std::move(timer), start = true, &endpoint, this](auto& self, std::error_code ec = {}) mutable { if (!ec) { if (start) { start = false; try { - acc = acceptor_t(_ioContext, endpoint); - // _socket.open(asio::ip::tcp::v4()); + // acc = acceptor_t(_ioContext, endpoint); + if (!_acceptor.is_open() || (_acceptor.local_endpoint() != endpoint)) { + _acceptor = acceptor_t(_ioContext, endpoint); + } } catch (std::system_error err) { timer->cancel(); self.complete(err.code()); return; } - return acc.async_accept(_socket, std::move(self)); + // return acc.async_accept(_socket, std::move(self)); + return _acceptor.async_accept(_socket, std::move(self)); } } @@ -354,8 +363,10 @@ public: std::remove_cvref_t::args_t>>>; // auto s_res = std::make_sharedtemplate search), RMSGT>>(); + auto tp = this->search(std::span()); auto s_res = std::make_shared(); + // auto s_res = std::make_shared>(); auto out_flags = std::make_shared(); @@ -390,13 +401,14 @@ public: // std::pair res{std::get<1>(*s_res), std::get<2>(*s_res)}; // return res; // }; - auto match_func = [s_res, this](asio_streambuff_iter_t begin, asio_streambuff_iter_t end) { - *s_res = this->search(std::span(&*begin, &*end)); - // return std::make_pair(std::get<1>(*s_res), std::get<2>(*s_res)); - auto N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - std::pair res{begin + N, std::get<2>(*s_res)}; - return res; - }; + // auto match_func = [s_res, this](asio_streambuff_iter_t begin, asio_streambuff_iter_t end) + // { + // *s_res = this->search(std::span(&*begin, &*end)); + // // return std::make_pair(std::get<1>(*s_res), std::get<2>(*s_res)); + // auto N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); + // std::pair res{begin + N, std::get<2>(*s_res)}; + // return res; + // }; // return asio::async_read_until(_socket, _streamBuffer, std::move(match_func), // std::move(self)); @@ -540,7 +552,7 @@ protected: socket_t _socket; - // acceptor_t _acceptor; + acceptor_t _acceptor; asio::streambuf _streamBuffer; @@ -551,10 +563,22 @@ protected: template auto MatchCondition(asio_streambuff_iter_t begin, asio_streambuff_iter_t end, T& s_res) { - *s_res = this->search(std::span(&*begin, &*end)); + if (begin == end) { + *s_res = this->search(std::span()); + } else { + *s_res = this->search(std::span(&*begin, &*end)); + // *s_res = this->search(begin, end); + } + // return std::make_pair(std::get<1>(*s_res), std::get<2>(*s_res)); - auto N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); - std::pair res{begin + N, std::get<2>(*s_res)}; + + std::pair res{end, false}; + + typename std::iterator_traits::difference_type N = 0; + if (std::get<2>(*s_res)) { + N = std::distance(std::get<0>(*s_res), std::get<1>(*s_res)); + res = std::make_pair(begin + N, true); + } return res; }; diff --git a/tests/adc_netservice_test.cpp b/tests/adc_netservice_test.cpp index 3535b24..3f84616 100644 --- a/tests/adc_netservice_test.cpp +++ b/tests/adc_netservice_test.cpp @@ -7,8 +7,11 @@ int main() { asio::ip::tcp::endpoint ept_s(asio::ip::tcp::v4(), 9999); - // asio::ip::tcp::endpoint ept_c(asio::ip::make_address_v4("127.0.0.1"), 9999); - asio::ip::tcp::endpoint ept_c(asio::ip::tcp::v4(), 9999); + asio::ip::tcp::endpoint ept_c(asio::ip::make_address_v4("0.0.0.0"), 9999); + // asio::ip::tcp::endpoint ept_c(asio::ip::address_v4::any(), 9999); + + std::cout << "ADDR: " << ept_s << "\n"; + std::cout << "ADDR: " << ept_c << "\n"; asio::io_context ctx; @@ -29,21 +32,24 @@ int main() // srv.asyncConnect(ept_c, s_ctx); // auto res = srv.asyncConnect(ept_c, asio::use_awaitable); - srv.asyncAccept(ept_c, [&srv](std::error_code ec) { - if (!ec) { - std::cout << "New connection\n"; + srv.asyncAccept( + ept_c, + [&srv](std::error_code ec) { + if (!ec) { + std::cout << "New connection\n"; - srv.asyncReceive( - [](std::error_code ec, std::string msg) { - if (!ec) { - std::cout << "Received: " << msg << "\n"; - } - }, - std::chrono::minutes(3)); - } else { - std::cout << "ACCEPT ERR: " << ec.message() << "\n"; - } - }); + srv.asyncReceive( + [](std::error_code ec, std::string msg) { + if (!ec) { + std::cout << "Received: [" << msg << "]\n"; + } + }, + std::chrono::minutes(3)); + } else { + std::cout << "ACCEPT ERR: " << ec.message() << "\n"; + } + }, + std::chrono::minutes(10)); ctx.run();