##// END OF EJS Templates
Fix bug when creating two variables crash the app. ...
Fix bug when creating two variables crash the app. Variable as now invalid range and cache range at creation

File last commit:

r754:2015ea331dc4
r756:a7f60f6512e6
Show More
VariableAcquisitionWorker.cpp
321 lines | 12.6 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r527 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r539
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r527 #include <unordered_map>
Implementation of V5 acquisition
r539 #include <utility>
Add empty class VariableAcquisitionWorker
r527
Implementation of V5 acquisition
r539 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r527 #include <QThread>
Implementation of V5 acquisition
r539
Implementation of progression
r750 #include <cmath>
Add empty class VariableAcquisitionWorker
r527 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
Implementation of abort mechanism
r754 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
: m_Lock{QReadWriteLock::Recursive}, q{parent}
{
}
Implementation of V5 acquisition
r539
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void removeVariableRequest(QUuid vIdentifier);
Implementation of abort mechanism
r754 /// Remove the current request and execute the next one if exist
void updateToNextRequest(QUuid vIdentifier);
Implementation of V5 acquisition
r539 QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
Implementation of abort mechanism
r754 VariableAcquisitionWorker *q;
Add empty class VariableAcquisitionWorker
r527 };
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
Implementation of abort mechanism
r754 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
Add empty class VariableAcquisitionWorker
r527 {
}
Implementation of V5 acquisition
r539 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r625 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r539 {
Next range of a variable synchronized is now computed using:...
r627 qCDebug(LOG_VariableAcquisitionWorker())
Implementation of V5 acquisition
r539 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
push method of worker return the id of the nextRange which is canceled
r625 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r539
// Request creation
auto acqRequest = AcquisitionRequest{};
push method of worker return the id of the nextRange which is canceled
r625 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r539 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
push method of worker return the id of the nextRange which is canceled
r625
Implementation of V5 acquisition
r539 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
push method of worker return the id of the nextRange which is canceled
r625 auto nextAcqId = it->second.second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = request.m_VarRequestId;
}
Implementation of V5 acquisition
r539 it->second.second = acqRequest.m_AcqIdentifier;
impl->unlock();
}
else {
// First request for the variable, it must be stored and executed
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
}
push method of worker return the id of the nextRange which is canceled
r625
return varRequestIdCanceled;
Implementation of V5 acquisition
r539 }
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
// TODO
Implementation of abort mechanism
r754 impl->lockRead();
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
auto currentAcqId = it->second.first;
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
// Remove the current request from the worker
impl->lockWrite();
impl->updateToNextRequest(vIdentifier);
impl->unlock();
// notify the request aborting to the provider
request.m_Provider->requestDataAborting(currentAcqId);
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to abort an unknown acquisition request") << currentAcqId;
}
}
else {
impl->unlock();
}
Implementation of V5 acquisition
r539 }
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
Implementation of progression
r750 impl->lockRead();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
auto currentPartProgress
= std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
auto finalProgression = currentAlreadyProgress + currentPartProgress;
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
if (finalProgression == 100.0) {
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
}
}
impl->unlock();
Implementation of V5 acquisition
r539 }
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
Add empty class VariableAcquisitionWorker
r527 {
request is now passed by shared pointer instead of const &
r751 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
Implementation of V5 acquisition
r539 impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
Implementation of progression
r750 acqRequest.m_Progression = acqRequest.m_Progression + 1;
Implementation of V5 acquisition
r539
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
Implementation of progression
r750 if (acqRequest.m_Size == acqRequest.m_Progression) {
Implementation of V5 acquisition
r539 // Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
}
// Execute the next one
auto it
= impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
if (it->second.second.isNull()) {
change grapheRange to graphRange
r545 // There is no next request, we can remove the variable request
Implementation of V5 acquisition
r539 impl->removeVariableRequest(acqRequest.m_vIdentifier);
}
else {
auto acqIdentifierToRemove = it->second.first;
// Move the next request to the current request
it->second.first = it->second.second;
it->second.second = QUuid();
// Remove AcquisitionRequest and results;
impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
// Execute the current request
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, it->second.first));
}
}
else {
qCCritical(LOG_VariableAcquisitionWorker())
<< tr("Impossible to execute the acquisition on an unfound variable ");
}
}
}
else {
Implementation of abort mechanism
r754 qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
Implementation of V5 acquisition
r539 }
impl->unlock();
}
Implementation of progression
r750 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
Implementation of V5 acquisition
r539 void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
lockWrite();
auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
}
m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
unlock();
Add empty class VariableAcquisitionWorker
r527 }
Implementation of abort mechanism
r754
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
QUuid vIdentifier)
{
auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
if (it->second.second.isNull()) {
// There is no next request, we can remove the variable request
removeVariableRequest(vIdentifier);
}
else {
auto acqIdentifierToRemove = it->second.first;
// Move the next request to the current request
it->second.first = it->second.second;
it->second.second = QUuid();
// Remove AcquisitionRequest and results;
m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
// Execute the current request
QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, it->second.first));
}
}
else {
qCCritical(LOG_VariableAcquisitionWorker())
<< tr("Impossible to execute the acquisition on an unfound variable ");
}
}