...
This commit is contained in:
@@ -24,7 +24,8 @@ enum class MccTelemetryErrorCode : int {
|
||||
ERROR_HARDWARE_GETPOS,
|
||||
ERROR_UPDATE_STOPPED,
|
||||
ERROR_DATA_TIMEOUT,
|
||||
ERROR_UNSUPPORTED_COORD_PAIR
|
||||
ERROR_UNSUPPORTED_COORD_PAIR,
|
||||
ERROR_UPDATE_LOOP_WAIT
|
||||
};
|
||||
|
||||
} // namespace mcc
|
||||
@@ -49,10 +50,7 @@ namespace mcc
|
||||
struct MccTelemetryCategory : public std::error_category {
|
||||
MccTelemetryCategory() : std::error_category() {}
|
||||
|
||||
const char* name() const noexcept
|
||||
{
|
||||
return "MCC-TELEMETRY";
|
||||
}
|
||||
const char* name() const noexcept { return "MCC-TELEMETRY"; }
|
||||
|
||||
std::string message(int ec) const
|
||||
{
|
||||
@@ -75,6 +73,8 @@ struct MccTelemetryCategory : public std::error_category {
|
||||
return "a timeout occured while waiting for new data";
|
||||
case MccTelemetryErrorCode::ERROR_UNSUPPORTED_COORD_PAIR:
|
||||
return "unsupported coordinate pair";
|
||||
case MccTelemetryErrorCode::ERROR_UPDATE_LOOP_WAIT:
|
||||
return "a timeout occured while waiting to exit the update loop";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
@@ -256,6 +256,8 @@ public:
|
||||
};
|
||||
|
||||
_updateFunc = [controls, this](std::stop_token stop_token) -> std::error_code {
|
||||
std::lock_guard lock{*_updateMutex};
|
||||
|
||||
// first, update mount quantities
|
||||
typename hardware_t::hardware_state_t hw_pos;
|
||||
auto hw_err = controls->hardwareGetState(&hw_pos);
|
||||
@@ -394,26 +396,29 @@ public:
|
||||
|
||||
|
||||
// update thread
|
||||
// _updatingFuture = std::async(
|
||||
// std::launch::async,
|
||||
// [this](std::stop_token stoken) {
|
||||
// while (!stoken.stop_requested()) {
|
||||
// {
|
||||
// std::unique_lock ulock{*_updateMutex};
|
||||
_dataUpdatingRequested->clear();
|
||||
_dataUpdatingStart->clear();
|
||||
|
||||
// bool ok = _updateCondVar->wait(ulock, [&stoken, this]() -> bool {
|
||||
// return _dataUpdatingRequested || stoken.stop_requested();
|
||||
// });
|
||||
// }
|
||||
_updatingFuture = std::async(
|
||||
std::launch::async,
|
||||
[this](std::stop_token stoken) {
|
||||
while (!stoken.stop_requested()) {
|
||||
_dataUpdatingRequested->wait(false);
|
||||
|
||||
// if (!stoken.stop_requested()) {
|
||||
// std::lock_guard lock{*_timeoutMutex};
|
||||
if (!stoken.stop_requested()) {
|
||||
std::lock_guard lock{*_timeoutMutex};
|
||||
|
||||
// _lastUpdateError = _updateFunc(stoken);
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// _internalUpdatingStopSource.get_token());
|
||||
_dataUpdatingStart->test_and_set();
|
||||
_dataUpdatingStart->notify_all();
|
||||
|
||||
_lastUpdateError = _updateFunc(stoken);
|
||||
|
||||
_dataUpdatingStart->clear();
|
||||
_dataUpdatingRequested->clear();
|
||||
}
|
||||
}
|
||||
},
|
||||
_internalUpdatingStopSource.get_token());
|
||||
}
|
||||
|
||||
|
||||
@@ -428,11 +433,9 @@ public:
|
||||
{
|
||||
stopInternalTelemetryDataUpdating();
|
||||
|
||||
if (_internalUpdatingFuture.valid()) {
|
||||
if (_updatingFuture.valid()) {
|
||||
// try to exit correctly
|
||||
// auto status = _internalUpdatingFuture.wait_for(std::chrono::seconds(1));
|
||||
_internalUpdatingFuture.wait_for(std::chrono::seconds(1));
|
||||
// _internalUpdatingFuture.get();
|
||||
_updatingFuture.wait_for(std::chrono::seconds(1));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -470,124 +473,77 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
auto getTelemetryUpdateTimeout() const
|
||||
{
|
||||
return _currentUpdateTimeout;
|
||||
}
|
||||
auto getTelemetryUpdateTimeout() const { return _currentUpdateTimeout; }
|
||||
|
||||
// asynchronuosly periodicaly update telemetry data (internal synchronization)
|
||||
void startInternalTelemetryDataUpdating()
|
||||
{
|
||||
using intv_t = std::remove_cvref_t<decltype(_currentUpdateInterval)>;
|
||||
|
||||
_internalUpdatingStopSource = std::stop_source{}; // reset state
|
||||
if (_internalUpdatingStopSource.stop_requested()) {
|
||||
_internalUpdatingStopSource = std::stop_source{}; // reset stop source
|
||||
if (_updatingFuture.valid()) { // it should not be!!!
|
||||
// updating loop was stopped, just wait fo future result
|
||||
// it must return immediately!
|
||||
auto status = _updatingFuture.wait_for(std::chrono::milliseconds(500));
|
||||
if (status == std::future_status::ready) { // OK!
|
||||
|
||||
*_internalUpdating = true;
|
||||
|
||||
_internalUpdatingFuture = std::async(
|
||||
std::launch::async,
|
||||
[this](std::stop_token stop_token) -> error_t {
|
||||
while (!stop_token.stop_requested()) {
|
||||
// while (true) {
|
||||
_lastUpdateError = updateTelemetryData(_currentUpdateTimeout);
|
||||
if (_lastUpdateError) {
|
||||
*_internalUpdating = false;
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
// auto nn = std::this_thread::get_id();
|
||||
|
||||
std::this_thread::sleep_for(_currentUpdateInterval);
|
||||
|
||||
// {
|
||||
// std::lock_guard lock{*_currentUpdateIntervalMutex};
|
||||
|
||||
// // compute it here because of possible changing _currentUpdateInterval
|
||||
// auto sleep_td = _currentUpdateInterval / internalUpdatingIntervalDiv;
|
||||
|
||||
// for (uint16_t i = 0; i < internalUpdatingIntervalDiv - 1; ++i) {
|
||||
// if (stop_token.stop_requested()) {
|
||||
// break;
|
||||
// }
|
||||
|
||||
// std::this_thread::sleep_for(sleep_td);
|
||||
// }
|
||||
|
||||
// if (stop_token.stop_requested()) {
|
||||
// break;
|
||||
// }
|
||||
|
||||
// if constexpr (std::floating_point<intv_t>) {
|
||||
// std::this_thread::sleep_for(sleep_td);
|
||||
// } else {
|
||||
// auto rem = _currentUpdateInterval % internalUpdatingIntervalDiv;
|
||||
|
||||
// if (rem.count()) {
|
||||
// std::this_thread::sleep_for(rem);
|
||||
// } else {
|
||||
// std::this_thread::sleep_for(sleep_td);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
} else if (status == std::future_status::deferred) { // ???!!!!
|
||||
_updatingFuture.get();
|
||||
} else {
|
||||
_lastUpdateError = MccTelemetryErrorCode::ERROR_UPDATE_LOOP_WAIT;
|
||||
}
|
||||
|
||||
*_internalUpdating = false;
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
},
|
||||
_internalUpdatingStopSource.get_token());
|
||||
startInternalTelemetryDataUpdating();
|
||||
|
||||
*_internalUpdating = true;
|
||||
} else {
|
||||
startInternalTelemetryDataUpdating();
|
||||
|
||||
*_internalUpdating = true;
|
||||
}
|
||||
} // here the loop is already started
|
||||
}
|
||||
|
||||
|
||||
void stopInternalTelemetryDataUpdating()
|
||||
{
|
||||
_internalUpdatingStopSource.request_stop();
|
||||
_dataUpdatingRequested->test_and_set();
|
||||
_dataUpdatingRequested->notify_one();
|
||||
|
||||
auto status = _updatingFuture.wait_for(std::chrono::milliseconds(500));
|
||||
if (status == std::future_status::ready) { // OK!
|
||||
|
||||
} else if (status == std::future_status::deferred) { // ???!!!!
|
||||
_updatingFuture.get();
|
||||
} else {
|
||||
_lastUpdateError = MccTelemetryErrorCode::ERROR_UPDATE_LOOP_WAIT;
|
||||
}
|
||||
|
||||
_dataUpdatingRequested->clear();
|
||||
|
||||
*_internalUpdating = false;
|
||||
}
|
||||
|
||||
|
||||
bool isInternalTelemetryDataUpdating() const
|
||||
{
|
||||
return *_internalUpdating;
|
||||
}
|
||||
bool isInternalTelemetryDataUpdating() const { return *_internalUpdating; }
|
||||
|
||||
|
||||
error_t updateTelemetryData(traits::mcc_time_duration_c auto const& timeout)
|
||||
{
|
||||
{
|
||||
std::lock_guard thread_lock{*_updateMutex};
|
||||
// trigger updating
|
||||
_dataUpdatingRequested->test_and_set();
|
||||
_dataUpdatingRequested->notify_one();
|
||||
|
||||
std::stop_source stop_source;
|
||||
|
||||
*_isDataUpdated = false;
|
||||
|
||||
// std::future<error_t> update_ft = std::async(std::launch::async, _updateFunc, stop_source.get_token());
|
||||
// // std::future<error_t> update_ft =
|
||||
// // std::async(std::launch::async, _updateFunc, _internalUpdatingStopSource.get_token());
|
||||
// auto status = update_ft.wait_for(timeout);
|
||||
|
||||
// if (status == std::future_status::ready) {
|
||||
// *_isDataUpdated = true;
|
||||
// _lastUpdateError = update_ft.get();
|
||||
// } else if (status == std::future_status::deferred) { // std::async was invoked in this thread, get
|
||||
// result
|
||||
// _lastUpdateError = update_ft.get();
|
||||
// if (!_lastUpdateError) {
|
||||
// *_isDataUpdated = true;
|
||||
// }
|
||||
// } else { // timeout
|
||||
// stop_source.request_stop();
|
||||
// _lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
// }
|
||||
|
||||
_lastUpdateError = _updateFunc(_internalUpdatingStopSource.get_token());
|
||||
*_isDataUpdated = true;
|
||||
// wait for updating start
|
||||
_dataUpdatingStart->wait(false);
|
||||
if (_timeoutMutex->try_lock_for(timeout)) {
|
||||
_timeoutMutex->unlock();
|
||||
} else {
|
||||
_lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
}
|
||||
|
||||
// unblock waiting threads even in the case of timeout!
|
||||
_updateCondVar->notify_all();
|
||||
|
||||
// *_isDataUpdated = false;
|
||||
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
@@ -598,19 +554,15 @@ public:
|
||||
return MccTelemetryErrorCode::ERROR_NULLPTR;
|
||||
}
|
||||
|
||||
std::unique_lock ulock(*_updateMutex);
|
||||
|
||||
auto res = _updateCondVar->wait_for(ulock, timeout, [this]() -> bool { return *_isDataUpdated; });
|
||||
if (!res) {
|
||||
return MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
}
|
||||
|
||||
// std::lock_guard thread_lock{*_updateMutex};
|
||||
updateTelemetryData(timeout);
|
||||
|
||||
if (!_lastUpdateError) {
|
||||
std::lock_guard thread_lock{*_updateMutex};
|
||||
mcc_copy_telemetry_data(_data, tdata);
|
||||
}
|
||||
|
||||
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
@@ -628,10 +580,7 @@ public:
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
}
|
||||
|
||||
error_t lastUpdateError() const
|
||||
{
|
||||
return _lastUpdateError;
|
||||
}
|
||||
error_t lastUpdateError() const { return _lastUpdateError; }
|
||||
|
||||
error_t setPointingTarget(mcc_celestial_point_c auto pt)
|
||||
{
|
||||
@@ -742,7 +691,8 @@ protected:
|
||||
std::unique_ptr<std::condition_variable> _updateCondVar;
|
||||
|
||||
std::future<void> _updatingFuture{};
|
||||
std::unique_ptr<std::atomic_bool> _dataUpdatingRequested{new std::atomic_bool{false}};
|
||||
std::unique_ptr<std::atomic_flag> _dataUpdatingRequested{new std::atomic_flag{}};
|
||||
std::unique_ptr<std::atomic_flag> _dataUpdatingStart{new std::atomic_flag{}};
|
||||
std::unique_ptr<std::timed_mutex> _timeoutMutex{new std::timed_mutex()};
|
||||
|
||||
error_t _lastUpdateError{MccTelemetryErrorCode::ERROR_OK};
|
||||
|
||||
Reference in New Issue
Block a user