...
This commit is contained in:
@@ -94,20 +94,13 @@ inline std::error_code make_error_code(MccTelemetryErrorCode ec)
|
||||
}
|
||||
|
||||
|
||||
/* TELEMETRY UPDATE POLICY */
|
||||
|
||||
enum class MccTelemetryUpdatePolicy : int { TEMETRY_UPDATE_INNER, TEMETRY_UPDATE_EXTERNAL };
|
||||
|
||||
|
||||
template <MccTelemetryUpdatePolicy UPDATE_POLICY = MccTelemetryUpdatePolicy::TEMETRY_UPDATE_INNER>
|
||||
class MccTelemetry : public mcc_telemetry_interface_t<std::error_code>
|
||||
{
|
||||
protected:
|
||||
static constexpr uint16_t internalUpdatingIntervalDiv = 5;
|
||||
|
||||
public:
|
||||
static constexpr MccTelemetryUpdatePolicy updatePolicy = UPDATE_POLICY;
|
||||
|
||||
static constexpr auto defaultUpdateInterval = std::chrono::milliseconds(100);
|
||||
static constexpr auto defaultInternalUpdateTimeout = defaultUpdateInterval * 5;
|
||||
|
||||
@@ -115,13 +108,17 @@ public:
|
||||
|
||||
|
||||
MccTelemetry(mcc_ccte_c auto* ccte, mcc_PCM_c auto* pcm, mcc_hardware_c auto* hardware)
|
||||
: _isDataUpdated(false),
|
||||
: _isDataUpdated(new std::atomic_bool()),
|
||||
_data(),
|
||||
_internalUpdating(false),
|
||||
_internalUpdating(new std::atomic_bool),
|
||||
_currentUpdateInterval(defaultUpdateInterval),
|
||||
_currentUpdateIntervalMutex(new std::mutex),
|
||||
_updateMutex(new std::mutex),
|
||||
_updateCondVar(new std::condition_variable)
|
||||
{
|
||||
*_isDataUpdated = false;
|
||||
*_internalUpdating = false;
|
||||
|
||||
_data.target.pair_kind = MccCoordPairKind::COORDS_KIND_RADEC_ICRS;
|
||||
|
||||
using ccte_t = std::remove_cvref_t<decltype(*ccte)>;
|
||||
@@ -240,6 +237,8 @@ public:
|
||||
return MccTelemetryErrorCode::ERROR_UPDATE_STOPPED;
|
||||
}
|
||||
|
||||
_data.X = (double)hw_pos.X;
|
||||
_data.Y = (double)hw_pos.Y;
|
||||
_data.speedX = (double)hw_pos.speedX;
|
||||
_data.speedY = (double)hw_pos.speedY;
|
||||
|
||||
@@ -375,12 +374,21 @@ public:
|
||||
}
|
||||
|
||||
|
||||
MccTelemetry(MccTelemetry&&) = default;
|
||||
MccTelemetry& operator=(MccTelemetry&&) = default;
|
||||
|
||||
MccTelemetry(const MccTelemetry&) = delete;
|
||||
MccTelemetry& operator=(const MccTelemetry&) = delete;
|
||||
|
||||
|
||||
virtual ~MccTelemetry()
|
||||
{
|
||||
_internalUpdatingStopSource.request_stop();
|
||||
stopInternalTelemetryDataUpdating();
|
||||
|
||||
if (_internalUpdatingFuture.valid()) {
|
||||
_internalUpdatingFuture.get();
|
||||
// try to exit correctly
|
||||
auto status = _internalUpdatingFuture.wait_for(std::chrono::seconds(1));
|
||||
// _internalUpdatingFuture.get();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -388,6 +396,8 @@ public:
|
||||
template <traits::mcc_time_duration_c DT>
|
||||
DT telemetryDataUpdateInterval() const
|
||||
{
|
||||
std::lock_guard lock{_currentUpdateIntervalMutex};
|
||||
|
||||
return std::chrono::duration_cast<DT>(_currentUpdateInterval);
|
||||
}
|
||||
|
||||
@@ -400,6 +410,8 @@ public:
|
||||
{
|
||||
using d_t = std::remove_cvref_t<decltype(interval)>;
|
||||
|
||||
std::lock_guard lock{_currentUpdateIntervalMutex};
|
||||
|
||||
if constexpr (std::floating_point<typename d_t::rep>) {
|
||||
_currentUpdateInterval = utils::isEqual(interval.count(), 0.0) ? defaultUpdateInterval : interval;
|
||||
} else {
|
||||
@@ -407,50 +419,57 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// asynchronuosly periodicaly update telemetry data (internal synchronization)
|
||||
void startInternalTelemetryDataUpdating()
|
||||
{
|
||||
using intv_t = std::remove_cvref_t<decltype(_currentUpdateInterval)>;
|
||||
|
||||
_internalUpdating = true;
|
||||
*_internalUpdating = true;
|
||||
|
||||
_internalUpdatingFuture = std::async(
|
||||
std::launch::async,
|
||||
[this](std::stop_token stop_token) {
|
||||
if (stop_token.stop_requested()) {
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
}
|
||||
|
||||
_lastUpdateError = updateTelemetryData(defaultInternalUpdateTimeout);
|
||||
if (_lastUpdateError) {
|
||||
_internalUpdating = false;
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
auto sleep_td = _currentUpdateInterval / internalUpdatingIntervalDiv;
|
||||
|
||||
for (uint16_t i = 0; i < internalUpdatingIntervalDiv - 1; ++i) {
|
||||
if (stop_token.stop_requested()) {
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
[this](std::stop_token stop_token) -> error_t {
|
||||
while (!stop_token.stop_requested()) {
|
||||
_lastUpdateError = updateTelemetryData(defaultInternalUpdateTimeout);
|
||||
if (_lastUpdateError) {
|
||||
*_internalUpdating = false;
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
}
|
||||
{
|
||||
std::lock_guard lock{_currentUpdateIntervalMutex};
|
||||
|
||||
if (stop_token.stop_requested()) {
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
}
|
||||
// compute it here because of possible changing _currentUpdateInterval
|
||||
auto sleep_td = _currentUpdateInterval / internalUpdatingIntervalDiv;
|
||||
|
||||
if constexpr (std::floating_point<intv_t>) {
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
} else {
|
||||
auto rem = _currentUpdateInterval % internalUpdatingIntervalDiv;
|
||||
for (uint16_t i = 0; i < internalUpdatingIntervalDiv - 1; ++i) {
|
||||
if (stop_token.stop_requested()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (rem.count()) {
|
||||
std::this_thread::sleep_for(rem);
|
||||
} else {
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
}
|
||||
|
||||
if (stop_token.stop_requested()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if constexpr (std::floating_point<intv_t>) {
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
} else {
|
||||
auto rem = _currentUpdateInterval % internalUpdatingIntervalDiv;
|
||||
|
||||
if (rem.count()) {
|
||||
std::this_thread::sleep_for(rem);
|
||||
} else {
|
||||
std::this_thread::sleep_for(sleep_td);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*_internalUpdating = false;
|
||||
return MccTelemetryErrorCode::ERROR_OK;
|
||||
},
|
||||
_internalUpdatingStopSource.get_token());
|
||||
}
|
||||
@@ -459,13 +478,13 @@ public:
|
||||
void stopInternalTelemetryDataUpdating()
|
||||
{
|
||||
_internalUpdatingStopSource.request_stop();
|
||||
_internalUpdating = false;
|
||||
*_internalUpdating = false;
|
||||
}
|
||||
|
||||
|
||||
bool isInternalTelemetryDataUpdating() const
|
||||
{
|
||||
return _internalUpdating;
|
||||
return *_internalUpdating;
|
||||
}
|
||||
|
||||
|
||||
@@ -475,24 +494,26 @@ public:
|
||||
|
||||
std::stop_source stop_source;
|
||||
|
||||
_isDataUpdated = false;
|
||||
*_isDataUpdated = false;
|
||||
|
||||
std::future<error_t> update_ft = std::async(std::launch::async, _updateFunc, stop_source.get_token());
|
||||
auto status = update_ft.wait_for(timeout);
|
||||
|
||||
if (status != std::future_status::ready) {
|
||||
auto ok = stop_source.stop_requested();
|
||||
return _lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
if (status == std::future_status::ready) {
|
||||
*_isDataUpdated = true;
|
||||
_lastUpdateError = update_ft.get();
|
||||
} else {
|
||||
stop_source.request_stop();
|
||||
_lastUpdateError = MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
}
|
||||
|
||||
_isDataUpdated = true;
|
||||
|
||||
// unblock waiting threads even in the case of timeout!
|
||||
_updateCondVar->notify_all();
|
||||
|
||||
return _lastUpdateError = update_ft.get();
|
||||
return _lastUpdateError;
|
||||
}
|
||||
|
||||
// block the thread and wait for data to be ready (external synchronization)
|
||||
// block the thread and wait for data to be ready (internal synchronization)
|
||||
error_t waitForTelemetryData(mcc_telemetry_data_c auto* tdata, traits::mcc_time_duration_c auto const& timeout)
|
||||
{
|
||||
if (tdata == nullptr) {
|
||||
@@ -501,7 +522,7 @@ public:
|
||||
|
||||
std::unique_lock ulock(*_updateMutex);
|
||||
|
||||
auto res = _updateCondVar->wait_for(ulock, timeout, [this]() { return _isDataUpdated; });
|
||||
auto res = _updateCondVar->wait_for(ulock, timeout, [this]() { return *_isDataUpdated; });
|
||||
if (res == std::cv_status::timeout) {
|
||||
return MccTelemetryErrorCode::ERROR_DATA_TIMEOUT;
|
||||
}
|
||||
@@ -539,11 +560,12 @@ public:
|
||||
|
||||
|
||||
protected:
|
||||
std::atomic_bool _isDataUpdated;
|
||||
std::unique_ptr<std::atomic_bool> _isDataUpdated;
|
||||
MccTelemetryData _data;
|
||||
|
||||
std::atomic_bool _internalUpdating{false};
|
||||
std::unique_ptr<std::atomic_bool> _internalUpdating;
|
||||
std::chrono::nanoseconds _currentUpdateInterval{std::chrono::milliseconds(100)};
|
||||
std::unique_ptr<std::mutex> _currentUpdateIntervalMutex;
|
||||
std::future<error_t> _internalUpdatingFuture{};
|
||||
std::stop_source _internalUpdatingStopSource{};
|
||||
|
||||
@@ -555,7 +577,9 @@ protected:
|
||||
std::unique_ptr<std::condition_variable> _updateCondVar;
|
||||
|
||||
error_t _lastUpdateError{MccTelemetryErrorCode::ERROR_OK};
|
||||
std::jthread _timerThread;
|
||||
};
|
||||
|
||||
|
||||
static_assert(mcc_telemetry_c<MccTelemetry>, "");
|
||||
|
||||
} // namespace mcc
|
||||
|
||||
Reference in New Issue
Block a user