VariableController2.cpp
403 lines
| 13.5 KiB
| text/x-c
|
CppLexer
r17 | #include <QQueue> | |||
#include <QThreadPool> | ||||
#include <QRunnable> | ||||
#include <QObject> | ||||
r21 | #include <QDataStream> | |||
r17 | ||||
r0 | #include "Variable/VariableController2.h" | |||
r6 | #include "Variable/VariableSynchronizationGroup2.h" | |||
r2 | #include <Common/containers.h> | |||
#include <Common/debug.h> | ||||
#include <Data/DataProviderParameters.h> | ||||
r9 | #include <Data/DateTimeRangeHelper.h> | |||
r17 | #include <Data/DateTimeRange.h> | |||
r9 | #include <Variable/VariableCacheStrategyFactory.h> | |||
r17 | #include <Variable/private/VCTransaction.h> | |||
r18 | #include <QCoreApplication> | |||
r17 | ||||
r0 | ||||
r19 | ||||
r2 | class VariableController2::VariableController2Private | |||
r0 | { | |||
r19 | struct threadSafeVaraiblesMaps | |||
{ | ||||
inline void addVariable(const std::shared_ptr<Variable>& variable, const std::shared_ptr<IDataProvider>& provider, const std::shared_ptr<VariableSynchronizationGroup2>& synchronizationGroup) | ||||
{ | ||||
QWriteLocker lock{&_lock}; | ||||
_variables[*variable] = variable; | ||||
_providers[*variable] = provider; | ||||
_synchronizationGroups[*variable] = synchronizationGroup; | ||||
} | ||||
inline void removeVariable(const std::shared_ptr<Variable>& variable) | ||||
{ | ||||
QWriteLocker lock{&_lock}; | ||||
_variables.remove(*variable); | ||||
_providers.remove(*variable); | ||||
_synchronizationGroups.remove(*variable); | ||||
} | ||||
inline void synchronize(const std::shared_ptr<Variable>& variable, const std::optional<std::shared_ptr<Variable>>& with) | ||||
{ | ||||
QWriteLocker lock{&_lock}; | ||||
if(with.has_value()) | ||||
{ | ||||
auto newGroup = _synchronizationGroups[*with.value()]; | ||||
newGroup->addVariable(*variable); | ||||
_synchronizationGroups[*variable] = newGroup; | ||||
} | ||||
else | ||||
{ | ||||
_synchronizationGroups[*variable] = std::make_shared<VariableSynchronizationGroup2>(*variable); | ||||
} | ||||
} | ||||
inline std::shared_ptr<Variable> variable(QUuid variable) | ||||
{ | ||||
QReadLocker lock{&_lock}; | ||||
[[unlikely]] | ||||
if(!_variables.contains(variable)) | ||||
SCIQLOP_ERROR(threadSafeVaraiblesMaps,"Unknown Variable"); | ||||
return _variables[variable]; | ||||
} | ||||
r25 | inline std::shared_ptr<Variable> variable(int index) | |||
{ | ||||
QReadLocker lock{&_lock}; | ||||
[[unlikely]] | ||||
if(!_variables.size() > index) | ||||
SCIQLOP_ERROR(threadSafeVaraiblesMaps,"Index is out of bounds"); | ||||
return _variables.values()[index]; | ||||
} | ||||
r21 | inline const std::vector<std::shared_ptr<Variable>> variables() | |||
r19 | { | |||
r21 | std::vector<std::shared_ptr<Variable>> vars; | |||
r19 | QReadLocker lock{&_lock}; | |||
for(const auto &var:_variables) | ||||
{ | ||||
r21 | vars.push_back(var); | |||
r19 | } | |||
return vars; | ||||
} | ||||
inline std::shared_ptr<IDataProvider> provider(QUuid variable) | ||||
{ | ||||
QReadLocker lock{&_lock}; | ||||
[[unlikely]] | ||||
if(!_providers.contains(variable)) | ||||
SCIQLOP_ERROR(threadSafeVaraiblesMaps,"Unknown Variable"); | ||||
return _providers[variable]; | ||||
} | ||||
inline std::shared_ptr<VariableSynchronizationGroup2> group(QUuid variable) | ||||
{ | ||||
QReadLocker lock{&_lock}; | ||||
[[unlikely]] | ||||
if(!_synchronizationGroups.contains(variable)) | ||||
SCIQLOP_ERROR(threadSafeVaraiblesMaps,"Unknown Variable"); | ||||
return _synchronizationGroups[variable]; | ||||
} | ||||
inline bool has(const std::shared_ptr<Variable>& variable) | ||||
{ | ||||
QReadLocker lock{&_lock}; | ||||
return _variables.contains(*variable); | ||||
} | ||||
private: | ||||
QMap<QUuid,std::shared_ptr<Variable>> _variables; | ||||
QMap<QUuid,std::shared_ptr<IDataProvider>> _providers; | ||||
QMap<QUuid,std::shared_ptr<VariableSynchronizationGroup2>> _synchronizationGroups; | ||||
QReadWriteLock _lock{QReadWriteLock::Recursive}; | ||||
}_maps; | ||||
r17 | QThreadPool _ThreadPool; | |||
r20 | VCTransactionsQueues _transactions; | |||
r9 | std::unique_ptr<VariableCacheStrategy> _cacheStrategy; | |||
r17 | ||||
void _transactionComplete(QUuid group, std::shared_ptr<VCTransaction> transaction) | ||||
{ | ||||
r19 | if(transaction->done()) | |||
r17 | { | |||
_transactions.complete(group); | ||||
} | ||||
this->_processTransactions(); | ||||
} | ||||
void _processTransactions() | ||||
{ | ||||
auto nextTransactions = _transactions.nextTransactions(); | ||||
auto pendingTransactions = _transactions.pendingTransactions(); | ||||
for( auto [groupID, newTransaction] : nextTransactions) | ||||
{ | ||||
if(newTransaction.has_value() && !pendingTransactions[groupID].has_value()) | ||||
{ | ||||
_transactions.start(groupID); | ||||
r19 | auto refVar = _maps.variable(newTransaction.value()->refVar); | |||
r17 | auto ranges = _computeAllRangesInGroup(refVar,newTransaction.value()->range); | |||
for( auto const& [ID, range] : ranges) | ||||
{ | ||||
r19 | auto provider = _maps.provider(ID); | |||
auto variable = _maps.variable(ID); | ||||
r17 | auto [missingRanges, newCacheRange] = _computeMissingRanges(variable,range); | |||
r19 | auto exe = new TransactionExe(variable, provider, missingRanges, range, newCacheRange); | |||
r17 | QObject::connect(exe, | |||
&TransactionExe::transactionComplete, | ||||
[groupID=groupID,transaction=newTransaction.value(),this]() | ||||
{ | ||||
this->_transactionComplete(groupID, transaction); | ||||
} | ||||
); | ||||
_ThreadPool.start(exe); | ||||
} | ||||
} | ||||
} | ||||
} | ||||
std::map<QUuid,DateTimeRange> _computeAllRangesInGroup(const std::shared_ptr<Variable>& refVar, DateTimeRange r) | ||||
{ | ||||
std::map<QUuid,DateTimeRange> ranges; | ||||
if(!DateTimeRangeHelper::hasnan(r)) | ||||
{ | ||||
r19 | auto group = _maps.group(*refVar); | |||
r17 | if(auto transformation = DateTimeRangeHelper::computeTransformation(refVar->range(),r); | |||
transformation.has_value()) | ||||
{ | ||||
for(auto varId:group->variables()) | ||||
{ | ||||
r19 | auto var = _maps.variable(varId); | |||
r17 | auto newRange = var->range().transform(transformation.value()); | |||
ranges[varId] = newRange; | ||||
} | ||||
} | ||||
else // force new range to all variables -> may be weird if more than one var in the group | ||||
// @TODO ensure that there is no side effects | ||||
{ | ||||
for(auto varId:group->variables()) | ||||
{ | ||||
r19 | auto var = _maps.variable(varId); | |||
r17 | ranges[varId] = r; | |||
} | ||||
} | ||||
} | ||||
else | ||||
{ | ||||
SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN"); | ||||
} | ||||
return ranges; | ||||
} | ||||
std::pair<std::vector<DateTimeRange>,DateTimeRange> _computeMissingRanges(const std::shared_ptr<Variable>& var, DateTimeRange r) | ||||
r9 | { | |||
r10 | DateTimeRange newCacheRange; | |||
std::vector<DateTimeRange> missingRanges; | ||||
if(DateTimeRangeHelper::hasnan(var->cacheRange())) | ||||
{ | ||||
newCacheRange = _cacheStrategy->computeRange(r,r); | ||||
missingRanges = {newCacheRange}; | ||||
} | ||||
else | ||||
{ | ||||
newCacheRange = _cacheStrategy->computeRange(var->cacheRange(),r); | ||||
missingRanges = newCacheRange - var->cacheRange(); | ||||
} | ||||
r17 | return {missingRanges,newCacheRange}; | |||
} | ||||
void _changeRange(QUuid id, DateTimeRange r) | ||||
{ | ||||
r19 | _changeRange(_maps.variable(id) ,r); | |||
r17 | } | |||
void _changeRange(const std::shared_ptr<Variable>& var, DateTimeRange r) | ||||
{ | ||||
r19 | auto provider = _maps.provider(*var); | |||
r17 | auto [missingRanges, newCacheRange] = _computeMissingRanges(var,r); | |||
r16 | std::vector<IDataSeries*> data; | |||
r9 | for(auto range:missingRanges) | |||
{ | ||||
r17 | data.push_back(provider->getData(DataProviderParameters{{range}, var->metadata()})); | |||
r9 | } | |||
r17 | var->updateData(data, r, newCacheRange, true); | |||
r9 | } | |||
r0 | public: | |||
VariableController2Private(QObject* parent=Q_NULLPTR) | ||||
r12 | :_cacheStrategy(VariableCacheStrategyFactory::createCacheStrategy(CacheStrategy::SingleThreshold)) | |||
r2 | { | |||
Q_UNUSED(parent); | ||||
r17 | this->_ThreadPool.setMaxThreadCount(32); | |||
r2 | } | |||
r19 | /* | |||
* This dtor has to like this even if this is ugly, because default dtor would rely on | ||||
* declaration order to destruct members and that would always lead to regressions when | ||||
r20 | * modifying class members | |||
r19 | */ | |||
r17 | ~VariableController2Private() | |||
{ | ||||
while (this->_ThreadPool.activeThreadCount()) | ||||
{ | ||||
this->_ThreadPool.waitForDone(100); | ||||
} | ||||
} | ||||
r2 | ||||
r10 | std::shared_ptr<Variable> createVariable(const QString &name, const QVariantHash &metadata, std::shared_ptr<IDataProvider> provider) | |||
r0 | { | |||
r2 | auto newVar = std::make_shared<Variable>(name,metadata); | |||
r17 | auto group = std::make_shared<VariableSynchronizationGroup2>(newVar->ID()); | |||
r19 | _maps.addVariable(newVar,std::move(provider),group); | |||
r17 | this->_transactions.addEntry(*group); | |||
r2 | return newVar; | |||
} | ||||
r25 | std::shared_ptr<Variable> variable(QUuid ID) | |||
{ | ||||
return _maps.variable(ID); | ||||
} | ||||
std::shared_ptr<Variable> variable(int index) | ||||
{ | ||||
return _maps.variable(index); | ||||
} | ||||
r20 | std::shared_ptr<Variable> cloneVariable(const std::shared_ptr<Variable>& variable) | |||
{ | ||||
auto newVar = variable->clone(); | ||||
_maps.synchronize(newVar,std::nullopt); | ||||
_maps.addVariable(newVar,_maps.provider(*variable),_maps.group(*newVar)); | ||||
this->_transactions.addEntry(*_maps.group(*newVar)); | ||||
return newVar; | ||||
} | ||||
r17 | bool hasPendingTransactions(const std::shared_ptr<Variable>& variable) | |||
{ | ||||
r19 | return _transactions.active(*_maps.group(*variable)); | |||
r17 | } | |||
r14 | void deleteVariable(const std::shared_ptr<Variable>& variable) | |||
r2 | { | |||
r19 | _maps.removeVariable(variable); | |||
r2 | } | |||
r15 | void asyncChangeRange(const std::shared_ptr<Variable>& variable, const DateTimeRange& r) | |||
r2 | { | |||
r19 | if(!DateTimeRangeHelper::hasnan(r)) | |||
r2 | { | |||
r19 | auto group = _maps.group(*variable); | |||
// Just overwrite next transaction | ||||
r9 | { | |||
r19 | _transactions.enqueue(*group,std::make_shared<VCTransaction>(variable->ID(), r, static_cast<int>(group->variables().size()))); | |||
r9 | } | |||
r19 | _processTransactions(); | |||
r2 | } | |||
else | ||||
{ | ||||
r19 | SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN"); | |||
r2 | } | |||
} | ||||
r17 | void changeRange(const std::shared_ptr<Variable>& variable, DateTimeRange r) | |||
{ | ||||
r18 | asyncChangeRange(variable,r); | |||
while (hasPendingTransactions(variable)) | ||||
r17 | { | |||
r18 | QCoreApplication::processEvents(); | |||
r17 | } | |||
} | ||||
r19 | inline void synchronize(const std::shared_ptr<Variable>& var, const std::shared_ptr<Variable>& with) | |||
r8 | { | |||
r19 | _maps.synchronize(var, with); | |||
r8 | } | |||
r2 | ||||
r21 | inline const std::vector<std::shared_ptr<Variable>> variables() | |||
r2 | { | |||
r19 | return _maps.variables(); | |||
r0 | } | |||
r2 | ||||
r0 | }; | |||
VariableController2::VariableController2() | ||||
:impl{spimpl::make_unique_impl<VariableController2Private>()} | ||||
{} | ||||
r14 | std::shared_ptr<Variable> VariableController2::createVariable(const QString &name, const QVariantHash &metadata, const std::shared_ptr<IDataProvider>& provider, const DateTimeRange &range) | |||
r0 | { | |||
r10 | auto var = impl->createVariable(name, metadata, provider); | |||
r2 | emit variableAdded(var); | |||
r10 | if(!DateTimeRangeHelper::hasnan(range)) | |||
r9 | impl->changeRange(var,range); | |||
else | ||||
SCIQLOP_ERROR(VariableController2, "Creating a variable with default constructed DateTimeRange is an error"); | ||||
r2 | return var; | |||
} | ||||
r20 | std::shared_ptr<Variable> VariableController2::cloneVariable(const std::shared_ptr<Variable> &variable) | |||
{ | ||||
return impl->cloneVariable(variable); | ||||
} | ||||
r14 | void VariableController2::deleteVariable(const std::shared_ptr<Variable>& variable) | |||
r2 | { | |||
impl->deleteVariable(variable); | ||||
emit variableDeleted(variable); | ||||
} | ||||
r14 | void VariableController2::changeRange(const std::shared_ptr<Variable>& variable, const DateTimeRange& r) | |||
r2 | { | |||
impl->changeRange(variable, r); | ||||
} | ||||
r15 | void VariableController2::asyncChangeRange(const std::shared_ptr<Variable> &variable, const DateTimeRange &r) | |||
{ | ||||
impl->asyncChangeRange(variable, r); | ||||
} | ||||
r21 | const std::vector<std::shared_ptr<Variable> > VariableController2::variables() | |||
r2 | { | |||
return impl->variables(); | ||||
r0 | } | |||
r17 | bool VariableController2::isReady(const std::shared_ptr<Variable> &variable) | |||
{ | ||||
return impl->hasPendingTransactions(variable); | ||||
} | ||||
r14 | void VariableController2::synchronize(const std::shared_ptr<Variable> &var, const std::shared_ptr<Variable> &with) | |||
r8 | { | |||
impl->synchronize(var, with); | ||||
} | ||||
r21 | ||||
QByteArray VariableController2::mimeData(const std::vector<std::shared_ptr<Variable> > &variables) const | ||||
{ | ||||
auto encodedData = QByteArray{}; | ||||
QDataStream stream{&encodedData, QIODevice::WriteOnly}; | ||||
for (auto &var : variables) { | ||||
stream << var->ID().toByteArray(); | ||||
} | ||||
return encodedData; | ||||
} | ||||
r25 | ||||
const std::vector<std::shared_ptr<Variable>> VariableController2::variables(QByteArray mimeData) | ||||
{ | ||||
std::vector<std::shared_ptr<Variable>> variables; | ||||
QDataStream stream{mimeData}; | ||||
QVariantList ids; | ||||
stream >> ids; | ||||
for (const auto& id : ids) { | ||||
auto uuid = QUuid{id.toByteArray()}; | ||||
variables.push_back (impl->variable(uuid)); | ||||
} | ||||
return variables; | ||||
} | ||||
const std::shared_ptr<Variable> &VariableController2::operator[](int index) const | ||||
{ | ||||
return impl->variable (index); | ||||
} | ||||
std::shared_ptr<Variable> VariableController2::operator[](int index) | ||||
{ | ||||
return impl->variable (index); | ||||
} | ||||