VariableAcquisitionWorker.cpp
416 lines
| 15.8 KiB
| text/x-c
|
CppLexer
r527 | #include "Variable/VariableAcquisitionWorker.h" | |||
#include "Variable/Variable.h" | ||||
r539 | ||||
#include <Data/AcquisitionRequest.h> | ||||
#include <Data/SqpRange.h> | ||||
r527 | #include <unordered_map> | |||
r539 | #include <utility> | |||
r527 | ||||
r539 | #include <QMutex> | |||
#include <QReadWriteLock> | ||||
r527 | #include <QThread> | |||
r539 | ||||
r750 | #include <cmath> | |||
r527 | Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") | |||
/**
 * Private implementation of VariableAcquisitionWorker.
 *
 * Groups the bookkeeping maps of the worker with the recursive read/write lock that the
 * worker's methods take around accesses to those maps.
 */
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
    /// @param parent worker owning this implementation (kept as a back pointer in q)
    explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent) : q{parent} {}

    /// Takes m_Lock for reading
    void lockRead() { m_Lock.lockForRead(); }

    /// Takes m_Lock for writing
    void lockWrite() { m_Lock.lockForWrite(); }

    /// Releases one level of m_Lock
    void unlock() { m_Lock.unlock(); }

    /// Erases the current and next acquisition requests of a variable, their stored data
    /// packets and the variable's entry in the current/next map
    void removeVariableRequest(QUuid vIdentifier);

    /// Remove the current request and execute the next one if exist
    void updateToNextRequest(QUuid vIdentifier);

    /// Remove and/or abort all AcqRequest in link with varRequestId
    void cancelVarRequest(QUuid varRequestId);

    /// Removes a single acquisition request, aborting it first when it is the current
    /// request of its variable
    void removeAcqRequest(QUuid acqRequestId);

    /// Locked by initialize(), unlocked by finalize(), briefly acquired by waitForFinish()
    QMutex m_WorkingMutex;

    /// Recursive lock taken by the worker around accesses to the maps below
    QReadWriteLock m_Lock{QReadWriteLock::Recursive};

    /// Acquisition identifier -> data packets received so far for that acquisition
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;

    /// Acquisition identifier -> acquisition request
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;

    /// Variable identifier -> (current acquisition identifier, next acquisition identifier)
    std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;

    /// Back pointer to the owning worker, used to queue onExecuteRequest on it
    VariableAcquisitionWorker *q;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent) | ||||
