VariableAcquisitionWorker.cpp
369 lines
| 13.3 KiB
| text/x-c
|
CppLexer
r527 | #include "Variable/VariableAcquisitionWorker.h" | |||
#include "Variable/Variable.h" | ||||
r539 | ||||
#include <Data/AcquisitionRequest.h> | ||||
#include <Data/SqpRange.h> | ||||
r527 | #include <unordered_map> | |||
r539 | #include <utility> | |||
r527 | ||||
r539 | #include <QMutex> | |||
#include <QReadWriteLock> | ||||
r527 | #include <QThread> | |||
r539 | ||||
r750 | #include <cmath> | |||
r527 | Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") | |||
/**
 * Private implementation of VariableAcquisitionWorker.
 *
 * Holds the bookkeeping maps of the worker together with the read/write lock that the worker's
 * methods take (through lockRead()/lockWrite()/unlock()) before touching them.
 */
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
    /// @param parent worker owning this implementation, kept as a back-pointer to emit signals
    explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
            : m_Lock{QReadWriteLock::Recursive}, q{parent}
    {
    }

    /// Takes m_Lock for reading; every call must be balanced by unlock()
    void lockRead() { m_Lock.lockForRead(); }
    /// Takes m_Lock for writing; every call must be balanced by unlock()
    void lockWrite() { m_Lock.lockForWrite(); }
    /// Releases m_Lock
    void unlock() { m_Lock.unlock(); }

    /// Drops the current acquisition of the variable vIdentifier and resets its progression
    void removeVariableRequest(QUuid vIdentifier);

    /// Remove and/or abort all AcqRequest in link with varRequestId
    void cancelVarRequest(QUuid varRequestId);

    /// Removes the acquisition acqRequestId, aborting it first when it is the current one
    void removeAcqRequest(QUuid acqRequestId);

    /// Locked by initialize(), unlocked by finalize(), acquired by waitForFinish()
    QMutex m_WorkingMutex;
    /// Recursive lock guarding the three maps below
    QReadWriteLock m_Lock;

    /// Acquisition identifier -> data packets received so far for that acquisition
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
    /// Acquisition identifier -> registered acquisition request
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
    /// Variable identifier -> identifier of its current acquisition
    std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;

    /// Owning worker
    VariableAcquisitionWorker *q;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent) | ||||
