From 43638f383fbb7f3e4fc2c86c6f3945daf9430374 Mon Sep 17 00:00:00 2001 From: "Timur A. Fatkhullin" Date: Wed, 26 Nov 2025 18:01:34 +0300 Subject: [PATCH] ran client sessions in separated thread pool --- asibfm700/asibfm700_servocontroller.cpp | 40 +++++++-------- mcc/mcc_netserver.h | 21 ++++++-- mcc/mcc_slewing_model.h | 65 ++++++++++++++++++++++--- mcc/mcc_telemetry.h | 64 ++++++++++++------------ 4 files changed, 128 insertions(+), 62 deletions(-) diff --git a/asibfm700/asibfm700_servocontroller.cpp b/asibfm700/asibfm700_servocontroller.cpp index 969ac56..b62bbb7 100644 --- a/asibfm700/asibfm700_servocontroller.cpp +++ b/asibfm700/asibfm700_servocontroller.cpp @@ -159,33 +159,33 @@ AsibFM700ServoController::error_t AsibFM700ServoController::hardwareGetState(har state->speedX = mdata.encYspeed.val; state->speedY = mdata.encXspeed.val; - state->stateX = mdata.Xstate; - state->stateY = mdata.Ystate; + state->stateX = mdata.Ystate; + state->stateY = mdata.Xstate; - if (mdata.Xstate == AXIS_STOPPED) { - if (mdata.Ystate == AXIS_STOPPED) { - state->moving_state = hardware_moving_state_t::HW_MOVE_STOPPED; - } else if (mdata.Ystate == AXIS_SLEWING) { + if (mdata.Xstate == AXIS_ERROR || mdata.Ystate == AXIS_ERROR) { + state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR; + } else { + if (mdata.Xstate == AXIS_STOPPED) { + if (mdata.Ystate == AXIS_STOPPED) { + state->moving_state = hardware_moving_state_t::HW_MOVE_STOPPED; + } else if (mdata.Ystate == AXIS_SLEWING) { + state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING; + } else if (mdata.Ystate == AXIS_POINTING) { + state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING; + } else if (mdata.Ystate == AXIS_GUIDING) { + state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING; + } else { + state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN; + } + } else if (mdata.Xstate == AXIS_SLEWING) { state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING; - } else if (mdata.Ystate == AXIS_POINTING) { + } else if (mdata.Xstate == AXIS_POINTING) { state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING; - } else if (mdata.Ystate == AXIS_GUIDING) { + } else if (mdata.Xstate == AXIS_GUIDING) { state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING; - } else if (mdata.Ystate == AXIS_ERROR) { - state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR; } else { state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN; } - } else if (mdata.Xstate == AXIS_SLEWING) { - state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING; - } else if (mdata.Xstate == AXIS_POINTING) { - state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING; - } else if (mdata.Xstate == AXIS_GUIDING) { - state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING; - } else if (mdata.Xstate == AXIS_ERROR) { - state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR; - } else { - state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN; } } diff --git a/mcc/mcc_netserver.h b/mcc/mcc_netserver.h index 44e1bf8..59f7adb 100644 --- a/mcc/mcc_netserver.h +++ b/mcc/mcc_netserver.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -181,7 +182,8 @@ public: _asioContext(ctx), _handleMessageFunc(func), _stopSignal(ctx), - _restartSignal(ctx) + _restartSignal(ctx), + _sessionThreadPool(7) { std::stringstream st; st << std::this_thread::get_id(); @@ -359,10 +361,21 @@ public: // start accepting connections for (;;) { + st.str(""); + st << std::this_thread::get_id(); + logDebug(std::format("Start accepting new connections (thread ID = {}) ...", st.str())); + auto sock = co_await acc.async_accept(asio::use_awaitable); + // start new client session - asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); - logDebug(std::format("session was spawned, start accepting new connections ...")); + logDebug("Spawn new user session ..."); + + // asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached); + + // use of sessions own thread pool + asio::co_spawn(_sessionThreadPool, startSession(std::move(sock)), asio::detached); + + logDebug("The session was spawned"); } @@ -621,6 +634,8 @@ protected: std::mutex _serialPortsMutex, _tcpSocketsMutex, _localStreamSocketsMutex, _localSeqpackSocketsMutex; + asio::thread_pool _sessionThreadPool; + // helpers template void setSerialOpts(asio::serial_port& s_port, OptT&& opt, OptTs&&... opts) diff --git a/mcc/mcc_slewing_model.h b/mcc/mcc_slewing_model.h index c95a8d5..f8ad5e6 100644 --- a/mcc/mcc_slewing_model.h +++ b/mcc/mcc_slewing_model.h @@ -409,13 +409,34 @@ public: mcc::MccAngleFancyString(tdata.speedX), mcc::MccAngleFancyString(tdata.speedY))); - pz_err = controls->inPZone(cpt, &in_zone); + in_zone_vec.clear(); + pz_err = controls->inPZone(cpt, &in_zone, &in_zone_vec); if (pz_err) { *_stopSlewing = true; return mcc_deduce_error_code(pz_err, MccSimpleSlewingModelErrorCode::ERROR_PZONE_CONTAINER_COMP); } if (in_zone) { + size_t i = 0; + for (; i < in_zone_vec.size(); ++i) { + if (in_zone_vec[i]) { + break; + } + } + + logger.logError( + "target point is near prohibited zone (zone index: {})! Entered target coordinates:", i); + logger.logError(std::format( + " RA-APP, DEC-APP, HA, LST: {}, {}, {}, {}", mcc::MccAngle{tdata.RA_APP}.sexagesimal(true), + mcc::MccAngle{tdata.DEC_APP}.sexagesimal(), mcc::MccAngle{tdata.HA}.sexagesimal(true), + mcc::MccAngle{tdata.LST}.sexagesimal(true))); + logger.logError(std::format(" AZ, ZD, ALT: {}, {}, {}", mcc::MccAngle{tdata.AZ}.sexagesimal(), + mcc::MccAngle{tdata.ZD}.sexagesimal(), + mcc::MccAngle{tdata.ALT}.sexagesimal())); + + logger.logError(std::format(" hardware X, Y: {}, {}", mcc::MccAngle{tdata.X}.sexagesimal(), + mcc::MccAngle{tdata.Y}.sexagesimal())); + *_stopSlewing = true; return MccSimpleSlewingModelErrorCode::ERROR_NEAR_PZONE; } @@ -431,12 +452,17 @@ public: } } + logger.logTrace(std::format("get hw state ...")); + hw_err = controls->hardwareGetState(&hw_state); if (hw_err) { *_stopSlewing = true; return mcc_deduce_error_code(hw_err, MccSimpleSlewingModelErrorCode::ERROR_HW_GETSTATE); } + logger.logTrace(std::format("hw state was updated ({}, {})", MccAngle(hw_state.X).sexagesimal(true), + MccAngle(hw_state.Y).sexagesimal())); + if (slew_and_stop) { // just wait for mount to be stopped if (hw_state.moving_state == CONTROLS_T::hardware_moving_state_t::HW_MOVE_STOPPED) { logger.logInfo("mount moving state is STOPPED - exit!"); @@ -451,19 +477,28 @@ public: logger.logTrace(std::format(" target-to-mount distance: {}", mcc::MccAngleFancyString(dist))); + // if (dist < _currentParams.adjustCoordDiff) { + // if (dist < 1.0_degs) { if (dist <= _currentParams.slewToleranceRadius) { // stop slewing and exit from cycle logger.logInfo("target-to-mount distance is lesser than slew tolerance radius - exit!"); break; } + if (*_stopSlewing) { + return MccSimpleSlewingModelErrorCode::ERROR_STOPPED; + } + + // resend new position since target coordinates are changed in time hw_state.X = (double)tdata.target.X; hw_state.Y = (double)tdata.target.Y; - logger.logTrace(std::format("Send to hardware: X = {} degs, Y = {} degs", - mcc::MccAngle{hw_state.X}.degrees(), - mcc::MccAngle{hw_state.Y}.degrees())); + logger.logTrace( + std::format("Send to hardware: X = {} degs, Y = {} degs ({}, {})", + mcc::MccAngle{hw_state.X}.degrees(), mcc::MccAngle{hw_state.Y}.degrees(), + MccAngle(hw_state.X).sexagesimal(true), MccAngle(hw_state.Y).sexagesimal())); + hw_err = controls->hardwareSetState(hw_state); if (hw_err) { *_stopSlewing = true; @@ -471,10 +506,26 @@ public: } - { - std::lock_guard lock{*_currentParamsMutex}; - logger.logDebug(" the 'hardwareSetState' method performed successfully!"); + logger.logDebug(" the 'hardwareSetState' method performed successfully!"); + // } + + + // FOR DEBUG PURPOSE!!!! + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + logger.logTrace(std::format("get hw state right after hardwareSetState ...")); + + hw_err = controls->hardwareGetState(&hw_state); + if (hw_err) { + *_stopSlewing = true; + return mcc_deduce_error_code(hw_err, MccSimpleSlewingModelErrorCode::ERROR_HW_GETSTATE); } + + logger.logTrace(std::format("hw state was updated ({}, {})", MccAngle(hw_state.X).sexagesimal(true), + MccAngle(hw_state.Y).sexagesimal())); + } + + if (*_stopSlewing) { + return MccSimpleSlewingModelErrorCode::ERROR_STOPPED; } // sleep here diff --git a/mcc/mcc_telemetry.h b/mcc/mcc_telemetry.h index bcd908c..b86da31 100644 --- a/mcc/mcc_telemetry.h +++ b/mcc/mcc_telemetry.h @@ -262,7 +262,7 @@ public: }; _updateFunc = [controls, this](std::stop_token stop_token) -> std::error_code { - std::lock_guard lock{*_updateMutex}; + // std::lock_guard lock{*_updateMutex}; // first, update mount quantities typename hardware_t::hardware_state_t hw_pos; @@ -420,38 +420,11 @@ public: _internalUpdatingStopSource = std::stop_source{}; *_internalUpdating = false; - _dataUpdatingRequested->clear(); - _dataUpdatingStart->clear(); + // _dataUpdatingRequested->clear(); + // _dataUpdatingStart->clear(); - _updatingFuture = - std::async(std::launch::async, &MccTelemetry::updateLoop, this, _internalUpdatingStopSource.get_token()); - - // _updatingFuture = std::async( - // std::launch::async, - // [controls, this](std::stop_token stoken) { - // bool stop_flag = stoken.stop_requested(); - // // controls->logTrace(std::format("stop_requested() = {}", stop_flag)); - // // while (!stoken.stop_requested()) { - // while (!stop_flag) { - // _dataUpdatingRequested->wait(false); - - // stop_flag = stoken.stop_requested(); - // if (!stop_flag) { - // // if (!stoken.stop_requested()) { - // *_internalUpdating = true; - // std::lock_guard lock{*_timeoutMutex}; - - // _dataUpdatingStart->test_and_set(); - // _dataUpdatingStart->notify_all(); - - // _lastUpdateError = _updateFunc(stoken); - - // _dataUpdatingStart->clear(); - // _dataUpdatingRequested->clear(); - // } - // } - // }, - // _internalUpdatingStopSource.get_token()); + // _updatingFuture = + // std::async(std::launch::async, &MccTelemetry::updateLoop, this, _internalUpdatingStopSource.get_token()); } @@ -514,6 +487,10 @@ public: // asynchronuosly periodicaly update telemetry data (internal synchronization) void startInternalTelemetryDataUpdating() { + *_internalUpdating = true; + _internalUpdatingStopSource = std::stop_source{}; + return; + using intv_t = std::remove_cvref_t; if (_internalUpdatingStopSource.stop_requested()) { @@ -549,6 +526,9 @@ public: { // reset all possible locks _internalUpdatingStopSource.request_stop(); + *_internalUpdating = false; + + return; _dataUpdatingRequested->test_and_set(); _dataUpdatingRequested->notify_one(); @@ -581,6 +561,26 @@ public: error_t updateTelemetryData(traits::mcc_time_duration_c auto const& timeout) { + std::lock_guard thread_lock{*_updateMutex}; + + // _internalUpdatingStopSource = std::stop_source{}; + // return _lastUpdateError = _updateFunc(_internalUpdatingStopSource.get_token()); + + + auto ft = std::async(std::launch::async, _updateFunc, _internalUpdatingStopSource.get_token()); + auto st = ft.wait_for(timeout); + if (st == std::future_status::ready) { + return _lastUpdateError; + } else if (st == std::future_status::deferred) { + ft.get(); + _lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT; + } else { + _lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT; + } + + return _lastUpdateError; + + // trigger updating _dataUpdatingRequested->test_and_set(); _dataUpdatingRequested->notify_one();