##// END OF EJS Templates
commit processRequest
commit processRequest

File last commit:

r681:5e6b86368b57
r682:d41b2e70981c
Show More
VariableAcquisitionWorker.cpp
225 lines | 7.1 KiB | text/x-c | CppLexer
/ core / src / Variable / VariableAcquisitionWorker.cpp
#include "Variable/VariableAcquisitionWorker.h"
#include "Variable/Variable.h"
#include <Data/AcquisitionRequest.h>
#include <Data/SqpRange.h>
#include <unordered_map>
#include <utility>
#include <QMutex>
#include <QReadWriteLock>
#include <QThread>
#include <QtConcurrent/QtConcurrent>
Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
namespace {
using AcquisitionId = QUuid;
using VariableId = QUuid;
} // namespace
struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
void lockRead() { m_Lock.lockForRead(); }
void lockWrite() { m_Lock.lockForWrite(); }
void unlock() { m_Lock.unlock(); }
void eraseRequest(AcquisitionId id);
std::map<AcquisitionId, AcquisitionRequest>::iterator insertRequest(AcquisitionId id,
AcquisitionRequest request);
void removeVariableRequest(QUuid vIdentifier);
QMutex m_WorkingMutex;
QReadWriteLock m_Lock;
/// Current acquisitions (by variable)
std::map<AcquisitionId, AcquisitionRequest> m_Requests;
std::map<VariableId, AcquisitionRequest *> m_RequestsIndex;
};
VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
: QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
{
}
VariableAcquisitionWorker::~VariableAcquisitionWorker()
{
qCInfo(LOG_VariableAcquisitionWorker())
<< tr("VariableAcquisitionWorker destruction") << QThread::currentThread();
this->waitForFinish();
}
QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
SqpRange rangeRequested,
SqpRange cacheRangeRequested,
DataProviderParameters parameters,
std::shared_ptr<IDataProvider> provider)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
auto varRequestIdCanceled = QUuid();
// Request creation
auto acqRequest = AcquisitionRequest{};
acqRequest.m_VarRequestId = varRequestId;
acqRequest.m_vIdentifier = vIdentifier;
acqRequest.m_DataProviderParameters = parameters;
acqRequest.m_RangeRequested = rangeRequested;
acqRequest.m_CacheRangeRequested = cacheRangeRequested;
acqRequest.m_Size = parameters.m_Times.size();
acqRequest.m_Provider = provider;
impl->lockWrite();
// Checks if there is a current acquisition on variable
auto currentRequestIt = impl->m_RequestsIndex.find(vIdentifier);
if (currentRequestIt != impl->m_RequestsIndex.cend()) {
auto request = currentRequestIt->second;
QtConcurrent::run(
[ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() {
provider->requestDataAborting(acqIdentifier);
});
varRequestIdCanceled = request->m_VarRequestId;
impl->eraseRequest(request->m_AcqIdentifier);
}
// Sets the new acquisition request as the current request for the variable
auto newRequestIt = impl->insertRequest(acqRequest.m_AcqIdentifier, std::move(acqRequest));
if (newRequestIt != impl->m_Requests.end()) {
qCInfo(LOG_VariableAcquisitionWorker()) << "EXECUTE REQUEST" << acqRequest.m_AcqIdentifier;
QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
Q_ARG(QUuid, newRequestIt->first));
}
else {
/// @todo ALX : log
}
impl->unlock();
return varRequestIdCanceled;
}
void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
{
// TODO
}
void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
double progress)
{
// TODO
}
void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
std::shared_ptr<IDataSeries> dataSeries,
SqpRange dataRangeAcquired)
{
qCDebug(LOG_VariableAcquisitionWorker())
<< tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired;
impl->lockWrite();
auto it = impl->m_Requests.find(acqIdentifier);
if (it != impl->m_Requests.cend()) {
auto &request = it->second;
// Store the result
auto dataPacket = AcquisitionDataPacket{dataSeries, dataRangeAcquired};
request.m_DataPackets.push_back(dataPacket);
request.m_Size = request.m_Size - 1;
if (request.m_Size == 0) {
emit dataProvided(request.m_vIdentifier, request.m_RangeRequested,
request.m_CacheRangeRequested, request.m_DataPackets);
impl->eraseRequest(acqIdentifier);
}
}
impl->unlock();
}
void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
{
impl->lockWrite();
impl->unlock();
}
void VariableAcquisitionWorker::initialize()
{
qCDebug(LOG_VariableAcquisitionWorker())
<< tr("VariableAcquisitionWorker init") << QThread::currentThread();
impl->m_WorkingMutex.lock();
qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
}
void VariableAcquisitionWorker::finalize()
{
impl->m_WorkingMutex.unlock();
}
void VariableAcquisitionWorker::waitForFinish()
{
QMutexLocker locker{&impl->m_WorkingMutex};
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::eraseRequest(AcquisitionId id)
{
auto it = m_Requests.find(id);
if (it != m_Requests.end()) {
// Removes from index
m_RequestsIndex.erase(it->second.m_vIdentifier);
// Removes request
m_Requests.erase(it);
}
}
std::map<AcquisitionId, AcquisitionRequest>::iterator
VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::insertRequest(
AcquisitionId id, AcquisitionRequest request)
{
// Inserts request
auto variableId = request.m_vIdentifier;
auto result = m_Requests.insert(std::make_pair(id, std::move(request)));
if (result.second) {
// Inserts index
m_RequestsIndex[variableId] = &result.first->second;
return result.first;
}
else {
return m_Requests.end();
}
}
void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
QUuid vIdentifier)
{
/// @todo ALX
// m_Acquisitions.erase(vIdentifier);
}
void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
{
impl->lockRead();
auto it = impl->m_Requests.find(acqIdentifier);
if (it != impl->m_Requests.cend()) {
auto &request = it->second;
impl->unlock();
request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
}
else {
impl->unlock();
// TODO log no acqIdentifier recognized
}
}