VariableController2.cpp
447 lines
| 13.4 KiB
| text/x-c
|
CppLexer
r0 | #include "Variable/VariableController2.h" | |||
r60 | ||||
r6 | #include "Variable/VariableSynchronizationGroup2.h" | |||
r60 | ||||
r2 | #include <Common/containers.h> | |||
#include <Common/debug.h> | ||||
#include <Data/DataProviderParameters.h> | ||||
r17 | #include <Data/DateTimeRange.h> | |||
r60 | #include <Data/DateTimeRangeHelper.h> | |||
#include <QCoreApplication> | ||||
#include <QDataStream> | ||||
#include <QObject> | ||||
#include <QQueue> | ||||
#include <QRunnable> | ||||
#include <QThreadPool> | ||||
r17 | #include <Variable/private/VCTransaction.h> | |||
r0 | ||||
r2 | class VariableController2::VariableController2Private | |||
r0 | { | |||
r60 | struct threadSafeVaraiblesMaps | |||
{ | ||||
inline void | ||||
r67 | addVariable(const std::shared_ptr<Variable2>& variable, | |||
r60 | const std::shared_ptr<IDataProvider>& provider, | |||
const std::shared_ptr<VariableSynchronizationGroup2>& | ||||
synchronizationGroup) | ||||
r19 | { | |||
r60 | QWriteLocker lock{&_lock}; | |||
_variables[*variable] = variable; | ||||
_providers[*variable] = provider; | ||||
_synchronizationGroups[*variable] = synchronizationGroup; | ||||
r17 | } | |||
r34 | ||||
r67 | inline void removeVariable(const std::shared_ptr<Variable2>& variable) | |||
r47 | { | |||
r60 | QWriteLocker lock{&_lock}; | |||
_variables.erase(*variable); | ||||
_providers.remove(*variable); | ||||
_synchronizationGroups.remove(*variable); | ||||
r47 | } | |||
r60 | inline void | |||
r67 | synchronize(const std::shared_ptr<Variable2>& variable, | |||
const std::optional<std::shared_ptr<Variable2>>& with) | ||||
r17 | { | |||
r60 | QWriteLocker lock{&_lock}; | |||
if(with.has_value()) | ||||
{ | ||||
auto newGroup = _synchronizationGroups[*with.value()]; | ||||
newGroup->addVariable(*variable); | ||||
_synchronizationGroups[*variable] = newGroup; | ||||
} | ||||
else | ||||
{ | ||||
_synchronizationGroups[*variable] = | ||||
std::make_shared<VariableSynchronizationGroup2>(*variable); | ||||
} | ||||
r17 | } | |||
r67 | inline std::shared_ptr<Variable2> variable(QUuid variable) | |||
r17 | { | |||
r60 | QReadLocker lock{&_lock}; | |||
auto it = _variables.find(variable); | ||||
r62 | #if __cplusplus > 201703L | |||
[[unlikely]] | ||||
#endif | ||||
if(it == _variables.end()) | ||||
r60 | SCIQLOP_ERROR(threadSafeVaraiblesMaps, "Unknown Variable"); | |||
return (*it).second; | ||||
r17 | } | |||
r67 | inline std::shared_ptr<Variable2> variable(int index) | |||
r9 | { | |||
r60 | QReadLocker lock{&_lock}; | |||
r62 | #if __cplusplus > 201703L | |||
[[unlikely]] | ||||
#endif | ||||
if(!_variables.size() > index) | ||||
r60 | SCIQLOP_ERROR(threadSafeVaraiblesMaps, "Index is out of bounds"); | |||
auto it = _variables.cbegin(); | ||||
while(index != 0) | ||||
{ | ||||
index -= 1; | ||||
it++; | ||||
} | ||||
return (*(it)).second; | ||||
r17 | } | |||
r67 | inline const std::vector<std::shared_ptr<Variable2>> variables() | |||
r17 | { | |||
r67 | std::vector<std::shared_ptr<Variable2>> vars; | |||
r60 | QReadLocker lock{&_lock}; | |||
for(const auto& [id, var] : _variables) | ||||
{ | ||||
vars.push_back(var); | ||||
} | ||||
return vars; | ||||
r17 | } | |||
r60 | ||||
inline std::shared_ptr<IDataProvider> provider(QUuid variable) | ||||
r17 | { | |||
r60 | QReadLocker lock{&_lock}; | |||
r62 | #if __cplusplus > 201703L | |||
[[unlikely]] | ||||
#endif | ||||
if(!_providers.contains(variable)) | ||||
r60 | SCIQLOP_ERROR(threadSafeVaraiblesMaps, "Unknown Variable"); | |||
return _providers[variable]; | ||||
r9 | } | |||
r60 | ||||
inline std::shared_ptr<VariableSynchronizationGroup2> group(QUuid variable) | ||||
r2 | { | |||
r60 | QReadLocker lock{&_lock}; | |||
r62 | #if __cplusplus > 201703L | |||
[[unlikely]] | ||||
#endif | ||||
if(!_synchronizationGroups.contains(variable)) | ||||
r60 | SCIQLOP_ERROR(threadSafeVaraiblesMaps, "Unknown Variable"); | |||
return _synchronizationGroups[variable]; | ||||
r2 | } | |||
r67 | inline bool has(const std::shared_ptr<Variable2>& variable) | |||
r17 | { | |||
r60 | QReadLocker lock{&_lock}; | |||
return _variables.find(*variable) == _variables.end(); | ||||
r17 | } | |||
r2 | ||||
r60 | private: | |||
r67 | std::map<QUuid, std::shared_ptr<Variable2>> _variables; | |||
r60 | QMap<QUuid, std::shared_ptr<IDataProvider>> _providers; | |||
QMap<QUuid, std::shared_ptr<VariableSynchronizationGroup2>> | ||||
_synchronizationGroups; | ||||
QReadWriteLock _lock{QReadWriteLock::Recursive}; | ||||
} _maps; | ||||
std::vector<QUuid> _variablesToRemove; | ||||
QThreadPool* _ThreadPool; | ||||
VCTransactionsQueues _transactions; | ||||
void _transactionComplete(QUuid group, | ||||
std::shared_ptr<VCTransaction> transaction) | ||||
{ | ||||
if(transaction->done()) { _transactions.complete(group); } | ||||
this->_processTransactions(); | ||||
} | ||||
void _cleanupVariables() | ||||
{ | ||||
for(auto id : _variablesToRemove) | ||||
r0 | { | |||
r60 | auto v = this->variable(id); | |||
if(!hasPendingTransactions(v)) | ||||
{ | ||||
_variablesToRemove.erase(std::remove(_variablesToRemove.begin(), | ||||
_variablesToRemove.end(), id), | ||||
_variablesToRemove.end()); | ||||
this->deleteVariable(v); | ||||
} | ||||
r2 | } | |||
r60 | } | |||
r2 | ||||
r60 | void _processTransactions(bool fragmented = false) | |||
{ | ||||
auto nextTransactions = _transactions.nextTransactions(); | ||||
auto pendingTransactions = _transactions.pendingTransactions(); | ||||
for(auto [groupID, newTransaction] : nextTransactions) | ||||
r25 | { | |||
r60 | if(newTransaction.has_value() && | |||
!pendingTransactions[groupID].has_value()) | ||||
{ | ||||
_transactions.start(groupID); | ||||
auto refVar = _maps.variable(newTransaction.value()->refVar); | ||||
auto ranges = | ||||
_computeAllRangesInGroup(refVar, newTransaction.value()->range); | ||||
for(auto const& [ID, range] : ranges) | ||||
{ | ||||
auto provider = _maps.provider(ID); | ||||
auto variable = _maps.variable(ID); | ||||
if(fragmented) | ||||
{ | ||||
r67 | auto missingRanges = _computeMissingRanges(variable, range); | |||
r60 | ||||
r67 | auto exe = | |||
new TransactionExe(variable, provider, missingRanges, range); | ||||
r60 | QObject::connect( | |||
exe, &TransactionExe::transactionComplete, | ||||
[groupID = groupID, transaction = newTransaction.value(), | ||||
this]() { this->_transactionComplete(groupID, transaction); }); | ||||
_ThreadPool->start(exe); | ||||
} | ||||
else | ||||
{ | ||||
r67 | auto exe = new TransactionExe(variable, provider, {range}, range); | |||
r60 | QObject::connect( | |||
exe, &TransactionExe::transactionComplete, | ||||
[groupID = groupID, transaction = newTransaction.value(), | ||||
this]() { this->_transactionComplete(groupID, transaction); }); | ||||
_ThreadPool->start(exe); | ||||
} | ||||
} | ||||
} | ||||
r25 | } | |||
r60 | // after each transaction update we get a new distribution of idle and | |||
// working variables so we can delete variables which are waiting to be | ||||
// deleted if they are now idle | ||||
_cleanupVariables(); | ||||
} | ||||
std::map<QUuid, DateTimeRange> | ||||
r67 | _computeAllRangesInGroup(const std::shared_ptr<Variable2>& refVar, | |||
r60 | DateTimeRange r) | |||
{ | ||||
std::map<QUuid, DateTimeRange> ranges; | ||||
if(!DateTimeRangeHelper::hasnan(r)) | ||||
r25 | { | |||
r60 | auto group = _maps.group(*refVar); | |||
if(auto transformation = | ||||
DateTimeRangeHelper::computeTransformation(refVar->range(), r); | ||||
transformation.has_value()) | ||||
{ | ||||
for(auto varId : group->variables()) | ||||
{ | ||||
auto var = _maps.variable(varId); | ||||
auto newRange = var->range().transform(transformation.value()); | ||||
ranges[varId] = newRange; | ||||
} | ||||
} | ||||
else // force new range to all variables -> may be weird if more than one | ||||
// var in the group | ||||
// @TODO ensure that there is no side effects | ||||
{ | ||||
for(auto varId : group->variables()) | ||||
{ | ||||
auto var = _maps.variable(varId); | ||||
ranges[varId] = r; | ||||
} | ||||
} | ||||
r25 | } | |||
r60 | else | |||
r20 | { | |||
r60 | SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN"); | |||
r20 | } | |||
r60 | return ranges; | |||
} | ||||
r67 | std::vector<DateTimeRange> | |||
_computeMissingRanges(const std::shared_ptr<Variable2>& var, DateTimeRange r) | ||||
r60 | { | |||
r67 | return r - var->range(); | |||
r60 | } | |||
void _changeRange(QUuid id, DateTimeRange r) | ||||
{ | ||||
_changeRange(_maps.variable(id), r); | ||||
} | ||||
r67 | void _changeRange(const std::shared_ptr<Variable2>& var, DateTimeRange r) | |||
r60 | { | |||
r67 | auto provider = _maps.provider(*var); | |||
auto missingRanges = _computeMissingRanges(var, r); | ||||
std::vector<TimeSeries::ITimeSerie*> data; | ||||
r60 | for(auto range : missingRanges) | |||
r2 | { | |||
r60 | data.push_back( | |||
provider->getData(DataProviderParameters{{range}, var->metadata()})); | ||||
r2 | } | |||
r68 | data.push_back(var->data().get()); // might be smarter | |||
r67 | var->setData(data, r, true); | |||
r60 | } | |||
r2 | ||||
r60 | public: | |||
VariableController2Private(QObject* parent = Q_NULLPTR) | ||||
{ | ||||
Q_UNUSED(parent); | ||||
this->_ThreadPool = new QThreadPool(); | ||||
this->_ThreadPool->setMaxThreadCount(32); | ||||
} | ||||
/* | ||||
* This dtor has to like this even if this is ugly, because default dtor would | ||||
* rely on declaration order to destruct members and that would always lead to | ||||
* regressions when modifying class members | ||||
*/ | ||||
~VariableController2Private() { delete this->_ThreadPool; } | ||||
r67 | std::shared_ptr<Variable2> | |||
r60 | createVariable(const QString& name, const QVariantHash& metadata, | |||
std::shared_ptr<IDataProvider> provider) | ||||
{ | ||||
r67 | auto newVar = std::make_shared<Variable2>(name, metadata); | |||
r60 | auto group = std::make_shared<VariableSynchronizationGroup2>(newVar->ID()); | |||
_maps.addVariable(newVar, std::move(provider), group); | ||||
this->_transactions.addEntry(*group); | ||||
return newVar; | ||||
} | ||||
r67 | std::shared_ptr<Variable2> variable(QUuid ID) { return _maps.variable(ID); } | |||
r60 | ||||
r67 | std::shared_ptr<Variable2> variable(int index) | |||
r60 | { | |||
return _maps.variable(index); | ||||
} | ||||
r67 | std::shared_ptr<Variable2> | |||
cloneVariable(const std::shared_ptr<Variable2>& variable) | ||||
r60 | { | |||
auto newVar = variable->clone(); | ||||
_maps.synchronize(newVar, std::nullopt); | ||||
_maps.addVariable(newVar, _maps.provider(*variable), _maps.group(*newVar)); | ||||
this->_transactions.addEntry(*_maps.group(*newVar)); | ||||
return newVar; | ||||
} | ||||
r67 | bool hasPendingTransactions(const std::shared_ptr<Variable2>& variable) | |||
r60 | { | |||
return _transactions.active(*_maps.group(*variable)); | ||||
} | ||||
bool hasPendingTransactions() | ||||
{ | ||||
bool has = false; | ||||
for(const auto& var : _maps.variables()) | ||||
r2 | { | |||
r60 | has |= _transactions.active(*_maps.group(*var)); | |||
r2 | } | |||
r60 | return has; | |||
} | ||||
r2 | ||||
r67 | void deleteVariable(const std::shared_ptr<Variable2>& variable) | |||
r60 | { | |||
if(!hasPendingTransactions(variable)) | ||||
_maps.removeVariable(variable); | ||||
else | ||||
_variablesToRemove.push_back(variable->ID()); | ||||
} | ||||
r67 | void asyncChangeRange(const std::shared_ptr<Variable2>& variable, | |||
r60 | const DateTimeRange& r) | |||
{ | ||||
if(!DateTimeRangeHelper::hasnan(r)) | ||||
r17 | { | |||
r60 | auto group = _maps.group(*variable); | |||
// Just overwrite next transaction | ||||
{ | ||||
_transactions.enqueue(*group, | ||||
std::make_shared<VCTransaction>( | ||||
variable->ID(), r, | ||||
static_cast<int>(group->variables().size()))); | ||||
} | ||||
_processTransactions(); | ||||
r17 | } | |||
r60 | else | |||
r8 | { | |||
r60 | SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN"); | |||
r8 | } | |||
r60 | } | |||
r2 | ||||
r67 | void changeRange(const std::shared_ptr<Variable2>& variable, DateTimeRange r) | |||
r60 | { | |||
asyncChangeRange(variable, r); | ||||
while(hasPendingTransactions(variable)) | ||||
r2 | { | |||
r60 | QCoreApplication::processEvents(); | |||
r0 | } | |||
r60 | } | |||
r67 | inline void synchronize(const std::shared_ptr<Variable2>& var, | |||
const std::shared_ptr<Variable2>& with) | ||||
r60 | { | |||
_maps.synchronize(var, with); | ||||
} | ||||
r67 | inline const std::vector<std::shared_ptr<Variable2>> variables() | |||
r60 | { | |||
return _maps.variables(); | ||||
} | ||||
r0 | }; | |||
VariableController2::VariableController2() | ||||
r60 | : impl{spimpl::make_unique_impl<VariableController2Private>()} | |||
r0 | {} | |||
r67 | std::shared_ptr<Variable2> VariableController2::createVariable( | |||
r60 | const QString& name, const QVariantHash& metadata, | |||
const std::shared_ptr<IDataProvider>& provider, const DateTimeRange& range) | ||||
r0 | { | |||
r60 | auto var = impl->createVariable(name, metadata, provider); | |||
var->setRange(range); // even with no data this is it's range | ||||
if(!DateTimeRangeHelper::hasnan(range)) | ||||
impl->asyncChangeRange(var, range); | ||||
else | ||||
SCIQLOP_ERROR(VariableController2, "Creating a variable with default " | ||||
"constructed DateTimeRange is an error"); | ||||
emit variableAdded(var); | ||||
return var; | ||||
r2 | } | |||
r67 | std::shared_ptr<Variable2> | |||
VariableController2::cloneVariable(const std::shared_ptr<Variable2>& variable) | ||||
r20 | { | |||
r60 | return impl->cloneVariable(variable); | |||
r20 | } | |||
r60 | void VariableController2::deleteVariable( | |||
r67 | const std::shared_ptr<Variable2>& variable) | |||
r2 | { | |||
r60 | impl->deleteVariable(variable); | |||
emit variableDeleted(variable); | ||||
r2 | } | |||
r67 | void VariableController2::changeRange( | |||
const std::shared_ptr<Variable2>& variable, const DateTimeRange& r) | ||||
r2 | { | |||
r60 | impl->changeRange(variable, r); | |||
r2 | } | |||
r60 | void VariableController2::asyncChangeRange( | |||
r67 | const std::shared_ptr<Variable2>& variable, const DateTimeRange& r) | |||
r15 | { | |||
r60 | impl->asyncChangeRange(variable, r); | |||
r15 | } | |||
r67 | const std::vector<std::shared_ptr<Variable2>> VariableController2::variables() | |||
r2 | { | |||
r60 | return impl->variables(); | |||
r0 | } | |||
r67 | bool VariableController2::isReady(const std::shared_ptr<Variable2>& variable) | |||
r17 | { | |||
r60 | return !impl->hasPendingTransactions(variable); | |||
r30 | } | |||
r60 | bool VariableController2::isReady() { return !impl->hasPendingTransactions(); } | |||
r17 | ||||
r67 | void VariableController2::synchronize(const std::shared_ptr<Variable2>& var, | |||
const std::shared_ptr<Variable2>& with) | ||||
r8 | { | |||
r60 | impl->synchronize(var, with); | |||
r8 | } | |||
r21 | ||||
r67 | const std::vector<std::shared_ptr<Variable2>> | |||
r60 | VariableController2::variables(const std::vector<QUuid>& ids) | |||
r25 | { | |||
r67 | std::vector<std::shared_ptr<Variable2>> variables; | |||
r60 | std::transform(std::cbegin(ids), std::cend(ids), | |||
std::back_inserter(variables), | ||||
[this](const auto& id) { return impl->variable(id); }); | ||||
return variables; | ||||
r25 | } | |||