|
|
#include "Variable/VariableAcquisitionWorker.h"
|
|
|
|
|
|
#include "Variable/Variable.h"
|
|
|
|
|
|
#include <Data/AcquisitionRequest.h>
|
|
|
#include <Data/SqpRange.h>
|
|
|
|
|
|
#include <unordered_map>
|
|
|
#include <utility>
|
|
|
|
|
|
#include <QMutex>
|
|
|
#include <QReadWriteLock>
|
|
|
#include <QThread>
|
|
|
|
|
|
#include <cmath>
|
|
|
|
|
|
// Defines the logging category (named "VariableAcquisitionWorker") used by every log statement of this file
Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
|
|
|
|
|
|
/**
 * @brief Private implementation of VariableAcquisitionWorker.
 *
 * Groups the bookkeeping maps of the worker with the read/write lock that is
 * taken (through lockRead() / lockWrite() / unlock()) around accesses to them.
 */
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {

    /// Creates the private part: the read/write lock is built in recursive mode and
    /// @p parent is stored as the back-pointer to the public object.
    explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
            : m_Lock{QReadWriteLock::Recursive},
              q{parent}
    {
    }

    /// Takes m_Lock for reading
    void lockRead()
    {
        m_Lock.lockForRead();
    }

    /// Takes m_Lock for writing
    void lockWrite()
    {
        m_Lock.lockForWrite();
    }

    /// Releases m_Lock
    void unlock()
    {
        m_Lock.unlock();
    }

    /// Drops the current acquisition (request and received packets) of the variable
    /// and resets its progression
    void removeVariableRequest(QUuid vIdentifier);

    /// Remove and/or abort all AcqRequest in link with varRequestId
    void cancelVarRequest(QUuid varRequestId);

    /// Aborts the acquisition if it is the current one of its variable, then forgets it
    void removeAcqRequest(QUuid acqRequestId);

    /// Locked by initialize(), released by finalize(); waitForFinish() blocks on it
    QMutex m_WorkingMutex;
    /// Lock taken around accesses to the maps below
    QReadWriteLock m_Lock;

    /// Acquisition identifier -> data packets received so far for that acquisition
    std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
    /// Acquisition identifier -> registered acquisition request
    std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
    /// Variable identifier -> identifier of the current acquisition of that variable
    std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
    /// Back-pointer to the public object (used to emit its signals)
    VariableAcquisitionWorker *q;
};
|
|
|
|
|
|
|
|
|
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
|
|
|
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
|
|
|
{
|
|
|
}
|
|
|
|
|
|
/// Logs the destruction, then waits (see waitForFinish()) until the working mutex
/// can be acquired before letting the object go away.
VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
    qCInfo(LOG_VariableAcquisitionWorker())
        << tr("VariableAcquisitionWorker destruction") << QThread::currentThread();

    waitForFinish();
}
|
|
|
|
|
|
|
|
|
/**
 * @brief Registers a new acquisition request for a variable and schedules its execution.
 *
 * The request is stored in the worker, any variable request that owns the acquisition
 * currently attached to the variable is cancelled, the new acquisition becomes the
 * current one of the variable and onExecuteRequest() is queued for it.
 *
 * @param varRequestId identifier of the variable request the acquisition belongs to
 * @param vIdentifier identifier of the variable the data is acquired for
 * @param parameters parameters handed to the provider; the number of ranges in
 *        parameters.m_Times is the number of parts expected for the acquisition
 * @param provider provider that will be asked to load the data
 * @return the identifier of the variable request that has been cancelled, or a null
 *         QUuid when no request was cancelled
 */
QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
                                                     DataProviderParameters parameters,
                                                     std::shared_ptr<IDataProvider> provider)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
        << "vId: " << vIdentifier;
    auto varRequestIdCanceled = QUuid();

    // Request creation
    auto acqRequest = AcquisitionRequest{};
    acqRequest.m_VarRequestId = varRequestId;
    acqRequest.m_vIdentifier = vIdentifier;
    acqRequest.m_DataProviderParameters = parameters;
    acqRequest.m_Size = parameters.m_Times.size();
    acqRequest.m_Provider = provider;
    qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
                                            << acqRequest.m_Size;

    // Register request
    impl->lockWrite();
    impl->m_AcqIdentifierToAcqRequestMap.insert(
        std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));

    auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
        // A current request already exists for the variable: retrieve the variable request it
        // belongs to so that it can be cancelled
        auto oldAcqId = it->second;
        auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
        if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
            // Only the identifier is needed, no need to copy the whole request
            varRequestIdCanceled = acqIdentifierToAcqRequestMapIt->second.m_VarRequestId;
        }
        // The lock is released before cancelling: cancelVarRequest() takes it itself
        impl->unlock();
        impl->cancelVarRequest(varRequestIdCanceled);
    }
    else {
        impl->unlock();
    }

    // The new acquisition becomes the current one of the variable.
    // operator[] is used on purpose: std::map::insert() leaves an existing entry untouched, and the
    // cancellation above does not erase the variable's entry, so insert() would keep the map
    // pointing at the acquisition that has just been cancelled.
    impl->lockWrite();
    impl->m_VIdentifierToCurrrentAcqIdMap[vIdentifier] = acqRequest.m_AcqIdentifier;
    impl->unlock();

    // Execution is deferred through the event loop of the thread the worker lives in
    QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
                              Q_ARG(QUuid, acqRequest.m_AcqIdentifier));

    return varRequestIdCanceled;
}
|
|
|
|
|
|
/**
 * @brief Asks the provider to abort the current acquisition of a variable.
 *
 * Nothing happens when the variable has no current acquisition. A warning is logged when
 * the current acquisition identifier is not attached to any registered request.
 *
 * @param vIdentifier identifier of the variable whose acquisition must be aborted
 */
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
    impl->lockRead();

    auto currentAcqIt = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    if (currentAcqIt == impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
        // No acquisition in progress for this variable
        impl->unlock();
        return;
    }

    auto runningAcqId = currentAcqIt->second;
    auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(runningAcqId);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to abort an unknown acquisition request") << runningAcqId;
        return;
    }

    // The request is copied so that its provider can be used once the lock is released
    auto runningRequest = requestIt->second;
    impl->unlock();

    // notify the request aborting to the provider
    runningRequest.m_Provider->requestDataAborting(runningAcqId);
}
|
|
|
|
|
|
/**
 * @brief Converts the progress of one part of an acquisition into the overall progress of
 * the variable request and emits it.
 *
 * An acquisition is made of m_Size parts, m_Progression of which are already received.
 * The overall progress (0-100) is the share of the parts already received plus the share
 * of the part currently being retrieved. When the overall progress reaches 100, a second
 * signal with 0.0 is emitted.
 *
 * The computation is done in floating point: with an integer division of 100 by the
 * number of parts, any part count that does not divide 100 (3, 6, 7...) would never
 * reach 100 and the final reset would never be emitted.
 *
 * @param acqIdentifier identifier of the acquisition the progress is reported for;
 *        unknown identifiers are ignored
 * @param progress progress (0-100) of the part being retrieved; NaN is handled as 0
 */
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
                                                                 double progress)
{
    qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
                                            << QThread::currentThread()->objectName()
                                            << acqIdentifier << progress;
    impl->lockRead();
    auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        const auto &request = aIdToARit->second;

        const bool hasParts = (request.m_Size != 0);
        const double partCount = static_cast<double>(request.m_Size);
        const double partProgress = std::isnan(progress) ? 0.0 : progress;

        // Share (in percent) of one part in the whole acquisition
        const double progressPartSize = hasParts ? 100.0 / partCount : 0.0;
        const double currentPartProgress = (partProgress * progressPartSize) / 100.0;

        // We can only give an approximation of the currentProgression since its upgrade is async.
        qCInfo(LOG_VariableAcquisitionWorker())
            << tr("Progression: ") << request.m_Progression << request.m_Size;
        auto currentProgression = request.m_Progression;
        if (currentProgression == request.m_Size) {
            // The last part has already been counted: report the progress within that part
            currentProgression = request.m_Size - 1;
        }

        const double currentAlreadyProgress = progressPartSize * currentProgression;

        // A single division is used so that a completed acquisition gives exactly 100.0
        // instead of a sum of rounded terms
        const double finalProgression
            = hasParts ? (100.0 * currentProgression + partProgress) / partCount : 0.0;
        emit variableRequestInProgress(request.m_vIdentifier, finalProgression);
        qCInfo(LOG_VariableAcquisitionWorker())
            << tr("TORM: onVariableRetrieveDataInProgress ")
            << QThread::currentThread()->objectName() << request.m_vIdentifier
            << progressPartSize << currentAlreadyProgress << currentPartProgress
            << finalProgression;

        if (finalProgression >= 100.0) {
            qCInfo(LOG_VariableAcquisitionWorker())
                << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
            emit variableRequestInProgress(request.m_vIdentifier, 0.0);
        }
    }
    impl->unlock();
}
|
|
|
|
|
|
/**
 * @brief Reacts to the failure of an acquisition by requesting the cancellation of the
 * variable it was made for.
 *
 * @param acqIdentifier identifier of the failed acquisition; unknown identifiers are ignored
 */