r754 | : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)} | |||
r527 | { | |||
} | ||||
r539 | VariableAcquisitionWorker::~VariableAcquisitionWorker() | |||
{ | ||||
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction") | ||||
<< QThread::currentThread(); | ||||
this->waitForFinish(); | ||||
} | ||||
r625 | QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier, | |||
DataProviderParameters parameters, | ||||
std::shared_ptr<IDataProvider> provider) | ||||
r539 | { | |||
r627 | qCDebug(LOG_VariableAcquisitionWorker()) | |||
r1393 | << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId | |||
<< "vId: " << vIdentifier; | ||||
r625 | auto varRequestIdCanceled = QUuid(); | |||
r539 | ||||
// Request creation | ||||
auto acqRequest = AcquisitionRequest{}; | ||||
r625 | acqRequest.m_VarRequestId = varRequestId; | |||
r539 | acqRequest.m_vIdentifier = vIdentifier; | |||
acqRequest.m_DataProviderParameters = parameters; | ||||
acqRequest.m_Size = parameters.m_Times.size(); | ||||
acqRequest.m_Provider = provider; | ||||
r1395 | qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier | |||
<< acqRequest.m_Size; | ||||
r539 | ||||
r625 | ||||
r539 | // Register request | |||
impl->lockWrite(); | ||||
impl->m_AcqIdentifierToAcqRequestMap.insert( | ||||
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest)); | ||||
r1395 | auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier); | |||
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) { | ||||
// A current request already exists, we can cancel it | ||||
// remove old acqIdentifier from the worker | ||||
auto oldAcqId = it->second; | ||||
r818 | auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId); | |||
r625 | if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | |||
r818 | auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second; | |||
varRequestIdCanceled = oldAcqRequest.m_VarRequestId; | ||||
r625 | } | |||
r539 | impl->unlock(); | |||
r820 | impl->cancelVarRequest(varRequestIdCanceled); | |||
r539 | } | |||
else { | ||||
impl->unlock(); | ||||
} | ||||
r625 | ||||
r1395 | // Request for the variable, it must be stored and executed | |||
impl->lockWrite(); | ||||
impl->m_VIdentifierToCurrrentAcqIdMap.insert( | ||||
std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier)); | ||||
impl->unlock(); | ||||
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, | ||||
Q_ARG(QUuid, acqRequest.m_AcqIdentifier)); | ||||
r625 | return varRequestIdCanceled; | |||
r539 | } | |||
/**
 * Asks the provider to abort the current acquisition of a variable.
 *
 * Nothing happens when the variable has no current acquisition; a warning is logged when the
 * current acquisition identifier matches no registered request.
 *
 * @param vIdentifier identifier of the variable whose acquisition must be aborted
 */
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    impl->lockRead();

    const auto currentAcqIt = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (currentAcqIt == impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
        // No acquisition is registered as current for this variable
        impl->unlock();
        return;
    }

    const auto acqId = currentAcqIt->second;
    const auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqId);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to abort an unknown acquisition request") << acqId;
        return;
    }

    // The request is copied so that it can still be used once the lock is released
    auto requestToAbort = requestIt->second;
    impl->unlock();

    // notify the request aborting to the provider
    requestToAbort.m_Provider->requestDataAborting(acqId);
}
/**
 * Converts the progress of the part currently loaded into a progress of the whole acquisition
 * and notifies it through variableRequestInProgress().
 *
 * Each of the m_Size parts of the request weighs 100 / m_Size of the whole acquisition. Once the
 * computed value reaches 100, a second notification with 0 is emitted.
 *
 * @param acqIdentifier identifier of the acquisition; unknown identifiers are ignored
 * @param progress progress of the current part, treated as a percentage (NaN counts as 0)
 */
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("TORM: onVariableRetrieveDataInProgress ") << QThread::currentThread()->objectName()
        << acqIdentifier << progress;

    // Tolerance absorbing the rounding error of the floating-point sum below
    constexpr auto COMPLETION_TOLERANCE = 1e-9;

    // Only the needed values are read under the lock: the signals are emitted once it has been
    // released, so that a directly connected slot can call back into the worker safely
    impl->lockRead();
    const auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        return;
    }
    const auto vIdentifier = requestIt->second.m_vIdentifier;
    const auto partCount = requestIt->second.m_Size;
    auto completedParts = requestIt->second.m_Progression;
    impl->unlock();

    // Floating-point division: an integer one truncates the weight of a part (33 instead of
    // 33.33 for three parts, 0 beyond 100 parts) and the total would then never reach 100
    const auto progressPartSize = (partCount != 0) ? 100.0 / partCount : 0.0;

    const auto currentPartProgress
        = std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;

    // We can only give an approximation of the progression since its update is asynchronous: if
    // every part is already counted as done, the notified part is considered to be the last one
    if (completedParts == partCount) {
        completedParts = partCount - 1;
    }
    const auto currentAlreadyProgress = progressPartSize * completedParts;

    const auto finalProgression = currentAlreadyProgress + currentPartProgress;
    emit variableRequestInProgress(vIdentifier, finalProgression);

    if (finalProgression >= 100.0 - COMPLETION_TOLERANCE) {
        emit variableRequestInProgress(vIdentifier, 0.0);
    }
}
r761 | void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier) | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed") | ||||
<< QThread::currentThread(); | ||||
impl->lockRead(); | ||||
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
auto request = it->second; | ||||
impl->unlock(); | ||||
emit variableCanceledRequested(request.m_vIdentifier); | ||||
} | ||||
else { | ||||
impl->unlock(); | ||||
} | ||||
} | ||||
r539 | void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier, | |||
std::shared_ptr<IDataSeries> dataSeries, | ||||
SqpRange dataRangeAcquired) | ||||
r527 | { | |||
r751 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ") | |||
<< acqIdentifier << dataRangeAcquired; | ||||
r539 | impl->lockWrite(); | |||
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
// Store the result | ||||
auto dataPacket = AcquisitionDataPacket{}; | ||||
dataPacket.m_Range = dataRangeAcquired; | ||||
dataPacket.m_DateSeries = dataSeries; | ||||
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); | ||||
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { | ||||
// A current request result already exists, we can update it | ||||
aIdToADPVit->second.push_back(dataPacket); | ||||
} | ||||
else { | ||||
// First request result for the variable, it must be stored | ||||
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert( | ||||
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket)); | ||||
} | ||||
// Decrement the counter of the request | ||||
auto &acqRequest = aIdToARit->second; | ||||
r750 | acqRequest.m_Progression = acqRequest.m_Progression + 1; | |||
r539 | ||||
// if the counter is 0, we can return data then run the next request if it exists and | ||||
// removed the finished request | ||||
r750 | if (acqRequest.m_Size == acqRequest.m_Progression) { | |||
r818 | auto varId = acqRequest.m_vIdentifier; | |||
r539 | // Return the data | |||
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); | ||||
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { | ||||
r1393 | emit dataProvided(varId, aIdToADPVit->second); | |||
r539 | } | |||
r818 | impl->unlock(); | |||
r539 | } | |||
r818 | else { | |||
impl->unlock(); | ||||
} | ||||
r539 | } | |||
else { | ||||
r818 | impl->unlock(); | |||
r754 | qCWarning(LOG_VariableAcquisitionWorker()) | |||
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data."); | ||||
r539 | } | |||
} | ||||
r750 | void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread(); | ||||
impl->lockRead(); | ||||
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); | ||||
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) { | ||||
auto request = it->second; | ||||
impl->unlock(); | ||||
emit variableRequestInProgress(request.m_vIdentifier, 0.1); | ||||
r1392 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier | |||
<< QThread::currentThread(); | ||||
r750 | request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters); | |||
} | ||||
else { | ||||
impl->unlock(); | ||||
// TODO log no acqIdentifier recognized | ||||
} | ||||
} | ||||
r539 | void VariableAcquisitionWorker::initialize() | |||
{ | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init") | ||||
<< QThread::currentThread(); | ||||
impl->m_WorkingMutex.lock(); | ||||
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END"); | ||||
} | ||||
void VariableAcquisitionWorker::finalize() | ||||
{ | ||||
impl->m_WorkingMutex.unlock(); | ||||
} | ||||
void VariableAcquisitionWorker::waitForFinish() | ||||
{ | ||||
QMutexLocker locker{&impl->m_WorkingMutex}; | ||||
} | ||||
/**
 * Forgets the current acquisition of a variable: its request and its stored data packets are
 * erased, a progression of 0 is notified, then the variable's current-acquisition entry is
 * erased. Everything is done under the write lock.
 *
 * @param vIdentifier identifier of the variable
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    // Takes the write lock now and releases it when the function returns
    QWriteLocker writeLocker{&m_Lock};

    const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend()) {
        // The variable has a current acquisition: drop its request and the data received for it
        const auto &currentAcqId = currentAcqIt->second;
        qCDebug(LOG_VariableAcquisitionWorker())
            << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
            << QThread::currentThread()->objectName() << currentAcqId;
        m_AcqIdentifierToAcqRequestMap.erase(currentAcqId);
        m_AcqIdentifierToAcqDataPacketVectorMap.erase(currentAcqId);
    }

    // stop any progression
    emit q->variableRequestInProgress(vIdentifier, 0.0);
    m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
}
r754 | ||||
r818 | void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest( | |||
QUuid varRequestId) | ||||
{ | ||||
r822 | qCDebug(LOG_VariableAcquisitionWorker()) | |||
r1398 | << "VariableAcquisitionWorkerPrivate::cancelVarRequest start"; | |||
r818 | lockRead(); | |||
// get all AcqIdentifier in link with varRequestId | ||||
QVector<QUuid> acqIdsToRm; | ||||
auto cend = m_AcqIdentifierToAcqRequestMap.cend(); | ||||
for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) { | ||||
if (it->second.m_VarRequestId == varRequestId) { | ||||
acqIdsToRm << it->first; | ||||
} | ||||
} | ||||
unlock(); | ||||
// run aborting or removing of acqIdsToRm | ||||
for (auto acqId : acqIdsToRm) { | ||||
removeAcqRequest(acqId); | ||||
} | ||||
r822 | qCDebug(LOG_VariableAcquisitionWorker()) | |||
r820 | << "VariableAcquisitionWorkerPrivate::cancelVarRequest end"; | |||
r818 | } | |||
/**
 * Removes an acquisition request from the worker.
 *
 * When the request is the current acquisition of its variable, the provider is first asked to
 * abort it (outside of any lock). The request, its stored data packets and the variable's
 * current-acquisition entry are then erased under the write lock.
 *
 * @param acqRequestId identifier of the acquisition request to remove
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
    QUuid acqRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest";

    // Both stay default-constructed when the request is unknown
    QUuid vIdentifier;
    std::shared_ptr<IDataProvider> provider;
    auto isCurrentAcquisition = false;

    lockRead();
    const auto requestIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
    if (requestIt != m_AcqIdentifierToAcqRequestMap.cend()) {
        vIdentifier = requestIt->second.m_vIdentifier;
        provider = requestIt->second.m_Provider;

        const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
        isCurrentAcquisition = (currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend())
                               && (currentAcqIt->second == acqRequestId);
    }
    unlock();

    if (isCurrentAcquisition) {
        // acqRequest is currently running: notify the request aborting to the provider
        provider->requestDataAborting(acqRequestId);
    }

    lockWrite();
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
    // NOTE(review): the variable's entry is erased even when it refers to another acquisition
    // than acqRequestId - confirm this is intended
    m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
    unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}