diff --git a/core/src/Variable/VariableAcquisitionWorker.cpp b/core/src/Variable/VariableAcquisitionWorker.cpp
index 5eac38c..9a8f896 100644
--- a/core/src/Variable/VariableAcquisitionWorker.cpp
+++ b/core/src/Variable/VariableAcquisitionWorker.cpp
@@ -32,6 +32,10 @@ struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
     /// Remove the current request and execute the next one if exist
     void updateToNextRequest(QUuid vIdentifier);
 
+    /// Remove and/or abort all AcqRequest in link with varRequestId
+    void cancelVarRequest(QUuid varRequestId);
+    void removeAcqRequest(QUuid acqRequestId);
+
     QMutex m_WorkingMutex;
     QReadWriteLock m_Lock;
 
@@ -67,7 +71,8 @@ QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
 
     // Request creation
     auto acqRequest = AcquisitionRequest{};
-    qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier;
+    qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
+                                            << varRequestId;
     acqRequest.m_VarRequestId = varRequestId;
     acqRequest.m_vIdentifier = vIdentifier;
     acqRequest.m_DataProviderParameters = parameters;
@@ -85,14 +90,19 @@ QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
     auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
     if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
         // A current request already exists, we can replace the next one
-        auto nextAcqId = it->second.second;
-        auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
+        auto oldAcqId = it->second.second;
+        auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
         if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
-            auto request = acqIdentifierToAcqRequestMapIt->second;
-            varRequestIdCanceled = request.m_VarRequestId;
+            auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
+            varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
         }
 
         it->second.second = acqRequest.m_AcqIdentifier;
+        impl->unlock();
+        // Cancel by variable-request id, lock released: cancelVarRequest() locks on its own
+        if (!varRequestIdCanceled.isNull()) {
+            impl->cancelVarRequest(varRequestIdCanceled);
+        }
     }
     else {
 
@@ -122,10 +132,7 @@ void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
             impl->unlock();
 
             // Remove the current request from the worker
-
-            impl->lockWrite();
             impl->updateToNextRequest(vIdentifier);
-            impl->unlock();
 
             // notify the request aborting to the provider
             request.m_Provider->requestDataAborting(currentAcqId);
@@ -221,22 +228,28 @@ void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
         // if the counter is 0, we can return data then run the next request if it exists and
         // removed the finished request
         if (acqRequest.m_Size == acqRequest.m_Progression) {
+            auto varId = acqRequest.m_vIdentifier;
+            auto rangeRequested = acqRequest.m_RangeRequested;
+            auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
             // Return the data
             aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
             if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
-                emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
-                                  acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
+                emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
             }
+            impl->unlock();
 
             // Update to the next request
-            impl->updateToNextRequest(acqRequest.m_vIdentifier);
+            impl->updateToNextRequest(varId);
         }
+        else {
+            impl->unlock();
+        }
     }
     else {
+        impl->unlock();
         qCWarning(LOG_VariableAcquisitionWorker())
             << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
     }
-    impl->unlock();
 }
 
 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
@@ -296,27 +309,96 @@ void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariable
 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
     QUuid vIdentifier)
 {
+    // Write lock: the current/next pair and both acquisition maps are modified below
+    lockWrite();
     auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
     if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
         if (it->second.second.isNull()) {
+            unlock();
             // There is no next request, we can remove the variable request
             removeVariableRequest(vIdentifier);
         }
         else {
             auto acqIdentifierToRemove = it->second.first;
             // Move the next request to the current request
-            it->second.first = it->second.second;
+            auto nextRequestId = it->second.second;
+            it->second.first = nextRequestId;
             it->second.second = QUuid();
             // Remove AcquisitionRequest and results;
             m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
             m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
+            unlock();
             // Execute the current request
             QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
-                                      Q_ARG(QUuid, it->second.first));
+                                      Q_ARG(QUuid, nextRequestId));
         }
     }
     else {
+        unlock();
         qCCritical(LOG_VariableAcquisitionWorker())
             << tr("Impossible to execute the acquisition on an unfound variable ");
     }
 }
