##// END OF EJS Templates
Merge branch 'feature/UpdateArrayDataStruct' into develop
Merge branch 'feature/UpdateArrayDataStruct' into develop

File last commit:

r586:4e5df98215ef
r605:340220774853 merge
Show More
VariableAcquisitionWorker.cpp
238 lines | 9.3 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
Add empty class VariableAcquisitionWorker
r498 #include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
Implementation of V5 acquisition
r510
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
Add empty class VariableAcquisitionWorker
r498 #include <unordered_map>
Implementation of V5 acquisition
r510 #include <utility>
Add empty class VariableAcquisitionWorker
r498
Implementation of V5 acquisition
r510 #include <QMutex>
#include <QReadWriteLock>
Add empty class VariableAcquisitionWorker
r498 #include <QThread>
Implementation of V5 acquisition
r510
Add empty class VariableAcquisitionWorker
r498 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
Implementation of V5 acquisition
r510 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void removeVariableRequest(QUuid vIdentifier);
QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
Add empty class VariableAcquisitionWorker
r498 };
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
{
}
Implementation of V5 acquisition
r510 VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
push method of worker return the id of the nextRange which is canceled
r584 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
Implementation of V5 acquisition
r510 {
Next range of a variable synchronized is now computed using:...
r586 qCDebug(LOG_VariableAcquisitionWorker())
Implementation of V5 acquisition
r510 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
push method of worker return the id of the nextRange which is canceled
r584 auto varRequestIdCanceled = QUuid();
Implementation of V5 acquisition
r510
// Request creation
auto acqRequest = AcquisitionRequest{};
push method of worker return the id of the nextRange which is canceled
r584 acqRequest.m_VarRequestId = varRequestId;
Implementation of V5 acquisition
r510 acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
push method of worker return the id of the nextRange which is canceled
r584
Implementation of V5 acquisition
r510 // Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
push method of worker return the id of the nextRange which is canceled
r584 auto nextAcqId = it->second.second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = request.m_VarRequestId;
}
Implementation of V5 acquisition
r510 it->second.second = acqRequest.m_AcqIdentifier;
impl->unlock();
}
else {
// First request for the variable, it must be stored and executed
impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
}
push method of worker return the id of the nextRange which is canceled
r584
return varRequestIdCanceled;
Implementation of V5 acquisition
r510 }
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
// TODO
}
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
// TODO
}
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
Add empty class VariableAcquisitionWorker
r498 {
Implementation of V5 acquisition
r510 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
acqRequest.m_Size = acqRequest.m_Size - 1;
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
if (acqRequest.m_Size == 0) {
// Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
}
// Execute the next one
auto it
= impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
if (it->second.second.isNull()) {
change grapheRange to graphRange
r516 // There is no next request, we can remove the variable request
Implementation of V5 acquisition
r510 impl->removeVariableRequest(acqRequest.m_vIdentifier);
}
else {
auto acqIdentifierToRemove = it->second.first;
// Move the next request to the current request
it->second.first = it->second.second;
it->second.second = QUuid();
// Remove AcquisitionRequest and results;
impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
// Execute the current request
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, it->second.first));
}
}
else {
qCCritical(LOG_VariableAcquisitionWorker())
<< tr("Impossible to execute the acquisition on an unfound variable ");
}
}
}
else {
qCCritical(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data");
}
impl->unlock();
}
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
Next range of a variable synchronized is now computed using:...
r586 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
Implementation of V5 acquisition
r510 impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
lockWrite();
auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
// A current request already exists, we can replace the next one
m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
}
m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
unlock();
Add empty class VariableAcquisitionWorker
r498 }