|
|
#include "Variable/VariableAcquisitionWorker.h"
|
|
|
|
|
|
#include "Variable/Variable.h"
|
|
|
|
|
|
#include <Data/AcquisitionRequest.h>
|
|
|
#include <Data/SqpRange.h>
|
|
|
|
|
|
#include <unordered_map>
|
|
|
#include <utility>
|
|
|
|
|
|
#include <QMutex>
|
|
|
#include <QReadWriteLock>
|
|
|
#include <QThread>
|
|
|
#include <QtConcurrent/QtConcurrent>
|
|
|
|
|
|
Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
|
|
|
|
|
|
namespace {
|
|
|
|
|
|
using AcquisitionId = QUuid;
|
|
|
using VariableId = QUuid;
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
|
|
|
|
|
|
explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
|
|
|
|
|
|
void lockRead() { m_Lock.lockForRead(); }
|
|
|
void lockWrite() { m_Lock.lockForWrite(); }
|
|
|
void unlock() { m_Lock.unlock(); }
|
|
|
|
|
|
void eraseRequest(AcquisitionId id);
|
|
|
std::map<AcquisitionId, AcquisitionRequest>::iterator insertRequest(AcquisitionId id,
|
|
|
AcquisitionRequest request);
|
|
|
|
|
|
void removeVariableRequest(QUuid vIdentifier);
|
|
|
|
|
|
QMutex m_WorkingMutex;
|
|
|
QReadWriteLock m_Lock;
|
|
|
|
|
|
/// Current acquisitions (by variable)
|
|
|
std::map<AcquisitionId, AcquisitionRequest> m_Requests;
|
|
|
std::map<VariableId, AcquisitionRequest *> m_RequestsIndex;
|
|
|
};
|
|
|
|
|
|
|
|
|
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
|
|
|
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
|
|
|
{
|
|
|
}
|
|
|
|
|
|
VariableAcquisitionWorker::~VariableAcquisitionWorker()
|
|
|
{
|
|
|
qCInfo(LOG_VariableAcquisitionWorker())
|
|
|
<< tr("VariableAcquisitionWorker destruction") << QThread::currentThread();
|
|
|
this->waitForFinish();
|
|
|
}
|
|
|
|
|
|
|
|
|
QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
|
|
|
SqpRange rangeRequested,
|
|
|
SqpRange cacheRangeRequested,
|
|
|
DataProviderParameters parameters,
|
|
|
std::shared_ptr<IDataProvider> provider)
|
|
|
{
|
|
|
qCDebug(LOG_VariableAcquisitionWorker())
|
|
|
<< tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
|
|
|
auto varRequestIdCanceled = QUuid();
|
|
|
|
|
|
// Request creation
|
|
|
auto acqRequest = AcquisitionRequest{};
|
|
|
acqRequest.m_VarRequestId = varRequestId;
|
|
|
acqRequest.m_vIdentifier = vIdentifier;
|
|
|
acqRequest.m_DataProviderParameters = parameters;
|
|
|
acqRequest.m_RangeRequested = rangeRequested;
|
|
|
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
|
|
|
acqRequest.m_Size = parameters.m_Times.size();
|
|
|
acqRequest.m_Provider = provider;
|
|
|
|
|
|
impl->lockWrite();
|
|
|
|
|
|
// Checks if there is a current acquisition on variable
|
|
|
auto currentRequestIt = impl->m_RequestsIndex.find(vIdentifier);
|
|
|
if (currentRequestIt != impl->m_RequestsIndex.cend()) {
|
|
|
auto request = currentRequestIt->second;
|
|
|
QtConcurrent::run(
|
|
|
[ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() {
|
|
|
provider->requestDataAborting(acqIdentifier);
|
|
|
});
|
|
|
varRequestIdCanceled = request->m_VarRequestId;
|
|
|
|
|
|
impl->eraseRequest(request->m_AcqIdentifier);
|
|
|
}
|
|
|
|
|
|
// Sets the new acquisition request as the current request for the variable
|
|
|
auto newRequestIt = impl->insertRequest(acqRequest.m_AcqIdentifier, std::move(acqRequest));
|
|
|
if (newRequestIt != impl->m_Requests.end()) {
|
|
|
qCInfo(LOG_VariableAcquisitionWorker()) << "EXECUTE REQUEST" << acqRequest.m_AcqIdentifier;
|
|
|
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
|
|
|
Q_ARG(QUuid, newRequestIt->first));
|
|
|
}
|
|
|
else {
|
|
|
/// @todo ALX : log
|
|
|
}
|
|
|
|
|
|
impl->unlock();
|
|
|
|
|
|
return varRequestIdCanceled;
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
|
|
|
{
|
|
|
// TODO
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
|
|
|
double progress)
|
|
|
{
|
|
|
// TODO
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
|
|
|
std::shared_ptr<IDataSeries> dataSeries,
|
|
|
SqpRange dataRangeAcquired)
|
|
|
{
|
|
|
qCDebug(LOG_VariableAcquisitionWorker())
|
|
|
<< tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired;
|
|
|
impl->lockWrite();
|
|
|
|
|
|
auto it = impl->m_Requests.find(acqIdentifier);
|
|
|
if (it != impl->m_Requests.cend()) {
|
|
|
auto &request = it->second;
|
|
|
|
|
|
// Store the result
|
|
|
auto dataPacket = AcquisitionDataPacket{dataSeries, dataRangeAcquired};
|
|
|
request.m_DataPackets.push_back(dataPacket);
|
|
|
request.m_Size = request.m_Size - 1;
|
|
|
|
|
|
if (request.m_Size == 0) {
|
|
|
emit dataProvided(request.m_vIdentifier, request.m_RangeRequested,
|
|
|
request.m_CacheRangeRequested, request.m_DataPackets);
|
|
|
impl->eraseRequest(acqIdentifier);
|
|
|
}
|
|
|
}
|
|
|
impl->unlock();
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
|
|
|
{
|
|
|
impl->lockWrite();
|
|
|
impl->unlock();
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::initialize()
|
|
|
{
|
|
|
qCDebug(LOG_VariableAcquisitionWorker())
|
|
|
<< tr("VariableAcquisitionWorker init") << QThread::currentThread();
|
|
|
impl->m_WorkingMutex.lock();
|
|
|
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::finalize()
|
|
|
{
|
|
|
impl->m_WorkingMutex.unlock();
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::waitForFinish()
|
|
|
{
|
|
|
QMutexLocker locker{&impl->m_WorkingMutex};
|
|
|
}
|
|
|
|
|
|
|
|
|
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::eraseRequest(AcquisitionId id)
|
|
|
{
|
|
|
auto it = m_Requests.find(id);
|
|
|
if (it != m_Requests.end()) {
|
|
|
// Removes from index
|
|
|
m_RequestsIndex.erase(it->second.m_vIdentifier);
|
|
|
|
|
|
// Removes request
|
|
|
m_Requests.erase(it);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
std::map<AcquisitionId, AcquisitionRequest>::iterator
|
|
|
VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::insertRequest(
|
|
|
AcquisitionId id, AcquisitionRequest request)
|
|
|
{
|
|
|
// Inserts request
|
|
|
auto variableId = request.m_vIdentifier;
|
|
|
auto result = m_Requests.insert(std::make_pair(id, std::move(request)));
|
|
|
|
|
|
if (result.second) {
|
|
|
// Inserts index
|
|
|
m_RequestsIndex[variableId] = &result.first->second;
|
|
|
|
|
|
return result.first;
|
|
|
}
|
|
|
else {
|
|
|
return m_Requests.end();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
|
|
|
QUuid vIdentifier)
|
|
|
{
|
|
|
/// @todo ALX
|
|
|
// m_Acquisitions.erase(vIdentifier);
|
|
|
}
|
|
|
|
|
|
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
|
|
|
{
|
|
|
impl->lockRead();
|
|
|
auto it = impl->m_Requests.find(acqIdentifier);
|
|
|
if (it != impl->m_Requests.cend()) {
|
|
|
auto &request = it->second;
|
|
|
impl->unlock();
|
|
|
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
|
|
|
}
|
|
|
else {
|
|
|
impl->unlock();
|
|
|
// TODO log no acqIdentifier recognized
|
|
|
}
|
|
|
}
|
|
|
|