##// END OF EJS Templates
The worker's push method returns the id of the next-range request that is canceled
The worker's push method returns the id of the next-range request that is canceled

File last commit:

r625:d6648352006d
r625:d6648352006d
Show More
VariableAcquisitionWorker.cpp
238 lines | 9.3 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r527 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r539
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r527 #include <unordered_map>
Implementation of V5 acquisition
r539 #include <utility>
Add empty class VariableAcquisitionWorker
r527
Implementation of V5 acquisition
r539 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r527 #include <QThread>
Implementation of V5 acquisition
r539
Add empty class VariableAcquisitionWorker
r527 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
/**
 * Private state of VariableAcquisitionWorker.
 *
 * Holds the bookkeeping maps of the acquisitions together with the locks that protect them.
 */
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {

    /// The read/write lock is recursive: removeVariableRequest() takes the write lock itself and
    /// is also called by code that already holds it.
    explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}

    void lockRead() { m_Lock.lockForRead(); }
    void lockWrite() { m_Lock.lockForWrite(); }
    void unlock() { m_Lock.unlock(); }

    /// Erases every entry (current and next requests, and their results) stored for a variable
    void removeVariableRequest(QUuid vIdentifier);

    /// Locked by initialize(), released by finalize() and briefly taken by waitForFinish()
    QMutex m_WorkingMutex;
    /// Protects the three maps below
    QReadWriteLock m_Lock;

    /// Acquisition id -> data packets received so far for that acquisition
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
    /// Acquisition id -> acquisition request
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
    /// Variable id -> (current acquisition id, next acquisition id); the next id is a null QUuid
    /// when no request is pending
    std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
};
/// Builds the worker as a child of @p parent and allocates its private implementation.
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
        : QObject{parent},
          impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
{
    // Nothing else to do: all the state lives in the private implementation
}
Implementation of V5 acquisition
r539 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r625 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r539 {
The dataSeries of a variable is now shared istead of uniq to avoid...
r542 qCInfo(LOG_VariableAcquisitionWorker())
Implementation of V5 acquisition
r539 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
push method of worker return the id of the nextRange which is canceled
r625 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r539
// Request creation
auto acqRequest = AcquisitionRequest{};
push method of worker return the id of the nextRange which is canceled
r625 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r539 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
push method of worker return the id of the nextRange which is canceled
r625
Implementation of V5 acquisition
r539 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
push method of worker return the id of the nextRange which is canceled
r625 auto nextAcqId = it->second.second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = request.m_VarRequestId;
}
Implementation of V5 acquisition
r539 it->second.second = acqRequest.m_AcqIdentifier;
impl->unlock();
}
else {
// First request for the variable, it must be stored and executed
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
}
push method of worker return the id of the nextRange which is canceled
r625
return varRequestIdCanceled;
Implementation of V5 acquisition
r539 }
/// Not implemented yet: the abort request for @p vIdentifier is currently ignored.
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    // TODO
    static_cast<void>(vIdentifier);
}
/// Not implemented yet: the progress notification of the acquisition is currently ignored.
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    // TODO
    static_cast<void>(acqIdentifier);
    static_cast<void>(progress);
}
/**
 * Stores a data packet received for an acquisition and, once every expected packet has been
 * received, emits dataProvided() and moves on to the next request of the variable (if any).
 *
 * @param acqIdentifier identifier of the acquisition the data belongs to
 * @param dataSeries data acquired
 * @param dataRangeAcquired range of the data acquired
 */
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
                                                       std::shared_ptr<IDataSeries> dataSeries,
                                                       SqpRange dataRangeAcquired)
{
    qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ")
                                             << acqIdentifier << dataRangeAcquired;

    // Held until the end of the method, released automatically on every return path
    QWriteLocker locker{&impl->m_Lock};

    auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.end()) {
        qCCritical(LOG_VariableAcquisitionWorker())
            << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
        return;
    }

    // Store the result: the vector of packets is created on the first packet of the acquisition
    auto dataPacket = AcquisitionDataPacket{};
    dataPacket.m_Range = dataRangeAcquired;
    dataPacket.m_DateSeries = dataSeries;
    auto &dataPackets = impl->m_AcqIdentifierToAcqDataPacketVectorMap[acqIdentifier];
    dataPackets.push_back(dataPacket);

    // Decrement the counter of the request
    auto &acqRequest = requestIt->second;
    acqRequest.m_Size = acqRequest.m_Size - 1;
    if (acqRequest.m_Size != 0) {
        // Other packets are still expected
        return;
    }

    // Copied now because the request entry is erased below
    const auto vIdentifier = acqRequest.m_vIdentifier;

    // Return the data
    emit dataProvided(vIdentifier, acqRequest.m_RangeRequested, acqRequest.m_CacheRangeRequested,
                      dataPackets);

    auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (it == impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
        qCCritical(LOG_VariableAcquisitionWorker())
            << tr("Impossible to execute the acquisition on an unfound variable ");
        return;
    }

    if (it->second.second.isNull()) {
        // There is no next request, we can remove the variable request.
        // removeVariableRequest() takes the write lock again, which is valid because the lock is
        // recursive
        impl->removeVariableRequest(vIdentifier);
        return;
    }

    // Move the next request to the current request
    const auto finishedAcqIdentifier = it->second.first;
    it->second.first = it->second.second;
    it->second.second = QUuid();

    // Remove the finished AcquisitionRequest and its results
    impl->m_AcqIdentifierToAcqRequestMap.erase(finishedAcqIdentifier);
    impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(finishedAcqIdentifier);

    // Execute the new current request
    QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
                              Q_ARG(QUuid, it->second.first));
}
/**
 * Asks the provider of a registered acquisition request to load its data.
 *
 * The request is copied under the read lock and the lock is released before the provider is
 * called. Nothing is done if no request is registered for @p acqIdentifier.
 *
 * @param acqIdentifier identifier of the acquisition to execute
 */
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
    qCInfo(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();

    QReadLocker locker{&impl->m_Lock};
    auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (it == impl->m_AcqIdentifierToAcqRequestMap.end()) {
        // TODO log no acqIdentifier recognized
        return;
    }

    // Work on a copy so that the lock is not held while the provider is running
    const auto request = it->second;
    locker.unlock();

    if (!request.m_Provider) {
        // A request can be registered with a null provider: do not dereference it
        qCCritical(LOG_VariableAcquisitionWorker())
            << tr("Impossible to execute the acquisition: no provider for the request")
            << acqIdentifier;
        return;
    }

    request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
/// Takes the working mutex; it stays locked until finalize() is called.
void VariableAcquisitionWorker::initialize()
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("VariableAcquisitionWorker init") << QThread::currentThread();

    // Released by finalize()
    impl->m_WorkingMutex.lock();

    qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
/// Releases the working mutex taken by initialize().
void VariableAcquisitionWorker::finalize()
{
    auto &workingMutex = impl->m_WorkingMutex;
    workingMutex.unlock();
}
/// Blocks until the working mutex can be acquired, then releases it immediately.
void VariableAcquisitionWorker::waitForFinish()
{
    // The locker only exists to wait for the mutex: it is released as soon as the scope ends
    QMutexLocker waitLocker{&impl->m_WorkingMutex};
}
/**
 * Removes everything stored for a variable: its current and next acquisition requests, the data
 * packets received for them, and the variable entry itself.
 *
 * The write lock is taken here. As the lock is recursive, the method can also be called by code
 * that already holds it.
 *
 * @param vIdentifier identifier of the variable to remove
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    QWriteLocker locker{&m_Lock};

    auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
    if (it == m_VIdentifierToCurrrentAcqIdNextIdPairMap.end()) {
        // Nothing is registered for the variable
        return;
    }

    // Current request and its results
    m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);

    // Next request and its results (no-op when the next id is null or unknown)
    m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);

    m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(it);
}