##// END OF EJS Templates
Product field now display products event list instead of only its size
Product field now display products event list instead of only its size

File last commit:

r832:4ab17865bd43
r1364:6ea80a9a6c65
Show More
VariableAcquisitionWorker.cpp
416 lines | 15.8 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r527 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r539
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r527 #include <unordered_map>
Implementation of V5 acquisition
r539 #include <utility>
Add empty class VariableAcquisitionWorker
r527
Implementation of V5 acquisition
r539 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r527 #include <QThread>
Implementation of V5 acquisition
r539
Implementation of progression
r750 #include <cmath>
Add empty class VariableAcquisitionWorker
r527 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
/// Private implementation of VariableAcquisitionWorker: holds the bookkeeping maps of the
/// acquisition requests and the locks that protect them.
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
    /// @param parent worker that owns this implementation; stored in q so that slots can be
    /// invoked on it
    explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
            : m_Lock{QReadWriteLock::Recursive}, q{parent}
    {
    }

    /// Takes m_Lock for reading
    void lockRead()
    {
        m_Lock.lockForRead();
    }

    /// Takes m_Lock for writing
    void lockWrite()
    {
        m_Lock.lockForWrite();
    }

    /// Releases m_Lock
    void unlock()
    {
        m_Lock.unlock();
    }

    /// Removes every entry (requests, results and current/next pair) linked to the variable
    void removeVariableRequest(QUuid vIdentifier);

    /// Remove the current request and execute the next one if it exists
    void updateToNextRequest(QUuid vIdentifier);

    /// Remove and/or abort all acquisition requests linked to varRequestId
    void cancelVarRequest(QUuid varRequestId);

    /// Removes a single acquisition request, aborting it when it is the current one
    void removeAcqRequest(QUuid acqRequestId);

    /// Locked by initialize(), unlocked by finalize() and waited on by waitForFinish()
    QMutex m_WorkingMutex;
    /// Recursive read/write lock used around accesses to the maps below
    QReadWriteLock m_Lock;

    /// Acquisition identifier -> data packets received so far for that acquisition
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
    /// Acquisition identifier -> acquisition request
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
    /// Variable identifier -> (current acquisition identifier, next acquisition identifier)
    std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;

    /// Back pointer to the public worker
    VariableAcquisitionWorker *q;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
