ran client sessions in separated thread pool

This commit is contained in:
Timur A. Fatkhullin 2025-11-26 18:01:34 +03:00
parent a42f6dbc98
commit 43638f383f
4 changed files with 128 additions and 62 deletions

View File

@ -159,33 +159,33 @@ AsibFM700ServoController::error_t AsibFM700ServoController::hardwareGetState(har
state->speedX = mdata.encYspeed.val;
state->speedY = mdata.encXspeed.val;
state->stateX = mdata.Xstate;
state->stateY = mdata.Ystate;
state->stateX = mdata.Ystate;
state->stateY = mdata.Xstate;
if (mdata.Xstate == AXIS_STOPPED) {
if (mdata.Ystate == AXIS_STOPPED) {
state->moving_state = hardware_moving_state_t::HW_MOVE_STOPPED;
} else if (mdata.Ystate == AXIS_SLEWING) {
if (mdata.Xstate == AXIS_ERROR || mdata.Ystate == AXIS_ERROR) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR;
} else {
if (mdata.Xstate == AXIS_STOPPED) {
if (mdata.Ystate == AXIS_STOPPED) {
state->moving_state = hardware_moving_state_t::HW_MOVE_STOPPED;
} else if (mdata.Ystate == AXIS_SLEWING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING;
} else if (mdata.Ystate == AXIS_POINTING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING;
} else if (mdata.Ystate == AXIS_GUIDING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING;
} else {
state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN;
}
} else if (mdata.Xstate == AXIS_SLEWING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING;
} else if (mdata.Ystate == AXIS_POINTING) {
} else if (mdata.Xstate == AXIS_POINTING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING;
} else if (mdata.Ystate == AXIS_GUIDING) {
} else if (mdata.Xstate == AXIS_GUIDING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING;
} else if (mdata.Ystate == AXIS_ERROR) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR;
} else {
state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN;
}
} else if (mdata.Xstate == AXIS_SLEWING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_SLEWING;
} else if (mdata.Xstate == AXIS_POINTING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ADJUSTING;
} else if (mdata.Xstate == AXIS_GUIDING) {
state->moving_state = hardware_moving_state_t::HW_MOVE_GUIDING;
} else if (mdata.Xstate == AXIS_ERROR) {
state->moving_state = hardware_moving_state_t::HW_MOVE_ERROR;
} else {
state->moving_state = hardware_moving_state_t::HW_MOVE_UNKNOWN;
}
}

View File

@ -23,6 +23,7 @@
#include <asio/signal_set.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/thread_pool.hpp>
#include <asio/write.hpp>
#include <spdlog/sinks/null_sink.h>
@ -181,7 +182,8 @@ public:
_asioContext(ctx),
_handleMessageFunc(func),
_stopSignal(ctx),
_restartSignal(ctx)
_restartSignal(ctx),
_sessionThreadPool(7)
{
std::stringstream st;
st << std::this_thread::get_id();
@ -359,10 +361,21 @@ public:
// start accepting connections
for (;;) {
st.str("");
st << std::this_thread::get_id();
logDebug(std::format("Start accepting new connections (thread ID = {}) ...", st.str()));
auto sock = co_await acc.async_accept(asio::use_awaitable);
// start new client session
asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached);
logDebug(std::format("session was spawned, start accepting new connections ..."));
logDebug("Spawn new user session ...");
// asio::co_spawn(_asioContext, startSession(std::move(sock)), asio::detached);
// use of sessions own thread pool
asio::co_spawn(_sessionThreadPool, startSession(std::move(sock)), asio::detached);
logDebug("The session was spawned");
}
@ -621,6 +634,8 @@ protected:
std::mutex _serialPortsMutex, _tcpSocketsMutex, _localStreamSocketsMutex, _localSeqpackSocketsMutex;
asio::thread_pool _sessionThreadPool;
// helpers
template <typename OptT, typename... OptTs>
void setSerialOpts(asio::serial_port& s_port, OptT&& opt, OptTs&&... opts)

View File