+
+/// Removes every acquisition request attached to varRequestId, aborting the running ones
+void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
+    QUuid varRequestId)
+{
+    // Collect, under the read lock, the identifier of every matching acquisition
+    QVector<QUuid> acqIdsToRm;
+    lockRead();
+    for (const auto &acqIdAndRequest : m_AcqIdentifierToAcqRequestMap) {
+        if (acqIdAndRequest.second.m_VarRequestId == varRequestId) {
+            acqIdsToRm << acqIdAndRequest.first;
+        }
+    }
+    unlock();
+
+    // Abort or remove them once the lock is released: removeAcqRequest() locks on its own
+    for (const auto &acqId : acqIdsToRm) {
+        removeAcqRequest(acqId);
+    }
+}
+
+/// Removes one acquisition request from the worker. A running request is aborted through its
+/// provider, a pending one only frees the next slot of its variable
+void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
+    QUuid acqRequestId)
+{
+    // Write lock: the next slot of the variable may be reset and both maps are erased from
+    lockWrite();
+    auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
+    if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
+        // Copy what is needed: updateToNextRequest() may erase the entry acqIt points to
+        auto vIdentifier = acqIt->second.m_vIdentifier;
+        auto provider = acqIt->second.m_Provider;
+
+        auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
+        if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
+            if (it->second.first == acqRequestId) {
+                // The request is currently running: abort it. The lock is released around
+                // the calls below because updateToNextRequest() acquires it itself
+                unlock();
+
+                // Remove the current request from the worker
+                updateToNextRequest(vIdentifier);
+
+                // notify the request aborting to the provider
+                if (provider) {
+                    provider->requestDataAborting(acqRequestId);
+                }
+
+                lockWrite();
+            }
+            else if (it->second.second == acqRequestId) {
+                // The request was only pending: free the next slot
+                it->second.second = QUuid();
+            }
+        }
+    }
+
+    // Drop the request and any data packet already stored for it
+    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
+    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
+    unlock();
+}
diff --git a/core/src/Variable/VariableController.cpp b/core/src/Variable/VariableController.cpp
index 08faec6..d23ebc4 100644
--- a/core/src/Variable/VariableController.cpp
+++ b/core/src/Variable/VariableController.cpp
@@ -620,8 +620,8 @@ void VariableController::VariableControllerPrivate::processRequest(std::shared_p
             varProvider);
 
         if (!varRequestIdCanceled.isNull()) {
-            qCDebug(LOG_VariableAcquisitionWorker()) << tr("vsarRequestIdCanceled: ")
-                                                     << varRequestIdCanceled;
+            qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
+                                                    << varRequestIdCanceled;
             cancelVariableRequest(varRequestIdCanceled);
         }
     }
@@ -768,7 +768,7 @@ QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
         varRequestIdQueue.pop_front();
 
         if (varRequestIdQueue.empty()) {
-            qCDebug(LOG_VariableController())
+            qCCritical(LOG_VariableController())
                 << tr("TORM Erase REQUEST because it has been accepted") << varId;
             m_VarIdToVarRequestIdQueueMap.erase(varId);
         }
@@ -850,6 +850,10 @@ void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid
             std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
             varRequestIdQueue.end());
         if (varRequestIdQueue.empty()) {
+
+            qCCritical(LOG_VariableController())
+                << tr("VariableControllerPrivate::cancelVariableRequest")
+                << varIdToVarRequestIdQueueMapIt->first;
             varIdToVarRequestIdQueueMapIt
                 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
         }
diff --git a/plugins/mockplugin/src/CosinusProvider.cpp b/plugins/mockplugin/src/CosinusProvider.cpp
index fb09008..f62f3da 100644
--- a/plugins/mockplugin/src/CosinusProvider.cpp
+++ b/plugins/mockplugin/src/CosinusProvider.cpp
@@ -180,7 +180,7 @@ void CosinusProvider::requestDataLoading(QUuid acqIdentifier,
         for (const auto &dateTime : qAsConst(times)) {
             if (m_VariableToEnableProvider[acqIdentifier]) {
                 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime, parameters.m_Data);
-                qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
+                qCCritical(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
                 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
             }
         }