void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("onVariableAcquisitionFailed") << QThread::currentThread();

    impl->lockRead();
    auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        // TODO log no acqIdentifier recognized
        return;
    }

    // Work on a copy so that the lock does not have to be held while emitting
    auto failedRequest = requestIt->second;
    impl->unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("onVariableAcquisitionFailed") << acqIdentifier << failedRequest.m_vIdentifier
        << QThread::currentThread();
    emit variableCanceledRequested(failedRequest.m_vIdentifier);
}
|
|
|
|
|
|
/**
 * @brief Stores a part of data received for an acquisition and, once every expected part
 * has been received, provides the whole set of packets.
 *
 * @param acqIdentifier identifier of the acquisition the data belongs to; a warning is
 *        logged when it is not attached to any registered request
 * @param dataSeries data received
 * @param dataRangeAcquired range covered by the data received
 */
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
                                                       std::shared_ptr<IDataSeries> dataSeries,
                                                       SqpRange dataRangeAcquired)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("TORM: onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired;

    impl->lockWrite();
    auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        qCWarning(LOG_VariableAcquisitionWorker())
            << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
        return;
    }

    // Store the result: the packet vector of the acquisition is created on first use
    AcquisitionDataPacket receivedPacket{};
    receivedPacket.m_Range = dataRangeAcquired;
    receivedPacket.m_DateSeries = dataSeries;
    auto &receivedPackets = impl->m_AcqIdentifierToAcqDataPacketVectorMap[acqIdentifier];
    receivedPackets.push_back(receivedPacket);

    // One more part has been received for the request
    auto &pendingRequest = requestIt->second;
    qCInfo(LOG_VariableAcquisitionWorker())
        << tr("TORM: +1 update progresson ") << acqIdentifier;
    pendingRequest.m_Progression += 1;

    // Every expected part is there: the packets can be returned
    if (pendingRequest.m_Size == pendingRequest.m_Progression) {
        auto completedVariableId = pendingRequest.m_vIdentifier;
        emit dataProvided(completedVariableId, receivedPackets);
    }
    impl->unlock();
}
|
|
|
|
|
|
/**
 * @brief Starts an acquisition: signals an initial progression for the variable, then asks
 * the provider of the request to load the data.
 *
 * @param acqIdentifier identifier of the acquisition to start; unknown identifiers are ignored
 */
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
    qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();

    impl->lockRead();
    auto requestIt = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
    if (requestIt == impl->m_AcqIdentifierToAcqRequestMap.cend()) {
        impl->unlock();
        // TODO log no acqIdentifier recognized
        return;
    }

    // The request is copied so that the provider is called without holding the lock
    auto requestToRun = requestIt->second;
    impl->unlock();

    emit variableRequestInProgress(requestToRun.m_vIdentifier, 0.1);
    qCDebug(LOG_VariableAcquisitionWorker())
        << tr("Start request 10%") << acqIdentifier << QThread::currentThread();
    requestToRun.m_Provider->requestDataLoading(acqIdentifier,
                                                requestToRun.m_DataProviderParameters);
}
|
|
|
|
|
|
void VariableAcquisitionWorker::initialize()
|
|
|
{
|
|
|
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
|
|
|
<< QThread::currentThread();
|
|
|
impl->m_WorkingMutex.lock();
|
|
|
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::finalize()
|
|
|
{
|
|
|
impl->m_WorkingMutex.unlock();
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::waitForFinish()
|
|
|
{
|
|
|
QMutexLocker locker{&impl->m_WorkingMutex};
|
|
|
}
|
|
|
|
|
|
/**
 * @brief Forgets everything the worker knows about the current acquisition of a variable.
 *
 * The request and the packets of the current acquisition are erased, the progression of
 * the variable is reset to 0 and the variable is detached from its current acquisition.
 * The whole operation is done under the write lock.
 *
 * @param vIdentifier identifier of the variable to clean
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
    QUuid vIdentifier)
{
    lockWrite();

    const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
    const bool hasCurrentAcquisition = (currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend());
    if (hasCurrentAcquisition) {
        // The variable has a current acquisition: drop its request and its packets
        const auto &currentAcqId = currentAcqIt->second;
        qCDebug(LOG_VariableAcquisitionWorker())
            << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
            << QThread::currentThread()->objectName() << currentAcqId;
        m_AcqIdentifierToAcqRequestMap.erase(currentAcqId);
        m_AcqIdentifierToAcqDataPacketVectorMap.erase(currentAcqId);
    }

    // stop any progression
    emit q->variableRequestInProgress(vIdentifier, 0.0);

    m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);

    unlock();
}
|
|
|
|
|
|
/**
 * @brief Removes (and aborts when needed) every acquisition that belongs to a variable request.
 *
 * The matching acquisition identifiers are collected under the read lock, then each of
 * them is handed to removeAcqRequest() once the lock has been released.
 *
 * @param varRequestId identifier of the variable request to cancel
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
    QUuid varRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";

    // get all AcqIdentifier in link with varRequestId
    lockRead();
    QVector<QUuid> matchingAcqIds;
    for (const auto &acqEntry : m_AcqIdentifierToAcqRequestMap) {
        if (acqEntry.second.m_VarRequestId == varRequestId) {
            matchingAcqIds.push_back(acqEntry.first);
        }
    }
    unlock();

    // run aborting or removing of the collected acquisitions
    for (auto matchingAcqId : matchingAcqIds) {
        removeAcqRequest(matchingAcqId);
    }

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
}
|
|
|
|
|
|
/**
 * @brief Removes an acquisition from the worker, aborting it first when it is the current
 * acquisition of its variable.
 *
 * The decision to abort is taken under the read lock; the provider is notified once the
 * lock has been released. The request and its packets are then erased under the write lock.
 *
 * @param acqRequestId identifier of the acquisition to remove
 */
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
    QUuid acqRequestId)
{
    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest";

    std::shared_ptr<IDataProvider> requestProvider;
    bool isCurrentlyRunning = false;

    lockRead();
    const auto requestIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
    if (requestIt != m_AcqIdentifierToAcqRequestMap.cend()) {
        const QUuid ownerVariableId = requestIt->second.m_vIdentifier;
        requestProvider = requestIt->second.m_Provider;

        // The acquisition is running when it is the current one of its variable
        const auto currentAcqIt = m_VIdentifierToCurrrentAcqIdMap.find(ownerVariableId);
        isCurrentlyRunning = (currentAcqIt != m_VIdentifierToCurrrentAcqIdMap.cend())
                             && (currentAcqIt->second == acqRequestId);
    }
    unlock();

    if (isCurrentlyRunning) {
        // notify the request aborting to the provider
        requestProvider->requestDataAborting(acqRequestId);
    }

    lockWrite();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
        << acqRequestId;
    m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
    m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);

    unlock();

    qCDebug(LOG_VariableAcquisitionWorker())
        << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
}
|
|
|
|