##// END OF EJS Templates
Prevent the creation of events with the same product on 2 graphs
Prevent the creation of events with the same product on 2 graphs

File last commit:

r828:4ab17865bd43
r1258:fc19e80a8809
Show More
VariableAcquisitionWorker.cpp
416 lines | 15.8 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r527 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r539
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r527 #include <unordered_map>
Implementation of V5 acquisition
r539 #include <utility>
Add empty class VariableAcquisitionWorker
r527
Implementation of V5 acquisition
r539 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r527 #include <QThread>
Implementation of V5 acquisition
r539
Implementation of progression
r750 #include <cmath>
Add empty class VariableAcquisitionWorker
r527 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
Implementation of abort mechanism
r754 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
: m_Lock{QReadWriteLock::Recursive}, q{parent}
{
}
Implementation of V5 acquisition
r539
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void removeVariableRequest(QUuid vIdentifier);
Implementation of abort mechanism
r754 /// Remove the current request and execute the next one if exist
void updateToNextRequest(QUuid vIdentifier);
Implementation of varRequestId cancel for the worker
r813 /// Remove and/or abort all AcqRequest in link with varRequestId
void cancelVarRequest(QUuid varRequestId);
void removeAcqRequest(QUuid acqRequestId);
Implementation of V5 acquisition
r539 QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
Implementation of abort mechanism
r754 VariableAcquisitionWorker *q;
Add empty class VariableAcquisitionWorker
r527 };
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
Implementation of abort mechanism
r754 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
Add empty class VariableAcquisitionWorker
r527 {
}
Implementation of V5 acquisition
r539 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r625 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r539 {
Next range of a variable synchronized is now computed using:...
r627 qCDebug(LOG_VariableAcquisitionWorker())
Implementation of V5 acquisition
r539 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
push method of worker return the id of the nextRange which is canceled
r625 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r539
// Request creation
auto acqRequest = AcquisitionRequest{};
Improve synchro robustness.
r815 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
<< varRequestId;
push method of worker return the id of the nextRange which is canceled
r625 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r539 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
push method of worker return the id of the nextRange which is canceled
r625
Implementation of V5 acquisition
r539 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
Implementation of varRequestId cancel for the worker
r813 auto oldAcqId = it->second.second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
push method of worker return the id of the nextRange which is canceled
r625 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
Implementation of varRequestId cancel for the worker
r813 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
push method of worker return the id of the nextRange which is canceled
r625 }
Implementation of V5 acquisition
r539 it->second.second = acqRequest.m_AcqIdentifier;
impl->unlock();
Fix lock bug
r814
// remove old acqIdentifier from the worker
Improve synchro robustness.
r815 impl->cancelVarRequest(varRequestIdCanceled);
Remove regression for aborting request during processRequest method
r823 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
Implementation of V5 acquisition
r539 }
else {
// First request for the variable, it must be stored and executed
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
}
push method of worker return the id of the nextRange which is canceled
r625
return varRequestIdCanceled;
Implementation of V5 acquisition
r539 }
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
Implementation of abort mechanism
r754 impl->lockRead();
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
auto currentAcqId = it->second.first;
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
// Remove the current request from the worker
impl->updateToNextRequest(vIdentifier);
// notify the request aborting to the provider
request.m_Provider->requestDataAborting(currentAcqId);
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to abort an unknown acquisition request") << currentAcqId;
}
}
else {
impl->unlock();
}
Implementation of V5 acquisition
r539 }
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
Fix progression bug when aborting a request for Amda plugin
r758 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
<< acqIdentifier << progress;
Implementation of progression
r750 impl->lockRead();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
auto currentPartProgress
= std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
auto finalProgression = currentAlreadyProgress + currentPartProgress;
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
Fix progression bug when aborting a request for Amda plugin
r758 qCDebug(LOG_VariableAcquisitionWorker())
Implementation of automatic cancel for request that failed
r761 << tr("TORM: onVariableRetrieveDataInProgress ")
<< QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
Fix progression bug when aborting a request for Amda plugin
r758 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
Implementation of progression
r750 if (finalProgression == 100.0) {
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
}
}
impl->unlock();
Implementation of V5 acquisition
r539 }
Implementation of automatic cancel for request that failed
r761 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
Run request canceling when unit isn"t found in the file. Clean log.
r828 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< acqIdentifier << request.m_vIdentifier
<< QThread::currentThread();
Implementation of automatic cancel for request that failed
r761 emit variableCanceledRequested(request.m_vIdentifier);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
Add empty class VariableAcquisitionWorker
r527 {
request is now passed by shared pointer instead of const &
r751 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
Implementation of V5 acquisition
r539 impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
Implementation of progression
r750 acqRequest.m_Progression = acqRequest.m_Progression + 1;
Implementation of V5 acquisition
r539
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
Implementation of progression
r750 if (acqRequest.m_Size == acqRequest.m_Progression) {
Implementation of varRequestId cancel for the worker
r813 auto varId = acqRequest.m_vIdentifier;
auto rangeRequested = acqRequest.m_RangeRequested;
auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
Implementation of V5 acquisition
r539 // Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
Implementation of varRequestId cancel for the worker
r813 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r813 impl->unlock();
Implementation of V5 acquisition
r539
Fix progression bug when aborting a request for Amda plugin
r758 // Update to the next request
impl->updateToNextRequest(acqRequest.m_vIdentifier);
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r813 else {
impl->unlock();
}
Implementation of V5 acquisition
r539 }
else {
Implementation of varRequestId cancel for the worker
r813 impl->unlock();
Implementation of abort mechanism
r754 qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
Implementation of V5 acquisition
r539 }
}
Implementation of progression
r750 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
lockWrite();
auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
}
m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
unlock();
Add empty class VariableAcquisitionWorker
r527 }
Implementation of abort mechanism
r754
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
QUuid vIdentifier)
{
Implementation of varRequestId cancel for the worker
r813 lockRead();
Implementation of abort mechanism
r754 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
if (it->second.second.isNull()) {
Implementation of varRequestId cancel for the worker
r813 unlock();
Implementation of abort mechanism
r754 // There is no next request, we can remove the variable request
removeVariableRequest(vIdentifier);
}
else {
auto acqIdentifierToRemove = it->second.first;
// Move the next request to the current request
Implementation of varRequestId cancel for the worker
r813 auto nextRequestId = it->second.second;
it->second.first = nextRequestId;
Implementation of abort mechanism
r754 it->second.second = QUuid();
Implementation of varRequestId cancel for the worker
r813 unlock();
Implementation of abort mechanism
r754 // Remove AcquisitionRequest and results;
Implementation of varRequestId cancel for the worker
r813 lockWrite();
Implementation of abort mechanism
r754 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
Implementation of varRequestId cancel for the worker
r813 unlock();
Implementation of abort mechanism
r754 // Execute the current request
QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
Implementation of varRequestId cancel for the worker
r813 Q_ARG(QUuid, nextRequestId));
Implementation of abort mechanism
r754 }
}
else {
Implementation of varRequestId cancel for the worker
r813 unlock();
Implementation of abort mechanism
r754 qCCritical(LOG_VariableAcquisitionWorker())
<< tr("Impossible to execute the acquisition on an unfound variable ");
}
}
Implementation of varRequestId cancel for the worker
r813
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
QUuid varRequestId)
{
Remove old log
r817 qCDebug(LOG_VariableAcquisitionWorker())
Improve synchro robustness.
r815 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
Implementation of varRequestId cancel for the worker
r813 lockRead();
// get all AcqIdentifier in link with varRequestId
QVector<QUuid> acqIdsToRm;
auto cend = m_AcqIdentifierToAcqRequestMap.cend();
for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
if (it->second.m_VarRequestId == varRequestId) {
acqIdsToRm << it->first;
}
}
unlock();
// run aborting or removing of acqIdsToRm
for (auto acqId : acqIdsToRm) {
removeAcqRequest(acqId);
}
Remove old log
r817 qCDebug(LOG_VariableAcquisitionWorker())
Improve synchro robustness.
r815 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
Implementation of varRequestId cancel for the worker
r813 }
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
QUuid acqRequestId)
{
Improve synchro robustness.
r815 qCDebug(LOG_VariableAcquisitionWorker())
Remove old log
r817 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
Implementation of varRequestId cancel for the worker
r813 QUuid vIdentifier;
std::shared_ptr<IDataProvider> provider;
lockRead();
auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
vIdentifier = acqIt->second.m_vIdentifier;
provider = acqIt->second.m_Provider;
auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
if (it->second.first == acqRequestId) {
// acqRequest is currently running -> let's aborting it
unlock();
// Remove the current request from the worker
updateToNextRequest(vIdentifier);
// notify the request aborting to the provider
provider->requestDataAborting(acqRequestId);
}
else if (it->second.second == acqRequestId) {
it->second.second = QUuid();
unlock();
}
else {
unlock();
}
}
else {
unlock();
}
}
else {
unlock();
}
lockWrite();
m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
unlock();
Improve synchro robustness.
r815 qCDebug(LOG_VariableAcquisitionWorker())
Remove old log
r817 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
Implementation of varRequestId cancel for the worker
r813 }