##// END OF EJS Templates
Fix bug when canceling acquisition
Fix bug when canceling acquisition

File last commit:

r1399:5f9a7b80e8f3
r1399:5f9a7b80e8f3
Show More
VariableAcquisitionWorker.cpp
369 lines | 13.3 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
#include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
#include <unordered_map>
#include <utility>
#include <QMutex>
#include <QReadWriteLock>
#include <QThread>
#include <cmath>
Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
: m_Lock{QReadWriteLock::Recursive}, q{parent}
{
}
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void removeVariableRequest(QUuid vIdentifier);
/// Remove and/or abort all AcqRequest in link with varRequestId
void cancelVarRequest(QUuid varRequestId);
void removeAcqRequest(QUuid acqRequestId);
QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
VariableAcquisitionWorker *q;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
{
}
VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
<< QThread::currentThread();
this->waitForFinish();
}
QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
<< "vId: " << vIdentifier;
auto varRequestIdCanceled = QUuid();
// Request creation
auto acqRequest = AcquisitionRequest{};
acqRequest.m_VarRequestId = varRequestId;
acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
<< acqRequest.m_Size;
// Register request
impl->lockWrite();
impl->m_AcqIdentifierToAcqRequestMap.insert(
std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
// A current request already exists, we can cancel it
// remove old acqIdentifier from the worker
auto oldAcqId = it->second;
auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
}
impl->unlock();
impl->cancelVarRequest(varRequestIdCanceled);
}
else {
impl->unlock();
}
// Request for the variable, it must be stored and executed
impl->lockWrite();
impl->m_VIdentifierToCurrrentAcqIdMap.insert(
std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
impl->unlock();
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
return varRequestIdCanceled;
}
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
impl->lockRead();
auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
auto currentAcqId = it->second;
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
// notify the request aborting to the provider
request.m_Provider->requestDataAborting(currentAcqId);
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to abort an unknown acquisition request") << currentAcqId;
}
}
else {
impl->unlock();
}
}
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
<< QThread::currentThread()->objectName()
<< acqIdentifier << progress;
impl->lockRead();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto progressPartSize
= (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
auto currentPartProgress
= std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;
// We can only give an approximation of the currentProgression since its upgrade is async.
auto currentProgression = aIdToARit->second.m_Progression;
if (currentProgression == aIdToARit->second.m_Size) {
currentProgression = aIdToARit->second.m_Size - 1;
}
auto currentAlreadyProgress = progressPartSize * currentProgression;
auto finalProgression = currentAlreadyProgress + currentPartProgress;
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
if (finalProgression == 100.0) {
emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
}
}
impl->unlock();
}
void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
<< QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableCanceledRequested(request.m_vIdentifier);
}
else {
impl->unlock();
}
}
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
<< acqIdentifier << dataRangeAcquired;
impl->lockWrite();
auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
// Store the result
auto dataPacket = AcquisitionDataPacket{};
dataPacket.m_Range = dataRangeAcquired;
dataPacket.m_DateSeries = dataSeries;
auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
// A current request result already exists, we can update it
aIdToADPVit->second.push_back(dataPacket);
}
else {
// First request result for the variable, it must be stored
impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
}
// Decrement the counter of the request
auto &acqRequest = aIdToARit->second;
acqRequest.m_Progression = acqRequest.m_Progression + 1;
// if the counter is 0, we can return data then run the next request if it exists and
// removed the finished request
if (acqRequest.m_Size == acqRequest.m_Progression) {
auto varId = acqRequest.m_vIdentifier;
// Return the data
aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
emit dataProvided(varId, aIdToADPVit->second);
}
impl->unlock();
}
else {
impl->unlock();
}
}
else {
impl->unlock();
qCWarning(LOG_VariableAcquisitionWorker())
<< tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
}
}
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
impl->lockRead();
auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
auto request = it->second;
impl->unlock();
emit variableRequestInProgress(request.m_vIdentifier, 0.1);
qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
<< QThread::currentThread();
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}
void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
<< QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
lockWrite();
auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
// A current request already exists, we can replace the next one
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeVariableRequest "
<< QThread::currentThread()->objectName() << it->second;
m_AcqIdentifierToAcqRequestMap.erase(it->second);
m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second);
}
// stop any progression
emit q->variableRequestInProgress(vIdentifier, 0.0);
m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
unlock();
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
QUuid varRequestId)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::cancelVarRequest start";
lockRead();
// get all AcqIdentifier in link with varRequestId
QVector<QUuid> acqIdsToRm;
auto cend = m_AcqIdentifierToAcqRequestMap.cend();
for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
if (it->second.m_VarRequestId == varRequestId) {
acqIdsToRm << it->first;
}
}
unlock();
// run aborting or removing of acqIdsToRm
for (auto acqId : acqIdsToRm) {
removeAcqRequest(acqId);
}
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
QUuid acqRequestId)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeAcqRequest";
QUuid vIdentifier;
std::shared_ptr<IDataProvider> provider;
lockRead();
auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
vIdentifier = acqIt->second.m_vIdentifier;
provider = acqIt->second.m_Provider;
auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
if (it->second == acqRequestId) {
// acqRequest is currently running -> let's aborting it
unlock();
// notify the request aborting to the provider
provider->requestDataAborting(acqRequestId);
}
else {
unlock();
}
}
else {
unlock();
}
}
else {
unlock();
}
lockWrite();
m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
unlock();
qCDebug(LOG_VariableAcquisitionWorker())
<< "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}