@ -409,13 +409,34 @@ public:
mcc::MccAngleFancyString(tdata.speedX),
mcc::MccAngleFancyString(tdata.speedY)));
pz_err = controls->inPZone(cpt, &in_zone);
in_zone_vec.clear();
pz_err = controls->inPZone(cpt, &in_zone, &in_zone_vec);
if (pz_err) {
*_stopSlewing = true;
return mcc_deduce_error_code(pz_err, MccSimpleSlewingModelErrorCode::ERROR_PZONE_CONTAINER_COMP);
}
if (in_zone) {
size_t i = 0;
for (; i < in_zone_vec.size(); ++i) {
if (in_zone_vec[i]) {
break;
}
}
logger.logError(
"target point is near prohibited zone (zone index: {})! Entered target coordinates:", i);
logger.logError(std::format(
" RA-APP, DEC-APP, HA, LST: {}, {}, {}, {}", mcc::MccAngle{tdata.RA_APP}.sexagesimal(true),
mcc::MccAngle{tdata.DEC_APP}.sexagesimal(), mcc::MccAngle{tdata.HA}.sexagesimal(true),
mcc::MccAngle{tdata.LST}.sexagesimal(true)));
logger.logError(std::format(" AZ, ZD, ALT: {}, {}, {}", mcc::MccAngle{tdata.AZ}.sexagesimal(),
mcc::MccAngle{tdata.ZD}.sexagesimal(),
mcc::MccAngle{tdata.ALT}.sexagesimal()));
logger.logError(std::format(" hardware X, Y: {}, {}", mcc::MccAngle{tdata.X}.sexagesimal(),
mcc::MccAngle{tdata.Y}.sexagesimal()));
*_stopSlewing = true;
return MccSimpleSlewingModelErrorCode::ERROR_NEAR_PZONE;
}
@ -431,12 +452,17 @@ public:
}
}
logger.logTrace(std::format("get hw state ..."));
hw_err = controls->hardwareGetState(&hw_state);
if (hw_err) {
*_stopSlewing = true;
return mcc_deduce_error_code(hw_err, MccSimpleSlewingModelErrorCode::ERROR_HW_GETSTATE);
}
logger.logTrace(std::format("hw state was updated ({}, {})", MccAngle(hw_state.X).sexagesimal(true),
MccAngle(hw_state.Y).sexagesimal()));
if (slew_and_stop) { // just wait for mount to be stopped
if (hw_state.moving_state == CONTROLS_T::hardware_moving_state_t::HW_MOVE_STOPPED) {
logger.logInfo("mount moving state is STOPPED - exit!");
@ -451,19 +477,28 @@ public:
logger.logTrace(std::format(" target-to-mount distance: {}", mcc::MccAngleFancyString(dist)));
// if (dist < _currentParams.adjustCoordDiff) {
// if (dist < 1.0_degs) {
if (dist <= _currentParams.slewToleranceRadius) { // stop slewing and exit from cycle
logger.logInfo("target-to-mount distance is lesser than slew tolerance radius - exit!");
break;
}
if (*_stopSlewing) {
return MccSimpleSlewingModelErrorCode::ERROR_STOPPED;
}
// resend new position since target coordinates are changed in time
hw_state.X = (double)tdata.target.X;
hw_state.Y = (double)tdata.target.Y;
logger.logTrace(std::format("Send to hardware: X = {} degs, Y = {} degs",
mcc::MccAngle{hw_state.X}.degrees(),
mcc::MccAngle{hw_state.Y}.degrees()));
logger.logTrace(
std::format("Send to hardware: X = {} degs, Y = {} degs ({}, {})",
mcc::MccAngle{hw_state.X}.degrees(), mcc::MccAngle{hw_state.Y}.degrees(),
MccAngle(hw_state.X).sexagesimal(true), MccAngle(hw_state.Y).sexagesimal()));
hw_err = controls->hardwareSetState(hw_state);
if (hw_err) {
*_stopSlewing = true;
@ -471,10 +506,26 @@ public:
}
{
std::lock_guard lock{*_currentParamsMutex};
logger.logDebug(" the 'hardwareSetState' method performed successfully!");
logger.logDebug(" the 'hardwareSetState' method performed successfully!");
// }
// FOR DEBUG PURPOSE!!!!
std::this_thread::sleep_for(std::chrono::milliseconds(50));
logger.logTrace(std::format("get hw state right after hardwareSetState ..."));
hw_err = controls->hardwareGetState(&hw_state);
if (hw_err) {
*_stopSlewing = true;
return mcc_deduce_error_code(hw_err, MccSimpleSlewingModelErrorCode::ERROR_HW_GETSTATE);
}
logger.logTrace(std::format("hw state was updated ({}, {})", MccAngle(hw_state.X).sexagesimal(true),
MccAngle(hw_state.Y).sexagesimal()));
}
if (*_stopSlewing) {
return MccSimpleSlewingModelErrorCode::ERROR_STOPPED;
}
// sleep here

View File

@ -262,7 +262,7 @@ public:
};
_updateFunc = [controls, this](std::stop_token stop_token) -> std::error_code {
std::lock_guard lock{*_updateMutex};
// std::lock_guard lock{*_updateMutex};
// first, update mount quantities
typename hardware_t::hardware_state_t hw_pos;
@ -420,38 +420,11 @@ public:
_internalUpdatingStopSource = std::stop_source{};
*_internalUpdating = false;
_dataUpdatingRequested->clear();
_dataUpdatingStart->clear();
// _dataUpdatingRequested->clear();
// _dataUpdatingStart->clear();
_updatingFuture =
std::async(std::launch::async, &MccTelemetry::updateLoop, this, _internalUpdatingStopSource.get_token());
// _updatingFuture = std::async(
// std::launch::async,
// [controls, this](std::stop_token stoken) {
// bool stop_flag = stoken.stop_requested();
// // controls->logTrace(std::format("stop_requested() = {}", stop_flag));
// // while (!stoken.stop_requested()) {
// while (!stop_flag) {
// _dataUpdatingRequested->wait(false);
// stop_flag = stoken.stop_requested();
// if (!stop_flag) {
// // if (!stoken.stop_requested()) {
// *_internalUpdating = true;
// std::lock_guard lock{*_timeoutMutex};
// _dataUpdatingStart->test_and_set();
// _dataUpdatingStart->notify_all();
// _lastUpdateError = _updateFunc(stoken);
// _dataUpdatingStart->clear();
// _dataUpdatingRequested->clear();
// }
// }
// },
// _internalUpdatingStopSource.get_token());
// _updatingFuture =
// std::async(std::launch::async, &MccTelemetry::updateLoop, this, _internalUpdatingStopSource.get_token());
}
@ -514,6 +487,10 @@ public:
// asynchronuosly periodicaly update telemetry data (internal synchronization)
void startInternalTelemetryDataUpdating()
{
*_internalUpdating = true;
_internalUpdatingStopSource = std::stop_source{};
return;
using intv_t = std::remove_cvref_t<decltype(_currentUpdateInterval)>;
if (_internalUpdatingStopSource.stop_requested()) {
@ -549,6 +526,9 @@ public:
{
// reset all possible locks
_internalUpdatingStopSource.request_stop();
*_internalUpdating = false;
return;
_dataUpdatingRequested->test_and_set();
_dataUpdatingRequested->notify_one();
@ -581,6 +561,26 @@ public:
error_t updateTelemetryData(traits::mcc_time_duration_c auto const& timeout)
{
std::lock_guard thread_lock{*_updateMutex};
// _internalUpdatingStopSource = std::stop_source{};
// return _lastUpdateError = _updateFunc(_internalUpdatingStopSource.get_token());
auto ft = std::async(std::launch::async, _updateFunc, _internalUpdatingStopSource.get_token());
auto st = ft.wait_for(timeout);
if (st == std::future_status::ready) {
return _lastUpdateError;
} else if (st == std::future_status::deferred) {
ft.get();
_lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
} else {
_lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
}
return _lastUpdateError;
// trigger updating
_dataUpdatingRequested->test_and_set();
_dataUpdatingRequested->notify_one();