##// END OF EJS Templates
Fix bug in acquisition !. Test fuzzing with 30 var in 8 group
Fix bug in acquisition !. Test fuzzing with 30 var in 8 group

File last commit:

r1398:8cb3639e4927
r1398:8cb3639e4927
Show More
VariableAcquisitionWorker.cpp
368 lines | 13.3 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r527 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r539
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r527 #include <unordered_map>
Implementation of V5 acquisition
r539 #include <utility>
Add empty class VariableAcquisitionWorker
r527
Implementation of V5 acquisition
r539 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r527 #include <QThread>
Implementation of V5 acquisition
r539
Implementation of progression
r750 #include <cmath>
Add empty class VariableAcquisitionWorker
r527 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
/// Private implementation of VariableAcquisitionWorker: owns the locks and the maps used to
/// track the acquisition requests handled by the worker.
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
    /// @param parent worker owning this private part; used to emit the worker's signals
    explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent) : q{parent} {}

    // Thin wrappers around the read/write lock protecting the maps below
    void lockRead() { m_Lock.lockForRead(); }
    void lockWrite() { m_Lock.lockForWrite(); }
    void unlock() { m_Lock.unlock(); }

    /// Forgets the current acquisition of a variable and resets its progression
    void removeVariableRequest(QUuid vIdentifier);

    /// Remove and/or abort all AcqRequest in link with varRequestId
    void cancelVarRequest(QUuid varRequestId);

    /// Removes one acquisition request, asking its provider to abort it when it is the current
    /// acquisition of its variable
    void removeAcqRequest(QUuid acqRequestId);

    /// Locked by initialize(), unlocked by finalize(), waited on by waitForFinish()
    QMutex m_WorkingMutex;
    /// Guards the three maps below
    QReadWriteLock m_Lock{QReadWriteLock::Recursive};

    /// Data packets received so far, by acquisition identifier
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
    /// Registered acquisition requests, by acquisition identifier
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
    /// Current acquisition identifier of each variable, by variable identifier
    std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;

    /// Back pointer to the public worker
    VariableAcquisitionWorker *q;
};
/// Builds the worker and its private implementation.
/// @param parent parent QObject forwarded to the QObject base
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
        : QObject(parent),
          impl(spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this))
{
}
Implementation of V5 acquisition
r539 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r625 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r539 {
Next range of a variable synchronized is now computed using:...
r627 qCDebug(LOG_VariableAcquisitionWorker())
remove unused old parameter for worker
r1393 << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
<< "vId: " << vIdentifier;
push method of worker return the id of the nextRange which is canceled
r625 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r539
// Request creation
auto acqRequest = AcquisitionRequest{};
push method of worker return the id of the nextRange which is canceled
r625 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r539 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
Remove unused pending request of worker since it's already in the VC....
r1395 qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
<< acqRequest.m_Size;
Implementation of V5 acquisition
r539
push method of worker return the id of the nextRange which is canceled
r625
Implementation of V5 acquisition
r539 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
Remove unused pending request of worker since it's already in the VC....
r1395 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
// A current request already exists, we can cancel it
// remove old acqIdentifier from the worker
auto oldAcqId = it->second;
Implementation of varRequestId cancel for the worker
r818 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
push method of worker return the id of the nextRange which is canceled
r625 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
Implementation of varRequestId cancel for the worker
r818 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
push method of worker return the id of the nextRange which is canceled
r625 }
Implementation of V5 acquisition
r539 impl->unlock();
Improve synchro robustness.
r820 impl->cancelVarRequest(varRequestIdCanceled);
Implementation of V5 acquisition
r539 }
else {
impl->unlock();
}
push method of worker return the id of the nextRange which is canceled
r625
Remove unused pending request of worker since it's already in the VC....
r1395 // Request for the variable, it must be stored and executed
impl->lockWrite();
impl->m_VIdentifierToCurrrentAcqIdMap.insert(
std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
push method of worker return the id of the nextRange which is canceled
r625 return varRequestIdCanceled;
Implementation of V5 acquisition
r539 }
/// Asks the provider of the current acquisition of a variable to abort it.
///
/// Nothing is done when the variable has no current acquisition; a warning is logged when the
/// current acquisition identifier matches no registered request.
/// @param vIdentifier identifier of the variable whose acquisition has to be aborted
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    impl->lockRead();

    const auto currentAcqIt = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (currentAcqIt == impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
        impl->unlock();
        return;
    }

    const auto currentAcqId = currentAcqIt->second;
    const auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
        return;
    }

    // Work on a copy of the request so that the provider is called without holding the lock
    auto request = requestIt->second;
    impl->unlock();

    // notify the request aborting to the provider
    request.m_Provider->requestDataAborting(currentAcqId);
}
/// Converts the progress reported for one part of an acquisition into the overall progression
/// of the request and emits it through variableRequestInProgress().
///
/// The request is seen as m_Size parts of equal weight; m_Progression parts are considered done
/// and @p progress is applied to the part being loaded. When the overall progression reaches
/// 100, a second signal is emitted with a progression of 0.
///
/// @param acqIdentifier identifier of the acquisition the progress belongs to; the call is
/// ignored when it matches no registered request
/// @param progress progress of the current part (divided by 100 here, so presumably expressed
/// as a percentage - to confirm against the providers); NaN is handled as 0
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
                                             << QThread::currentThread()->objectName()
                                             << acqIdentifier << progress;
    impl->lockRead();
    auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        const auto &request = aIdToARit->second;

        // Weight of one part, computed in floating point: an integer division would truncate
        // it (e.g. 100 / 3 == 33) and the progression could then never reach 100.
        const auto progressPartSize
            = (request.m_Size != 0) ? 100.0 / static_cast<double>(request.m_Size) : 0.0;

        const auto currentPartProgress
            = std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;

        // We can only give an approximation of the currentProgression since its upgrade is async.
        auto currentProgression = request.m_Progression;
        if (currentProgression == request.m_Size) {
            currentProgression = request.m_Size - 1;
        }
        const auto currentAlreadyProgress = progressPartSize * currentProgression;

        const auto finalProgression = currentAlreadyProgress + currentPartProgress;
        emit variableRequestInProgress(request.m_vIdentifier, finalProgression);

        // Floating point sums may miss 100 by a rounding error, hence the tolerance
        const auto completionTolerance = 1e-9;
        if (std::abs(finalProgression - 100.0) < completionTolerance) {
            emit variableRequestInProgress(request.m_vIdentifier, 0.0);
        }
    }
    impl->unlock();
}
Implementation of automatic cancel for request that failed
r761 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableCanceledRequested(request.m_vIdentifier);
}
else {
impl->unlock();
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
Add empty class VariableAcquisitionWorker
r527 {
request is now passed by shared pointer instead of const &
r751 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
Implementation of V5 acquisition
r539 impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
Implementation of progression
r750 acqRequest.m_Progression = acqRequest.m_Progression + 1;
Implementation of V5 acquisition
r539
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
Implementation of progression
r750 if (acqRequest.m_Size == acqRequest.m_Progression) {
Implementation of varRequestId cancel for the worker
r818 auto varId = acqRequest.m_vIdentifier;
Implementation of V5 acquisition
r539 // Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
remove unused old parameter for worker
r1393 emit dataProvided(varId, aIdToADPVit->second);
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r818 impl->unlock();
Implementation of V5 acquisition
r539 }
Implementation of varRequestId cancel for the worker
r818 else {
impl->unlock();
}
Implementation of V5 acquisition
r539 }
else {
Implementation of varRequestId cancel for the worker
r818 impl->unlock();
Implementation of abort mechanism
r754 qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
Implementation of V5 acquisition
r539 }
}
Implementation of progression
r750 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
Fix asynchrone bug with reset of the download progress state
r1392 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
<< QThread::currentThread();
Implementation of progression
r750 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
/// Unlocks the working mutex locked by initialize().
void VariableAcquisitionWorker::finalize()
{
// NOTE(review): a QMutex has to be unlocked by the thread that locked it; confirm that
// initialize() and finalize() are always called from the same thread.
impl->m_WorkingMutex.unlock();
}
/// Blocks until the working mutex can be acquired, then releases it immediately.
void VariableAcquisitionWorker::waitForFinish()
{
// The locker acquires the mutex on construction and releases it when leaving the scope
QMutexLocker locker{&impl->m_WorkingMutex};
}
/// Forgets the current acquisition of a variable: erases its request and its data packets,
/// emits a progression of 0 for the variable, then erases the variable entry itself.
/// @param vIdentifier identifier of the variable
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    lockWrite();

    const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend()) {
        // A current acquisition exists for the variable: drop its request and its data
        const auto currentAcqId = currentAcqIt->second;
        qCDebug(LOG_VariableAcquisitionWorker())
            << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
            << QThread::currentThread()->objectName() << currentAcqId;
        m_AcqIdentifierToAcqRequestMap.erase(currentAcqId);
        m_AcqIdentifierToAcqDataPacketVectorMap.erase(currentAcqId);
    }

    // stop any progression
    emit q->variableRequestInProgress(vIdentifier, 0.0);

    m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
    unlock();
}
Implementation of abort mechanism
r754
Implementation of varRequestId cancel for the worker
r818 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
QUuid varRequestId)
{
Remove old log
r822 qCDebug(LOG_VariableAcquisitionWorker())
Fix bug in acquisition !. Test fuzzing with 30 var in 8 group
r1398 << "VariableAcquisitionWorkerPrivate::cancelVarRequest start";
Implementation of varRequestId cancel for the worker
r818 lockRead();
// get all AcqIdentifier in link with varRequestId
QVector<QUuid> acqIdsToRm;
auto cend = m_AcqIdentifierToAcqRequestMap.cend();
for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
if (it->second.m_VarRequestId == varRequestId) {
acqIdsToRm << it->first;
}
}
unlock();
// run aborting or removing of acqIdsToRm
for (auto acqId : acqIdsToRm) {
removeAcqRequest(acqId);
}
Remove old log
r822 qCDebug(LOG_VariableAcquisitionWorker())
Improve synchro robustness.
r820 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
Implementation of varRequestId cancel for the worker
r818 }
/// Removes an acquisition request and its data packets.
///
/// When the request is the current acquisition of its variable, its provider is first asked to
/// abort it (the call is made without holding the lock).
/// @param acqRequestId identifier of the acquisition request to remove
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
    QUuid acqRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest";

    QUuid vIdentifier;
    std::shared_ptr<IDataProvider> provider;
    auto isCurrentAcquisition = false;

    // Look for the request and check whether it is the current acquisition of its variable
    lockRead();
    const auto requestIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
    if (requestIt != m_AcqIdentifierToAcqRequestMap.cend()) {
        vIdentifier = requestIt->second.m_vIdentifier;
        provider = requestIt->second.m_Provider;

        const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
        isCurrentAcquisition = currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend()
                               && currentAcqIt->second == acqRequestId;
    }
    unlock();

    if (isCurrentAcquisition) {
        // acqRequest is currently running -> notify the request aborting to the provider
        provider->requestDataAborting(acqRequestId);
    }

    lockWrite();
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
    unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}