Implementation of abort mechanism
r754 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
Add empty class VariableAcquisitionWorker
r527 {
}
Implementation of V5 acquisition
r539 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r625 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r539 {
Next range of a variable synchronized is now computed using:...
r627 qCDebug(LOG_VariableAcquisitionWorker())
Implementation of V5 acquisition
r539 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
push method of worker return the id of the nextRange which is canceled
r625 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r539
// Request creation
auto acqRequest = AcquisitionRequest{};
Improve synchro robustness.
r820 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
<< varRequestId;
push method of worker return the id of the nextRange which is canceled
r625 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r539 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
push method of worker return the id of the nextRange which is canceled
r625
Implementation of V5 acquisition
r539 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
Implementation of varRequestId cancel for the worker
r818 auto oldAcqId = it->second.second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
push method of worker return the id of the nextRange which is canceled
r625 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
Implementation of varRequestId cancel for the worker
r818 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
push method of worker return the id of the nextRange which is canceled
r625 }
Implementation of V5 acquisition
r539 it->second.second = acqRequest.m_AcqIdentifier;
impl->unlock();
Fix lock bug
r819
// remove old acqIdentifier from the worker
Improve synchro robustness.
r820 impl->cancelVarRequest(varRequestIdCanceled);
Remove regression for aborting request during processRequest method
r827 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
Implementation of V5 acquisition
r539 }
else {
// First request for the variable, it must be stored and executed
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
}
push method of worker return the id of the nextRange which is canceled
r625
return varRequestIdCanceled;
Implementation of V5 acquisition
r539 }
/// Aborts the current acquisition of a variable: the worker moves on to the next request
/// (or forgets the variable) and the provider is told to abort the acquisition.
/// Does nothing when the variable has no registered request.
/// @param vIdentifier identifier of the variable whose current acquisition is aborted
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    impl->lockRead();

    const auto variableIt = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (variableIt == impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
        // Unknown variable: nothing to abort
        impl->unlock();
        return;
    }

    auto runningAcqId = variableIt->second.first;
    const auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(runningAcqId);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to abort an unknown acquisition request") << runningAcqId;
        return;
    }

    // Work on a copy of the request: the map entry is erased by updateToNextRequest
    auto runningRequest = requestIt->second;
    impl->unlock();

    // Remove the current request from the worker
    impl->updateToNextRequest(vIdentifier);

    // Notify the provider that the request is aborted
    runningRequest.m_Provider->requestDataAborting(runningAcqId);
}
/// Converts the progression of one part of an acquisition into the overall progression of
/// the request and notifies it through variableRequestInProgress.
///
/// Each of the m_Size parts of a request weighs 100 / m_Size percent; the parts already
/// received (m_Progression) are counted as complete. When the overall progression reaches
/// 100, a second notification with 0.0 is emitted.
///
/// @param acqIdentifier identifier of the acquisition that progresses
/// @param progress progression of the current part, as a percentage; NaN is treated as 0
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    // Absolute tolerance used to detect completion despite floating-point rounding
    constexpr auto COMPLETION_TOLERANCE = 1e-9;

    qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
                                             << acqIdentifier << progress;
    impl->lockRead();
    auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        const auto &request = aIdToARit->second;

        // The division is done in floating point: m_Size looks like an integer count (it is
        // filled from a container size()), and an integer division would truncate the part
        // weight (e.g. 33 for 3 parts) so that the total could never reach 100.
        const auto currentPartSize
            = (request.m_Size != 0) ? 100.0 / static_cast<double>(request.m_Size) : 0.0;
        const auto currentPartProgress
            = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
        const auto currentAlreadyProgress
            = static_cast<double>(request.m_Progression) * currentPartSize;
        const auto finalProgression = currentAlreadyProgress + currentPartProgress;

        emit variableRequestInProgress(request.m_vIdentifier, finalProgression);
        qCDebug(LOG_VariableAcquisitionWorker())
            << tr("TORM: onVariableRetrieveDataInProgress ")
            << QThread::currentThread()->objectName() << request.m_vIdentifier
            << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;

        // Tolerant comparison: an exact == 100.0 is not reliable on a sum of doubles
        if (finalProgression >= 100.0 - COMPLETION_TOLERANCE) {
            emit variableRequestInProgress(request.m_vIdentifier, 0.0);
        }
    }
    impl->unlock();
}
Implementation of automatic cancel for request that failed
r761 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
Run request canceling when unit isn"t found in the file. Clean log.
r832 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< acqIdentifier << request.m_vIdentifier
<< QThread::currentThread();
Implementation of automatic cancel for request that failed
r761 emit variableCanceledRequested(request.m_vIdentifier);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
Add empty class VariableAcquisitionWorker
r527 {
request is now passed by shared pointer instead of const &
r751 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
Implementation of V5 acquisition
r539 impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
Implementation of progression
r750 acqRequest.m_Progression = acqRequest.m_Progression + 1;
Implementation of V5 acquisition
r539
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
Implementation of progression
r750 if (acqRequest.m_Size == acqRequest.m_Progression) {
Implementation of varRequestId cancel for the worker
r818 auto varId = acqRequest.m_vIdentifier;
auto rangeRequested = acqRequest.m_RangeRequested;
auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
Implementation of V5 acquisition
r539 // Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
Implementation of varRequestId cancel for the worker
r818 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r818 impl->unlock();
Implementation of V5 acquisition
r539
Fix progression bug when aborting a request for Amda plugin
r758 // Update to the next request
impl->updateToNextRequest(acqRequest.m_vIdentifier);
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r818 else {
impl->unlock();
}
Implementation of V5 acquisition
r539 }
else {
Implementation of varRequestId cancel for the worker
r818 impl->unlock();
Implementation of abort mechanism
r754 qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
Implementation of V5 acquisition
r539 }
}
Implementation of progression
r750 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
/// Unlocks the working mutex that initialize() locks.
void VariableAcquisitionWorker::finalize()
{
    auto &workingMutex = impl->m_WorkingMutex;
    workingMutex.unlock();
}
/// Blocks until the working mutex can be acquired, then releases it right away when the
/// locker goes out of scope.
void VariableAcquisitionWorker::waitForFinish()
{
    QMutexLocker finishGuard(&impl->m_WorkingMutex);
}
/// Forgets everything known about a variable: the requests and results of its current and
/// next acquisitions, then its current/next entry. Done under the write lock.
/// @param vIdentifier identifier of the variable to remove
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    lockWrite();

    auto variableIt = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (variableIt != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
        // Drop the request and the results of the current acquisition, then of the next one
        for (const auto &acqId : {variableIt->second.first, variableIt->second.second}) {
            m_AcqIdentifierToAcqRequestMap.erase(acqId);
            m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqId);
        }

        m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(variableIt);
    }

    unlock();
}
Implementation of abort mechanism
r754
/// Removes the current request of a variable and executes the next one if it exists.
///
/// - No next request: every entry of the variable is removed (removeVariableRequest).
/// - Next request available: it becomes the current request, the request and the results of
///   the former current acquisition are erased, and onExecuteRequest is queued on the worker.
/// - Unknown variable: a critical message is logged.
///
/// The whole update is done under the write lock since it modifies the maps.
/// @param vIdentifier identifier of the variable to update
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
    QUuid vIdentifier)
{
    // The current/next pair is modified below, so the write lock is required (a read lock
    // would allow concurrent readers to observe the pair while it is being changed)
    lockWrite();

    auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (it == m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
        unlock();
        qCCritical(LOG_VariableAcquisitionWorker())
            << tr("Impossible to execute the acquisition on an unfound variable ");
        return;
    }

    if (it->second.second.isNull()) {
        // There is no next request, we can remove the variable request.
        // The lock is kept during the removal so that a request pushed in the meantime cannot
        // be erased by mistake; m_Lock is recursive, which makes the nested lockWrite() of
        // removeVariableRequest valid.
        removeVariableRequest(vIdentifier);
        unlock();
        return;
    }

    // Move the next request to the current request
    const auto acqIdentifierToRemove = it->second.first;
    const auto nextRequestId = it->second.second;
    it->second.first = nextRequestId;
    it->second.second = QUuid();

    // Remove AcquisitionRequest and results of the former current acquisition
    m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
    unlock();

    // Execute the new current request
    QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
                              Q_ARG(QUuid, nextRequestId));
}
Implementation of varRequestId cancel for the worker
r818
/// Removes (and aborts when running) every acquisition request whose var request id is
/// varRequestId. The matching identifiers are collected under the read lock, then each one
/// is handled by removeAcqRequest once the lock is released.
/// @param varRequestId identifier of the variable request to cancel
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
    QUuid varRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";

    // Collect all the acquisition identifiers linked to varRequestId
    QVector<QUuid> matchingAcqIds;
    lockRead();
    for (const auto &acqEntry : m_AcqIdentifierToAcqRequestMap) {
        if (acqEntry.second.m_VarRequestId == varRequestId) {
            matchingAcqIds.push_back(acqEntry.first);
        }
    }
    unlock();

    // Abort or remove each of them
    for (const auto &matchingAcqId : matchingAcqIds) {
        removeAcqRequest(matchingAcqId);
    }

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
}
/// Removes an acquisition request from the worker.
///
/// - If the request is the current one of its variable, the worker moves on to the next
///   request (updateToNextRequest) and the provider is told to abort the acquisition.
/// - If the request is the pending ("next") one, the pending slot of the variable is reset.
///
/// In every case the request and its results are finally erased from the maps.
/// @param acqRequestId identifier of the acquisition request to remove
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
    QUuid acqRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest";

    QUuid vIdentifier;
    std::shared_ptr<IDataProvider> provider;
    auto isCurrentRequest = false;

    // The write lock is taken because the pending slot of the variable may be reset here
    lockWrite();
    auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
    if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
        vIdentifier = acqIt->second.m_vIdentifier;
        provider = acqIt->second.m_Provider;

        auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
        if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
            if (it->second.first == acqRequestId) {
                // The request is currently running: it is aborted once the lock is released
                isCurrentRequest = true;
            }
            else if (it->second.second == acqRequestId) {
                // The request was only pending: free the pending slot
                it->second.second = QUuid();
            }
        }
    }
    unlock();

    if (isCurrentRequest) {
        // Remove the current request from the worker
        updateToNextRequest(vIdentifier);
        // Notify the provider that the request is aborted
        provider->requestDataAborting(acqRequestId);
    }

    lockWrite();
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
    unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}