##// END OF EJS Templates
Remove unused pending request of worker since it's already in the VC....
Remove unused pending request of worker since it's already in the VC. Fix bug with progress asynchrone computation

File last commit:

r1387:3f0567bfecb5 HEAD
r1387:3f0567bfecb5 HEAD
Show More
VariableAcquisitionWorker.cpp
386 lines | 14.5 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
#include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
#include <unordered_map>
#include <utility>
#include <QMutex>
#include <QReadWriteLock>
#include <QThread>
#include <cmath>
Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
: m_Lock{QReadWriteLock::Recursive}, q{parent}
{
}
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void removeVariableRequest(QUuid vIdentifier);
/// Remove and/or abort all AcqRequest in link with varRequestId
void cancelVarRequest(QUuid varRequestId);
void removeAcqRequest(QUuid acqRequestId);
QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
VariableAcquisitionWorker *q;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
{
}
VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
<< "vId: " << vIdentifier;
auto varRequestIdCanceled = QUuid();
// Request creation
auto acqRequest = AcquisitionRequest{};
acqRequest.m_VarRequestId = varRequestId;
acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
<< acqRequest.m_Size;
// Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
// A current request already exists, we can cancel it
// remove old acqIdentifier from the worker
auto oldAcqId = it->second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
}
impl->unlock();
impl->cancelVarRequest(varRequestIdCanceled);
}
else {
impl->unlock();
}
// Request for the variable, it must be stored and executed
impl->lockWrite();
impl->m_VIdentifierToCurrrentAcqIdMap.insert(
std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
return varRequestIdCanceled;
}
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
impl->lockRead();
auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
auto currentAcqId = it->second;
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
// notify the request aborting to the provider
request.m_Provider->requestDataAborting(currentAcqId);
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to abort an unknown acquisition request") << currentAcqId;
}
}
else {
impl->unlock();
}
}
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
<< QThread::currentThread()->objectName()
<< acqIdentifier << progress;
impl->lockRead();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto progressPartSize
= (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
auto currentPartProgress
= std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;
// We can only give an approximation of the currentProgression since its upgrade is async.
qCInfo(LOG_VariableAcquisitionWorker())
<< tr("Progression: ") << aIdToARit->second.m_Progression << aIdToARit->second.m_Size;
auto currentProgression = aIdToARit->second.m_Progression;
if (currentProgression == aIdToARit->second.m_Size) {
currentProgression = aIdToARit->second.m_Size - 1;
}
auto currentAlreadyProgress = progressPartSize * currentProgression;
auto finalProgression = currentAlreadyProgress + currentPartProgress;
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
qCInfo(LOG_VariableAcquisitionWorker())
<< tr("TORM: onVariableRetrieveDataInProgress ")
<< QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
<< progressPartSize << currentAlreadyProgress << currentPartProgress
<< finalProgression;
if (finalProgression == 100.0) {
qCInfo(LOG_VariableAcquisitionWorker())
<< tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
}
}
impl->unlock();
}
void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< acqIdentifier << request.m_vIdentifier
<< QThread::currentThread();
emit variableCanceledRequested(request.m_vIdentifier);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: +1 update progresson ")
<< acqIdentifier;
acqRequest.m_Progression = acqRequest.m_Progression + 1;
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
if (acqRequest.m_Size == acqRequest.m_Progression) {
auto varId = acqRequest.m_vIdentifier;
// Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
emit dataProvided(varId, aIdToADPVit->second);
}
impl->unlock();
}
else {
impl->unlock();
}
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
}
}
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
<< QThread::currentThread();
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
lockWrite();
auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
// A current request already exists, we can replace the next one
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeVariableRequest "
<< QThread::currentThread()->objectName() << it->second;
m_AcqIdentifierToAcqRequestMap.erase(it->second);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second);
}
// stop any progression
emit q->variableRequestInProgress(vIdentifier, 0.0);
m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
unlock();
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
QUuid varRequestId)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
lockRead();
// get all AcqIdentifier in link with varRequestId
QVector<QUuid> acqIdsToRm;
auto cend = m_AcqIdentifierToAcqRequestMap.cend();
for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
if (it->second.m_VarRequestId == varRequestId) {
acqIdsToRm << it->first;
}
}
unlock();
// run aborting or removing of acqIdsToRm
for (auto acqId : acqIdsToRm) {
removeAcqRequest(acqId);
}
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
QUuid acqRequestId)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeAcqRequest";
QUuid vIdentifier;
std::shared_ptr<IDataProvider> provider;
lockRead();
auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
vIdentifier = acqIt->second.m_vIdentifier;
provider = acqIt->second.m_Provider;
auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
if (it->second == acqRequestId) {
// acqRequest is currently running -> let's aborting it
unlock();
// notify the request aborting to the provider
provider->requestDataAborting(acqRequestId);
}
else {
unlock();
}
}
else {
unlock();
}
}
else {
unlock();
}
lockWrite();
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
<< acqRequestId;
m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
unlock();
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}