r754 | : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)} | |||
r527 | { | |||
} | ||||
r539 | VariableAcquisitionWorker::~VariableAcquisitionWorker() | |||
{ | ||||
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction") | ||||
<< QThread::currentThread(); | ||||
this->waitForFinish(); | ||||
} | ||||
r625 | QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier, | |||
SqpRange rangeRequested, | ||||
SqpRange cacheRangeRequested, | ||||
DataProviderParameters parameters, | ||||
std::shared_ptr<IDataProvider> provider) | ||||
r539 | { | |||
r627 | qCDebug(LOG_VariableAcquisitionWorker()) | |||
r539 | << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested; | |||
r625 | auto varRequestIdCanceled = QUuid(); | |||
r539 | ||||
// Request creation | ||||
auto acqRequest = AcquisitionRequest{}; | ||||
r815 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier | |||
<< varRequestId; | ||||
r625 | acqRequest.m_VarRequestId = varRequestId; | |||
r539 | acqRequest.m_vIdentifier = vIdentifier; | |||
acqRequest.m_DataProviderParameters = parameters; | ||||
acqRequest.m_RangeRequested = rangeRequested; | ||||
acqRequest.m_CacheRangeRequested = cacheRangeRequested; | ||||
acqRequest.m_Size = parameters.m_Times.size(); | ||||
acqRequest.m_Provider = provider; | ||||
r625 | ||||
r539 | // Register request | |||
impl->lockWrite(); | ||||
impl->m_AcqIdentifierToAcqRequestMap.insert( | ||||
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest)); | ||||
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); | ||||
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { | ||||
// A current request already exists, we can replace the next one | ||||
r813 | auto oldAcqId = it->second.second; | |||
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId); | ||||
r625 | if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | |||
r813 | auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second; | |||
varRequestIdCanceled = oldAcqRequest.m_VarRequestId; | ||||
r625 | } | |||
r539 | it->second.second = acqRequest.m_AcqIdentifier; | |||
impl->unlock(); | ||||
r814 | ||||
// remove old acqIdentifier from the worker | ||||
r815 | impl->cancelVarRequest(varRequestIdCanceled); | |||
r823 | // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId); | |||
r539 | } | |||
else { | ||||
// First request for the variable, it must be stored and executed | ||||
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert( | ||||
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid()))); | ||||
impl->unlock(); | ||||
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, | ||||
Q_ARG(QUuid, acqRequest.m_AcqIdentifier)); | ||||
} | ||||
r625 | ||||
return varRequestIdCanceled; | ||||
r539 | } | |||
/**
 * Aborts the current acquisition request of a variable.
 *
 * The worker first moves on to the variable's next request (or forgets the variable when
 * there is none), then asks the provider of the aborted request to abort it. Nothing happens
 * when the variable is unknown; a warning is logged when its current acquisition identifier
 * has no registered request.
 *
 * @param vIdentifier identifier of the variable whose acquisition is aborted
 */
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    impl->lockRead();

    const auto variableIt = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (variableIt == impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
        impl->unlock();
        return;
    }

    const auto runningAcqId = variableIt->second.first;
    const auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(runningAcqId);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.end()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to abort an unknown acquisition request") << runningAcqId;
        return;
    }

    // Copy the request so that its provider can still be reached once the lock is released
    auto abortedRequest = requestIt->second;
    impl->unlock();

    // Remove the current request from the worker
    impl->updateToNextRequest(vIdentifier);

    // Notify the provider that the request is aborted
    abortedRequest.m_Provider->requestDataAborting(runningAcqId);
}
/**
 * Converts the progress of the part currently being retrieved into the overall progress of
 * the acquisition request and emits it through variableRequestInProgress.
 *
 * A request is made of m_Size parts, each weighing 100 / m_Size percent. The overall
 * progress is the weight of the parts already acquired (m_Progression) plus the share of the
 * current part given by @p progress. When the overall progress reaches 100, a second signal
 * with a progress of 0 is emitted.
 *
 * The computation is done in floating point: an integer division would truncate the weight
 * of a part (33 instead of 33.33 for three parts), so the total could never reach 100 and
 * the final reset would never be emitted for part counts that do not divide 100.
 *
 * @param acqIdentifier identifier of the acquisition request; unknown identifiers are ignored
 * @param progress progress of the current part in percent; NaN is treated as 0
 */
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    // Absorbs the rounding error of the floating point sum when detecting completion
    constexpr auto COMPLETION_TOLERANCE = 1e-6;

    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("TORM: onVariableRetrieveDataInProgress ") << acqIdentifier << progress;

    impl->lockRead();
    auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        const auto &request = aIdToARit->second;

        // Weight of one part in percent (0 when the request has no part)
        const auto currentPartSize
            = (request.m_Size != 0) ? 100.0 / static_cast<double>(request.m_Size) : 0.0;
        const auto currentPartProgress
            = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
        const auto currentAlreadyProgress
            = static_cast<double>(request.m_Progression) * currentPartSize;
        const auto finalProgression = currentAlreadyProgress + currentPartProgress;

        emit variableRequestInProgress(request.m_vIdentifier, finalProgression);
        qCDebug(LOG_VariableAcquisitionWorker())
            << tr("TORM: onVariableRetrieveDataInProgress ")
            << QThread::currentThread()->objectName() << request.m_vIdentifier
            << currentPartSize << currentAlreadyProgress << currentPartProgress
            << finalProgression;

        // Acquisition complete: emit a progress of 0 right after the final value
        if (finalProgression >= 100.0 - COMPLETION_TOLERANCE) {
            emit variableRequestInProgress(request.m_vIdentifier, 0.0);
        }
    }
    impl->unlock();
}
r761 | void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier) | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed") | ||||
<< QThread::currentThread(); | ||||
impl->lockRead(); | ||||
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
auto request = it->second; | ||||
impl->unlock(); | ||||
r828 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed") | |||
<< acqIdentifier << request.m_vIdentifier | ||||
<< QThread::currentThread(); | ||||
r761 | emit variableCanceledRequested(request.m_vIdentifier); | |||
} | ||||
else { | ||||
impl->unlock(); | ||||
// TODO log no acqIdentifier recognized | ||||
} | ||||
} | ||||
r539 | void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier, | |||
std::shared_ptr<IDataSeries> dataSeries, | ||||
SqpRange dataRangeAcquired) | ||||
r527 | { | |||
r751 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ") | |||
<< acqIdentifier << dataRangeAcquired; | ||||
r539 | impl->lockWrite(); | |||
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
// Store the result | ||||
auto dataPacket = AcquisitionDataPacket{}; | ||||
dataPacket.m_Range = dataRangeAcquired; | ||||
dataPacket.m_DateSeries = dataSeries; | ||||
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); | ||||
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { | ||||
// A current request result already exists, we can update it | ||||
aIdToADPVit->second.push_back(dataPacket); | ||||
} | ||||
else { | ||||
// First request result for the variable, it must be stored | ||||
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert( | ||||
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket)); | ||||
} | ||||
// Decrement the counter of the request | ||||
auto &acqRequest = aIdToARit->second; | ||||
r750 | acqRequest.m_Progression = acqRequest.m_Progression + 1; | |||
r539 | ||||
// if the counter is 0, we can return data then run the next request if it exists and | ||||
// removed the finished request | ||||
r750 | if (acqRequest.m_Size == acqRequest.m_Progression) { | |||
r813 | auto varId = acqRequest.m_vIdentifier; | |||
auto rangeRequested = acqRequest.m_RangeRequested; | ||||
auto cacheRangeRequested = acqRequest.m_CacheRangeRequested; | ||||
r539 | // Return the data | |||
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); | ||||
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { | ||||
r813 | emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second); | |||
r539 | } | |||
r813 | impl->unlock(); | |||
r539 | ||||
r758 | // Update to the next request | |||
impl->updateToNextRequest(acqRequest.m_vIdentifier); | ||||
r539 | } | |||
r813 | else { | |||
impl->unlock(); | ||||
} | ||||
r539 | } | |||
else { | ||||
r813 | impl->unlock(); | |||
r754 | qCWarning(LOG_VariableAcquisitionWorker()) | |||
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data."); | ||||
r539 | } | |||
} | ||||
r750 | void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread(); | ||||
impl->lockRead(); | ||||
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
auto request = it->second; | ||||
impl->unlock(); | ||||
emit variableRequestInProgress(request.m_vIdentifier, 0.1); | ||||
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters); | ||||
} | ||||
else { | ||||
impl->unlock(); | ||||
// TODO log no acqIdentifier recognized | ||||
} | ||||
} | ||||
r539 | void VariableAcquisitionWorker::initialize() | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init") | ||||
<< QThread::currentThread(); | ||||
impl->m_WorkingMutex.lock(); | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END"); | ||||
} | ||||
void VariableAcquisitionWorker::finalize() | ||||
{ | ||||
impl->m_WorkingMutex.unlock(); | ||||
} | ||||
void VariableAcquisitionWorker::waitForFinish() | ||||
{ | ||||
QMutexLocker locker{&impl->m_WorkingMutex}; | ||||
} | ||||
/**
 * Forgets everything the worker stores for a variable: its current and next acquisition
 * requests, the data packets received for them, and the variable's entry in the
 * current/next map. Does nothing when the variable is unknown.
 *
 * @param vIdentifier identifier of the variable to remove
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    lockWrite();

    const auto variableIt = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (variableIt != m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
        const auto currentAcqId = variableIt->second.first;
        const auto nextAcqId = variableIt->second.second;

        // Drop the current request and its results, then the next one
        m_AcqIdentifierToAcqRequestMap.erase(currentAcqId);
        m_AcqIdentifierToAcqDataPacketVectorMap.erase(currentAcqId);
        m_AcqIdentifierToAcqRequestMap.erase(nextAcqId);
        m_AcqIdentifierToAcqDataPacketVectorMap.erase(nextAcqId);

        // Finally drop the variable itself
        m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(variableIt);
    }

    unlock();
}
r754 | ||||
/**
 * Removes the current acquisition request of a variable and runs the next one if it exists.
 *
 * - No next request: the variable is removed from the worker (removeVariableRequest).
 * - A next request exists: it becomes the current request, the previous current request and
 *   its data packets are erased, and onExecuteRequest is queued on the worker for it.
 * - Unknown variable: a critical message is logged.
 *
 * The whole transition is performed under the write lock. The current/next pair is modified
 * here, which a read lock does not protect, and keeping a single critical section prevents
 * another thread from observing or changing the pair halfway through. m_Lock is created in
 * recursive mode, so the nested lockWrite() of removeVariableRequest is allowed.
 *
 * Must not be called while the calling thread holds m_Lock for reading.
 *
 * @param vIdentifier identifier of the variable whose current request is finished or aborted
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
    QUuid vIdentifier)
{
    lockWrite();

    auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (it == m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
        unlock();
        qCCritical(LOG_VariableAcquisitionWorker())
            << tr("Impossible to execute the acquisition on an unfound variable ");
        return;
    }

    if (it->second.second.isNull()) {
        // There is no next request, we can remove the variable request.
        // `it` is invalidated by this call and must not be used afterwards.
        removeVariableRequest(vIdentifier);
        unlock();
        return;
    }

    // Move the next request to the current request
    const auto acqIdentifierToRemove = it->second.first;
    const auto nextRequestId = it->second.second;
    it->second.first = nextRequestId;
    it->second.second = QUuid();

    // Remove the previous current AcquisitionRequest and its results
    m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
    unlock();

    // Execute the new current request
    QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
                              Q_ARG(QUuid, nextRequestId));
}
r813 | ||||
/**
 * Removes (aborting it when it is running) every acquisition request linked to a variable
 * request.
 *
 * The matching acquisition identifiers are collected under the read lock, then each of them
 * is handed to removeAcqRequest once the lock has been released.
 *
 * @param varRequestId identifier of the variable request to cancel
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
    QUuid varRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";

    // Collect all the acquisition identifiers linked to varRequestId
    QVector<QUuid> linkedAcqIds;
    lockRead();
    for (const auto &idAndRequest : m_AcqIdentifierToAcqRequestMap) {
        if (idAndRequest.second.m_VarRequestId == varRequestId) {
            linkedAcqIds.push_back(idAndRequest.first);
        }
    }
    unlock();

    // Abort or remove each of them
    for (const auto &linkedAcqId : linkedAcqIds) {
        removeAcqRequest(linkedAcqId);
    }

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
}
/**
 * Removes an acquisition request from the worker.
 *
 * - If it is the current request of its variable, the worker moves on to the variable's next
 *   request (updateToNextRequest) and the provider is asked to abort the acquisition.
 * - If it is the next request of its variable, the "next" slot of the variable is cleared.
 *
 * In every case the request and its data packets are erased from the maps at the end.
 *
 * The inspection phase takes the write lock because it may clear the "next" slot, a
 * modification that a read lock does not protect.
 *
 * Must not be called while the calling thread holds m_Lock for reading.
 *
 * @param acqRequestId identifier of the acquisition request to remove
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
    QUuid acqRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest";

    QUuid vIdentifier;
    std::shared_ptr<IDataProvider> provider;
    auto isCurrentRequest = false;

    lockWrite();
    auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
    if (acqIt != m_AcqIdentifierToAcqRequestMap.end()) {
        vIdentifier = acqIt->second.m_vIdentifier;
        provider = acqIt->second.m_Provider;

        auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
        if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
            if (it->second.first == acqRequestId) {
                // The request is currently running: it is aborted once the lock is released
                isCurrentRequest = true;
            }
            else if (it->second.second == acqRequestId) {
                // The request was only pending: forget it as the next request
                it->second.second = QUuid();
            }
        }
    }
    unlock();

    if (isCurrentRequest) {
        // Remove the current request from the worker
        updateToNextRequest(vIdentifier);
        // Notify the provider that the request is aborted
        provider->requestDataAborting(acqRequestId);
    }

    // Erase whatever is still stored for the request
    lockWrite();
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
    unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}