From 60a224a1b89a9df234d7ed6e699ba820f9e517b7 2017-10-03 14:49:59 From: mperrinel Date: 2017-10-03 14:49:59 Subject: [PATCH] Merge branch 'feature/fixAcquisitionSynchroBug' into develop --- diff --git a/core/include/Variable/Variable.h b/core/include/Variable/Variable.h index bf30f18..88c5682 100644 --- a/core/include/Variable/Variable.h +++ b/core/include/Variable/Variable.h @@ -68,6 +68,12 @@ public: QVector provideInCacheRangeList(const SqpRange &range) const noexcept; void mergeDataSeries(std::shared_ptr dataSeries) noexcept; + static QVector provideNotInCacheRangeList(const SqpRange &oldRange, + const SqpRange &nextRange); + + static QVector provideInCacheRangeList(const SqpRange &oldRange, + const SqpRange &nextRange); + signals: void updated(); diff --git a/core/src/Variable/Variable.cpp b/core/src/Variable/Variable.cpp index 988f06a..8d68fa0 100644 --- a/core/src/Variable/Variable.cpp +++ b/core/src/Variable/Variable.cpp @@ -138,7 +138,6 @@ void Variable::setCacheRange(const SqpRange &cacheRange) noexcept impl->lockWrite(); if (cacheRange != impl->m_CacheRange) { impl->m_CacheRange = cacheRange; - impl->purgeDataSeries(); } impl->unlock(); } @@ -174,6 +173,7 @@ void Variable::mergeDataSeries(std::shared_ptr dataSeries) noexcept impl->unlock(); } + std::shared_ptr Variable::dataSeries() const noexcept { impl->lockRead(); @@ -285,7 +285,7 @@ QVector Variable::provideInCacheRangeList(const SqpRange &range) const if (impl->m_CacheRange != INVALID_RANGE) { - if (this->intersect(range)) { + if (this->cacheIntersect(range)) { if (range.m_TStart <= impl->m_CacheRange.m_TStart && range.m_TEnd >= impl->m_CacheRange.m_TStart && range.m_TEnd < impl->m_CacheRange.m_TEnd) { @@ -313,3 +313,76 @@ QVector Variable::provideInCacheRangeList(const SqpRange &range) const return inCache; } + + +QVector Variable::provideNotInCacheRangeList(const SqpRange &oldRange, + const SqpRange &nextRange) +{ + + // This code assume that cach in contigue. Can return 0, 1 or 2 SqpRange + auto notInCache = QVector{}; + if (oldRange != INVALID_RANGE) { + + if (!oldRange.contains(nextRange)) { + if (nextRange.m_TEnd <= oldRange.m_TStart || nextRange.m_TStart >= oldRange.m_TEnd) { + notInCache << nextRange; + } + else if (nextRange.m_TStart < oldRange.m_TStart + && nextRange.m_TEnd <= oldRange.m_TEnd) { + notInCache << SqpRange{nextRange.m_TStart, oldRange.m_TStart}; + } + else if (nextRange.m_TStart < oldRange.m_TStart && nextRange.m_TEnd > oldRange.m_TEnd) { + notInCache << SqpRange{nextRange.m_TStart, oldRange.m_TStart} + << SqpRange{oldRange.m_TEnd, nextRange.m_TEnd}; + } + else if (nextRange.m_TStart < oldRange.m_TEnd) { + notInCache << SqpRange{oldRange.m_TEnd, nextRange.m_TEnd}; + } + else { + qCCritical(LOG_Variable()) << tr("Detection of unknown case.") + << QThread::currentThread(); + } + } + } + else { + notInCache << nextRange; + } + + return notInCache; +} + +QVector Variable::provideInCacheRangeList(const SqpRange &oldRange, + const SqpRange &nextRange) +{ + // This code assume that cach is contigue. Can return 0 or 1 SqpRange + + auto inCache = QVector{}; + + if (oldRange != INVALID_RANGE) { + + if (oldRange.intersect(nextRange)) { + if (nextRange.m_TStart <= oldRange.m_TStart && nextRange.m_TEnd >= oldRange.m_TStart + && nextRange.m_TEnd < oldRange.m_TEnd) { + inCache << SqpRange{oldRange.m_TStart, nextRange.m_TEnd}; + } + + else if (nextRange.m_TStart >= oldRange.m_TStart + && nextRange.m_TEnd <= oldRange.m_TEnd) { + inCache << nextRange; + } + else if (nextRange.m_TStart > oldRange.m_TStart && nextRange.m_TEnd > oldRange.m_TEnd) { + inCache << SqpRange{nextRange.m_TStart, oldRange.m_TEnd}; + } + else if (nextRange.m_TStart <= oldRange.m_TStart + && nextRange.m_TEnd >= oldRange.m_TEnd) { + inCache << oldRange; + } + else { + qCCritical(LOG_Variable()) << tr("Detection of unknown case.") + << QThread::currentThread(); + } + } + } + + return inCache; +} diff --git a/core/src/Variable/VariableAcquisitionWorker.cpp b/core/src/Variable/VariableAcquisitionWorker.cpp index 5eac38c..8487848 100644 --- a/core/src/Variable/VariableAcquisitionWorker.cpp +++ b/core/src/Variable/VariableAcquisitionWorker.cpp @@ -32,6 +32,10 @@ struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { /// Remove the current request and execute the next one if exist void updateToNextRequest(QUuid vIdentifier); + /// Remove and/or abort all AcqRequest in link with varRequestId + void cancelVarRequest(QUuid varRequestId); + void removeAcqRequest(QUuid acqRequestId); + QMutex m_WorkingMutex; QReadWriteLock m_Lock; @@ -67,7 +71,8 @@ QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v // Request creation auto acqRequest = AcquisitionRequest{}; - qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier; + qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier + << varRequestId; acqRequest.m_VarRequestId = varRequestId; acqRequest.m_vIdentifier = vIdentifier; acqRequest.m_DataProviderParameters = parameters; @@ -85,15 +90,19 @@ QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { // A current request already exists, we can replace the next one - auto nextAcqId = it->second.second; - auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId); + auto oldAcqId = it->second.second; + auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId); if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) { - auto request = acqIdentifierToAcqRequestMapIt->second; - varRequestIdCanceled = request.m_VarRequestId; + auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second; + varRequestIdCanceled = oldAcqRequest.m_VarRequestId; } it->second.second = acqRequest.m_AcqIdentifier; impl->unlock(); + + // remove old acqIdentifier from the worker + impl->cancelVarRequest(varRequestIdCanceled); + // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId); } else { // First request for the variable, it must be stored and executed @@ -122,10 +131,7 @@ void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier) impl->unlock(); // Remove the current request from the worker - - impl->lockWrite(); impl->updateToNextRequest(vIdentifier); - impl->unlock(); // notify the request aborting to the provider request.m_Provider->requestDataAborting(currentAcqId); @@ -221,22 +227,28 @@ void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier, // if the counter is 0, we can return data then run the next request if it exists and // removed the finished request if (acqRequest.m_Size == acqRequest.m_Progression) { + auto varId = acqRequest.m_vIdentifier; + auto rangeRequested = acqRequest.m_RangeRequested; + auto cacheRangeRequested = acqRequest.m_CacheRangeRequested; // Return the data aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { - emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested, - acqRequest.m_CacheRangeRequested, aIdToADPVit->second); + emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second); } + impl->unlock(); // Update to the next request impl->updateToNextRequest(acqRequest.m_vIdentifier); } + else { + impl->unlock(); + } } else { + impl->unlock(); qCWarning(LOG_VariableAcquisitionWorker()) << tr("Impossible to retrieve AcquisitionRequest for the incoming data."); } - impl->unlock(); } void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) @@ -296,27 +308,109 @@ void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariable void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest( QUuid vIdentifier) { + lockRead(); auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { if (it->second.second.isNull()) { + unlock(); // There is no next request, we can remove the variable request removeVariableRequest(vIdentifier); } else { auto acqIdentifierToRemove = it->second.first; // Move the next request to the current request - it->second.first = it->second.second; + auto nextRequestId = it->second.second; + it->second.first = nextRequestId; it->second.second = QUuid(); + unlock(); // Remove AcquisitionRequest and results; + lockWrite(); m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove); m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove); + unlock(); // Execute the current request QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection, - Q_ARG(QUuid, it->second.first)); + Q_ARG(QUuid, nextRequestId)); } } else { + unlock(); qCCritical(LOG_VariableAcquisitionWorker()) << tr("Impossible to execute the acquisition on an unfound variable "); } } + +void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest( + QUuid varRequestId) +{ + qCDebug(LOG_VariableAcquisitionWorker()) + << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0"; + lockRead(); + // get all AcqIdentifier in link with varRequestId + QVector acqIdsToRm; + auto cend = m_AcqIdentifierToAcqRequestMap.cend(); + for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) { + if (it->second.m_VarRequestId == varRequestId) { + acqIdsToRm << it->first; + } + } + unlock(); + // run aborting or removing of acqIdsToRm + + for (auto acqId : acqIdsToRm) { + removeAcqRequest(acqId); + } + qCDebug(LOG_VariableAcquisitionWorker()) + << "VariableAcquisitionWorkerPrivate::cancelVarRequest end"; +} + +void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest( + QUuid acqRequestId) +{ + qCDebug(LOG_VariableAcquisitionWorker()) + << "VariableAcquisitionWorkerPrivate::removeAcqRequest"; + QUuid vIdentifier; + std::shared_ptr provider; + lockRead(); + auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId); + if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) { + vIdentifier = acqIt->second.m_vIdentifier; + provider = acqIt->second.m_Provider; + + auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); + if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { + if (it->second.first == acqRequestId) { + // acqRequest is currently running -> let's aborting it + unlock(); + + // Remove the current request from the worker + updateToNextRequest(vIdentifier); + + // notify the request aborting to the provider + provider->requestDataAborting(acqRequestId); + } + else if (it->second.second == acqRequestId) { + it->second.second = QUuid(); + unlock(); + } + else { + unlock(); + } + } + else { + unlock(); + } + } + else { + unlock(); + } + + lockWrite(); + + m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId); + m_AcqIdentifierToAcqRequestMap.erase(acqRequestId); + + unlock(); + qCDebug(LOG_VariableAcquisitionWorker()) + << "VariableAcquisitionWorkerPrivate::removeAcqRequest END"; +} diff --git a/core/src/Variable/VariableController.cpp b/core/src/Variable/VariableController.cpp index 08faec6..6ef2097 100644 --- a/core/src/Variable/VariableController.cpp +++ b/core/src/Variable/VariableController.cpp @@ -105,9 +105,6 @@ struct VariableController::VariableControllerPrivate { void processRequest(std::shared_ptr var, const SqpRange &rangeRequested, QUuid varRequestId); - QVector provideNotInCacheDateTimeList(std::shared_ptr variable, - const SqpRange &dateTime); - std::shared_ptr findVariable(QUuid vIdentifier); std::shared_ptr retrieveDataSeries(const QVector acqDataPacketVector); @@ -554,23 +551,23 @@ AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd auto zoomType = AcquisitionZoomType::Unknown; if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) { - qCCritical(LOG_VariableController()) << "zoomtype: ZoomOut"; + qCDebug(LOG_VariableController()) << "zoomtype: ZoomOut"; zoomType = AcquisitionZoomType::ZoomOut; } else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) { - qCCritical(LOG_VariableController()) << "zoomtype: PanRight"; + qCDebug(LOG_VariableController()) << "zoomtype: PanRight"; zoomType = AcquisitionZoomType::PanRight; } else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) { - qCCritical(LOG_VariableController()) << "zoomtype: PanLeft"; + qCDebug(LOG_VariableController()) << "zoomtype: PanLeft"; zoomType = AcquisitionZoomType::PanLeft; } else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) { - qCCritical(LOG_VariableController()) << "zoomtype: ZoomIn"; + qCDebug(LOG_VariableController()) << "zoomtype: ZoomIn"; zoomType = AcquisitionZoomType::ZoomIn; } else { - qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected"; + qCDebug(LOG_VariableController()) << "getZoomType: Unknown type detected"; } return zoomType; } @@ -596,13 +593,10 @@ void VariableController::VariableControllerPrivate::processRequest(std::shared_p auto varStrategyRangesRequested = m_VariableCacheStrategy->computeRange(oldRange, rangeRequested); - auto notInCacheRangeList = QVector{varStrategyRangesRequested.second}; - auto inCacheRangeList = QVector{}; - if (m_VarIdToVarRequestIdQueueMap.find(varId) == m_VarIdToVarRequestIdQueueMap.cend()) { - notInCacheRangeList - = var->provideNotInCacheRangeList(varStrategyRangesRequested.second); - inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second); - } + auto notInCacheRangeList + = Variable::provideNotInCacheRangeList(oldRange, varStrategyRangesRequested.second); + auto inCacheRangeList + = Variable::provideInCacheRangeList(oldRange, varStrategyRangesRequested.second); if (!notInCacheRangeList.empty()) { varRequest.m_RangeRequested = varStrategyRangesRequested.first; @@ -620,8 +614,8 @@ void VariableController::VariableControllerPrivate::processRequest(std::shared_p varProvider); if (!varRequestIdCanceled.isNull()) { - qCDebug(LOG_VariableAcquisitionWorker()) << tr("vsarRequestIdCanceled: ") - << varRequestIdCanceled; + qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ") + << varRequestIdCanceled; cancelVariableRequest(varRequestIdCanceled); } } @@ -804,26 +798,22 @@ void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid var->setRange(varRequest.m_RangeRequested); var->setCacheRange(varRequest.m_CacheRangeRequested); qCDebug(LOG_VariableController()) << tr("1: onDataProvided") - << varRequest.m_RangeRequested; - qCDebug(LOG_VariableController()) << tr("2: onDataProvided") + << varRequest.m_RangeRequested << varRequest.m_CacheRangeRequested; + qCDebug(LOG_VariableController()) << tr("2: onDataProvided var points before") + << var->nbPoints() + << varRequest.m_DataSeries->nbPoints(); var->mergeDataSeries(varRequest.m_DataSeries); - qCDebug(LOG_VariableController()) << tr("3: onDataProvided"); + qCDebug(LOG_VariableController()) << tr("3: onDataProvided var points after") + << var->nbPoints(); - /// @todo MPL: confirm - // Variable update is notified only if there is no pending request for it - // if - // (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first) - // == 0) { emit var->updated(); - // } } else { qCCritical(LOG_VariableController()) << tr("Impossible to update data to a null variable"); } } - // cleaning varRequestId qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?") << m_VarRequestIdToVarIdVarRequestMap.size(); @@ -852,6 +842,8 @@ void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid if (varRequestIdQueue.empty()) { varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt); + + // Recompute if there is any next request based on the removed request. } else { ++varIdToVarRequestIdQueueMapIt; diff --git a/plugins/mockplugin/src/CosinusProvider.cpp b/plugins/mockplugin/src/CosinusProvider.cpp index fb09008..827f7f7 100644 --- a/plugins/mockplugin/src/CosinusProvider.cpp +++ b/plugins/mockplugin/src/CosinusProvider.cpp @@ -180,7 +180,6 @@ void CosinusProvider::requestDataLoading(QUuid acqIdentifier, for (const auto &dateTime : qAsConst(times)) { if (m_VariableToEnableProvider[acqIdentifier]) { auto scalarSeries = this->retrieveData(acqIdentifier, dateTime, parameters.m_Data); - qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided"; emit dataProvided(acqIdentifier, scalarSeries, dateTime); } } @@ -188,7 +187,6 @@ void CosinusProvider::requestDataLoading(QUuid acqIdentifier, void CosinusProvider::requestDataAborting(QUuid acqIdentifier) { - // TODO: Add Mutex qCDebug(LOG_CosinusProvider()) << "CosinusProvider::requestDataAborting" << acqIdentifier << QThread::currentThread()->objectName(); auto it = m_VariableToEnableProvider.find(acqIdentifier); @@ -196,7 +194,7 @@ void CosinusProvider::requestDataAborting(QUuid acqIdentifier) it.value() = false; } else { - qCWarning(LOG_CosinusProvider()) + qCDebug(LOG_CosinusProvider()) << tr("Aborting progression of inexistant identifier detected !!!"); } }