diff --git a/app/src/Main.cpp b/app/src/Main.cpp index 30db0fe..ea4abc5 100644 --- a/app/src/Main.cpp +++ b/app/src/Main.cpp @@ -41,6 +41,14 @@ const auto PLUGIN_DIRECTORY_NAME = QStringLiteral("plugins"); int main(int argc, char *argv[]) { + QLoggingCategory::setFilterRules( + "*.warning=false\n" + "*.info=false\n" + "*.debug=false\n" + "AmdaProvider.info=true\n" + "NetworkController.info=true\n" + "VariableAcquisitionWorker.info=true\n"); + SqpApplication a{argc, argv}; SqpApplication::setOrganizationName("LPP"); SqpApplication::setOrganizationDomain("lpp.fr"); diff --git a/core/include/Data/AcquisitionRequest.h b/core/include/Data/AcquisitionRequest.h index d9d338a..859078d 100644 --- a/core/include/Data/AcquisitionRequest.h +++ b/core/include/Data/AcquisitionRequest.h @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,7 @@ struct AcquisitionRequest { SqpRange m_CacheRangeRequested; int m_Size; std::shared_ptr m_Provider; + QVector m_DataPackets; }; SCIQLOP_REGISTER_META_TYPE(ACQUISITIONREQUEST_REGISTRY, AcquisitionRequest) diff --git a/core/include/Data/IDataProvider.h b/core/include/Data/IDataProvider.h index 55dd069..4ee4c7e 100644 --- a/core/include/Data/IDataProvider.h +++ b/core/include/Data/IDataProvider.h @@ -60,6 +60,7 @@ signals: */ void dataProvidedProgress(QUuid acqIdentifier, double progress); + void requestCanceled(QUuid acqIdentifier); /** * @brief requestConstructed send a request for the data identified by acqIdentifier diff --git a/core/include/Variable/VariableAcquisitionWorker.h b/core/include/Variable/VariableAcquisitionWorker.h index 25f7f0b..967886f 100644 --- a/core/include/Variable/VariableAcquisitionWorker.h +++ b/core/include/Variable/VariableAcquisitionWorker.h @@ -46,6 +46,7 @@ signals: public slots: void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr dataSeries, SqpRange dataRangeAcquired); + void onVariableAcquisitionCanceled(QUuid acqIdentifier); void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress); private: diff --git a/core/src/Network/NetworkController.cpp b/core/src/Network/NetworkController.cpp index dfbac8b..e348b95 100644 --- a/core/src/Network/NetworkController.cpp +++ b/core/src/Network/NetworkController.cpp @@ -33,19 +33,18 @@ NetworkController::NetworkController(QObject *parent) void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier, std::function callback) { - qCDebug(LOG_NetworkController()) - << tr("NetworkController registered") << QThread::currentThread()->objectName(); - auto reply = impl->m_AccessManager->get(request); - // Store the couple reply id impl->lockWrite(); + qCInfo(LOG_NetworkController()) << tr("NetworkController registered") << identifier; + auto reply = impl->m_AccessManager->get(request); + impl->m_NetworkReplyToVariableId[reply] = identifier; + qCInfo(LOG_NetworkController()) << tr("Reply stored") << identifier; impl->unlock(); auto onReplyFinished = [reply, this, identifier, callback]() { - qCDebug(LOG_NetworkController()) - << tr("NetworkController onReplyFinished") << QThread::currentThread() << reply; + qCInfo(LOG_NetworkController()) << tr("NetworkController onReplyFinished") << identifier; impl->lockRead(); auto it = impl->m_NetworkReplyToVariableId.find(reply); impl->unlock(); @@ -112,18 +111,28 @@ void NetworkController::finalize() void NetworkController::onReplyCanceled(QUuid identifier) { - auto findReply = [identifier](const auto &entry) { return identifier == entry.second; }; + // auto findReply = [identifier](const auto &entry) { return identifier == entry.second; }; qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled") << QThread::currentThread(); + qCInfo(LOG_NetworkController()) << tr("Calls cancel on ") << identifier; - impl->lockRead(); - auto end = impl->m_NetworkReplyToVariableId.cend(); - auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply); - impl->unlock(); - if (it != end) { - it->first->abort(); + impl->lockWrite(); + for (auto &reply : impl->m_NetworkReplyToVariableId) { + if (reply.second == identifier) { + qCInfo(LOG_NetworkController()) << tr("Cancel on ") << identifier << "applied"; + reply.first->abort(); + } } + impl->unlock(); + + // auto end = impl->m_NetworkReplyToVariableId.cend(); + // auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply); + // impl->unlock(); + // if (it != end) { + // qCInfo(LOG_NetworkController()) << tr("Cancel on ") << identifier << "applied"; + // it->first->abort(); + // } qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled END") << QThread::currentThread(); } diff --git a/core/src/Variable/VariableAcquisitionWorker.cpp b/core/src/Variable/VariableAcquisitionWorker.cpp index fd75f09..c0b277d 100644 --- a/core/src/Variable/VariableAcquisitionWorker.cpp +++ b/core/src/Variable/VariableAcquisitionWorker.cpp @@ -11,9 +11,17 @@ #include #include #include +#include Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") +namespace { + +using AcquisitionId = QUuid; +using VariableId = QUuid; + +} // namespace + struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {} @@ -22,14 +30,18 @@ struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { void lockWrite() { m_Lock.lockForWrite(); } void unlock() { m_Lock.unlock(); } + void eraseRequest(AcquisitionId id); + std::map::iterator insertRequest(AcquisitionId id, + AcquisitionRequest request); + void removeVariableRequest(QUuid vIdentifier); QMutex m_WorkingMutex; QReadWriteLock m_Lock; - std::map > m_AcqIdentifierToAcqDataPacketVectorMap; - std::map m_AcqIdentifierToAcqRequestMap; - std::map > m_VIdentifierToCurrrentAcqIdNextIdPairMap; + /// Current acquisitions (by variable) + std::map m_Requests; + std::map m_RequestsIndex; }; @@ -66,34 +78,33 @@ QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v acqRequest.m_Size = parameters.m_Times.size(); acqRequest.m_Provider = provider; - - // Register request impl->lockWrite(); - impl->m_AcqIdentifierToAcqRequestMap.insert( - std::make_pair(acqRequest.m_AcqIdentifier, acqRequest)); - - auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); - if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { - // A current request already exists, we can replace the next one - auto nextAcqId = it->second.second; - auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId); - if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) { - auto request = acqIdentifierToAcqRequestMapIt->second; - varRequestIdCanceled = request.m_VarRequestId; - } - it->second.second = acqRequest.m_AcqIdentifier; - impl->unlock(); + // Checks if there is a current acquisition on variable + auto currentRequestIt = impl->m_RequestsIndex.find(vIdentifier); + if (currentRequestIt != impl->m_RequestsIndex.cend()) { + auto request = currentRequestIt->second; + QtConcurrent::run( + [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() { + provider->requestDataAborting(acqIdentifier); + }); + varRequestIdCanceled = request->m_VarRequestId; + + impl->eraseRequest(request->m_AcqIdentifier); } - else { - // First request for the variable, it must be stored and executed - impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert( - std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid()))); - impl->unlock(); + // Sets the new acquisition request as the current request for the variable + auto newRequestIt = impl->insertRequest(acqRequest.m_AcqIdentifier, std::move(acqRequest)); + if (newRequestIt != impl->m_Requests.end()) { + qCInfo(LOG_VariableAcquisitionWorker()) << "EXECUTE REQUEST" << acqRequest.m_AcqIdentifier; QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, - Q_ARG(QUuid, acqRequest.m_AcqIdentifier)); + Q_ARG(QUuid, newRequestIt->first)); } + else { + /// @todo ALX : log + } + + impl->unlock(); return varRequestIdCanceled; } @@ -116,71 +127,28 @@ void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier, qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired; impl->lockWrite(); - auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); - if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) { - // Store the result - auto dataPacket = AcquisitionDataPacket{}; - dataPacket.m_Range = dataRangeAcquired; - dataPacket.m_DateSeries = dataSeries; - - auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); - if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { - // A current request result already exists, we can update it - aIdToADPVit->second.push_back(dataPacket); - } - else { - // First request result for the variable, it must be stored - impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert( - std::make_pair(acqIdentifier, QVector() << dataPacket)); - } + auto it = impl->m_Requests.find(acqIdentifier); + if (it != impl->m_Requests.cend()) { + auto &request = it->second; - // Decrement the counter of the request - auto &acqRequest = aIdToARit->second; - acqRequest.m_Size = acqRequest.m_Size - 1; - - // if the counter is 0, we can return data then run the next request if it exists and - // removed the finished request - if (acqRequest.m_Size == 0) { - // Return the data - aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); - if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { - emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested, - acqRequest.m_CacheRangeRequested, aIdToADPVit->second); - } - - // Execute the next one - auto it - = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier); - - if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { - if (it->second.second.isNull()) { - // There is no next request, we can remove the variable request - impl->removeVariableRequest(acqRequest.m_vIdentifier); - } - else { - auto acqIdentifierToRemove = it->second.first; - // Move the next request to the current request - it->second.first = it->second.second; - it->second.second = QUuid(); - // Remove AcquisitionRequest and results; - impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove); - impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove); - // Execute the current request - QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, - Q_ARG(QUuid, it->second.first)); - } - } - else { - qCCritical(LOG_VariableAcquisitionWorker()) - << tr("Impossible to execute the acquisition on an unfound variable "); - } + // Store the result + auto dataPacket = AcquisitionDataPacket{dataSeries, dataRangeAcquired}; + request.m_DataPackets.push_back(dataPacket); + request.m_Size = request.m_Size - 1; + + if (request.m_Size == 0) { + emit dataProvided(request.m_vIdentifier, request.m_RangeRequested, + request.m_CacheRangeRequested, request.m_DataPackets); + impl->eraseRequest(acqIdentifier); } } - else { - qCCritical(LOG_VariableAcquisitionWorker()) - << tr("Impossible to retrieve AcquisitionRequest for the incoming data"); - } + impl->unlock(); +} + +void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier) +{ + impl->lockWrite(); impl->unlock(); } @@ -202,32 +170,51 @@ void VariableAcquisitionWorker::waitForFinish() QMutexLocker locker{&impl->m_WorkingMutex}; } -void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest( - QUuid vIdentifier) + +void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::eraseRequest(AcquisitionId id) { - lockWrite(); - auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); + auto it = m_Requests.find(id); + if (it != m_Requests.end()) { + // Removes from index + m_RequestsIndex.erase(it->second.m_vIdentifier); - if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { - // A current request already exists, we can replace the next one + // Removes request + m_Requests.erase(it); + } +} - m_AcqIdentifierToAcqRequestMap.erase(it->second.first); - m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first); +std::map::iterator +VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::insertRequest( + AcquisitionId id, AcquisitionRequest request) +{ + // Inserts request + auto variableId = request.m_vIdentifier; + auto result = m_Requests.insert(std::make_pair(id, std::move(request))); + + if (result.second) { + // Inserts index + m_RequestsIndex[variableId] = &result.first->second; - m_AcqIdentifierToAcqRequestMap.erase(it->second.second); - m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second); + return result.first; } - m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier); - unlock(); + else { + return m_Requests.end(); + } +} + +void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest( + QUuid vIdentifier) +{ + /// @todo ALX + // m_Acquisitions.erase(vIdentifier); } void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) { - qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread(); impl->lockRead(); - auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); - if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) { - auto request = it->second; + auto it = impl->m_Requests.find(acqIdentifier); + if (it != impl->m_Requests.cend()) { + auto &request = it->second; impl->unlock(); request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters); } diff --git a/core/src/Variable/VariableController.cpp b/core/src/Variable/VariableController.cpp index 86d597f..c685d54 100644 --- a/core/src/Variable/VariableController.cpp +++ b/core/src/Variable/VariableController.cpp @@ -313,6 +313,7 @@ void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &range const SqpRange &cacheRangeRequested, QVector dataAcquired) { + qCInfo(LOG_VariableController()) << "VariableController::onDataProvided"; auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired); auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries); if (!varRequestId.isNull()) { @@ -431,8 +432,8 @@ void VariableController::onRequestDataLoading(QVector // For the other, we ask the provider to give them. auto varRequestId = QUuid::createUuid(); - qCInfo(LOG_VariableController()) << "VariableController::onRequestDataLoading" - << QThread::currentThread()->objectName() << varRequestId; + qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading" + << QThread::currentThread()->objectName() << varRequestId; for (const auto &var : variables) { qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId; @@ -549,10 +550,10 @@ void VariableController::VariableControllerPrivate::processRequest(std::shared_p if (!notInCacheRangeList.empty()) { varRequest.m_RangeRequested = varStrategyRangesRequested.first; varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second; - qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest RR ") << rangeRequested; - qCDebug(LOG_VariableAcquisitionWorker()) + qCInfo(LOG_VariableController()) << tr("TORM processRequest RR ") << rangeRequested; + qCInfo(LOG_VariableController()) << tr("TORM processRequest R ") << varStrategyRangesRequested.first; - qCDebug(LOG_VariableAcquisitionWorker()) + qCInfo(LOG_VariableController()) << tr("TORM processRequest CR ") << varStrategyRangesRequested.second; // store VarRequest storeVariableRequest(varId, varRequestId, varRequest); @@ -566,7 +567,7 @@ void VariableController::VariableControllerPrivate::processRequest(std::shared_p varProvider); if (!varRequestIdCanceled.isNull()) { - qCInfo(LOG_VariableAcquisitionWorker()) + qCInfo(LOG_VariableController()) << tr("varRequestIdCanceled: ") << varRequestIdCanceled; cancelVariableRequest(varRequestIdCanceled); } @@ -652,12 +653,12 @@ void VariableController::VariableControllerPrivate::storeVariableRequest( auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId); if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) { auto varRequestIdQueue = std::deque{}; - qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE"); + qCInfo(LOG_VariableController()) << tr("Store REQUEST in QUEUE"); varRequestIdQueue.push_back(varRequestId); m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue))); } else { - qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE"); + qCInfo(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE"); auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second; varRequestIdQueue.push_back(varRequestId); } @@ -666,13 +667,13 @@ void VariableController::VariableControllerPrivate::storeVariableRequest( if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) { auto varIdToVarRequestMap = std::map{}; varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); - qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP"); + qCInfo(LOG_VariableController()) << tr("Store REQUESTID in MAP"); m_VarRequestIdToVarIdVarRequestMap.insert( std::make_pair(varRequestId, std::move(varIdToVarRequestMap))); } else { auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second; - qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP"); + qCInfo(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP"); varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); } } diff --git a/plugins/amda/src/AmdaProvider.cpp b/plugins/amda/src/AmdaProvider.cpp index d17e6f9..5d05458 100644 --- a/plugins/amda/src/AmdaProvider.cpp +++ b/plugins/amda/src/AmdaProvider.cpp @@ -13,6 +13,8 @@ #include #include +#include + Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider") namespace { @@ -117,16 +119,19 @@ void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa auto endDate = dateFormat(dateTime.m_TEnd); auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)}; - qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url; + qCInfo(LOG_AmdaProvider()) << token << tr("AmdaProvider::retrieveData url:") << url; auto tempFile = std::make_shared(); // LAMBDA auto httpDownloadFinished = [this, dateTime, tempFile, productValueType](QNetworkReply *reply, QUuid dataId) noexcept { + qCInfo(LOG_AmdaProvider()) << "Download url file completed" << dataId; + // Don't do anything if the reply was abort if (reply->error() != QNetworkReply::OperationCanceledError) { - + QElapsedTimer timer{}; + timer.start(); if (tempFile) { auto replyReadAll = reply->readAll(); if (!replyReadAll.isEmpty()) { @@ -137,6 +142,8 @@ void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa // Parse results file if (auto dataSeries = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) { + qCInfo(LOG_AmdaProvider()) << tr("Request was finished") << dataId << "(took" + << timer.elapsed() << "ms)"; emit dataProvided(dataId, dataSeries, dateTime); } else { @@ -144,28 +151,31 @@ void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa } } } + else { + qCInfo(LOG_AmdaProvider()) + << "Request was canceled when downloading result file" << dataId; + } + }; + auto httpFinishedLambda = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, + QUuid dataId) noexcept { + qCInfo(LOG_AmdaProvider()) << "Generating url file completed" << dataId; + // Don't do anything if the reply was abort + if (reply->error() != QNetworkReply::OperationCanceledError) { + auto downloadFileUrl = QUrl{QString{reply->readAll()}}; + + // Executes request for downloading file // + // Creates destination file + if (tempFile->open()) { + // Executes request + emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId, + httpDownloadFinished); + } + } + else { + qCInfo(LOG_AmdaProvider()) << "Request was canceled when generating file url" << dataId; + } }; - auto httpFinishedLambda - = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept { - - // Don't do anything if the reply was abort - if (reply->error() != QNetworkReply::OperationCanceledError) { - auto downloadFileUrl = QUrl{QString{reply->readAll()}}; - - - qCInfo(LOG_AmdaProvider()) - << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl; - // Executes request for downloading file // - - // Creates destination file - if (tempFile->open()) { - // Executes request - emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId, - httpDownloadFinished); - } - } - }; // //////////////// // // Executes request //