diff --git a/CMakeLists.txt b/CMakeLists.txt index f8d099b..573f29d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,6 +91,7 @@ FILE (GLOB_RECURSE core_SRCS ./include/Variable/VariableCacheController.h ./include/Variable/VariableController.h ./include/Variable/VariableController2.h + ./include/Variable/private/VCTransaction.h ./include/Time/TimeController.h ./include/Settings/ISqpSettingsBindable.h ./include/Settings/SqpSettingsDefs.h diff --git a/include/Data/DateTimeRange.h b/include/Data/DateTimeRange.h index 23d8cd3..2a0a160 100644 --- a/include/Data/DateTimeRange.h +++ b/include/Data/DateTimeRange.h @@ -36,6 +36,10 @@ struct DateTimeRangeTransformation return SciQLop::numeric::almost_equal(zoom, other.zoom, 1) && SciQLop::numeric::almost_equal(shift, other.shift, 1); } + DateTimeRangeTransformation merge(const DateTimeRangeTransformation& other) const + { + return DateTimeRangeTransformation{zoom*other.zoom,shift+other.shift}; + } }; /** @@ -155,6 +159,7 @@ template DateTimeRange& operator-=(DateTimeRange&r, Seconds offset) { shift(r,-offset); + return r; } template diff --git a/include/Variable/Variable.h b/include/Variable/Variable.h index 228bef7..e5b65db 100644 --- a/include/Variable/Variable.h +++ b/include/Variable/Variable.h @@ -62,7 +62,13 @@ public: DataSeriesType type() const noexcept; QVariantHash metadata() const noexcept; + + void updateData(const std::vector& dataSeries, + const DateTimeRange& newRange, const DateTimeRange& newCacheRange, + bool notify=true); + DEPRECATE( + bool contains(const DateTimeRange &range) const noexcept; bool intersect(const DateTimeRange &range) const noexcept; bool isInside(const DateTimeRange &range) const noexcept; @@ -70,36 +76,26 @@ DEPRECATE( bool cacheContains(const DateTimeRange &range) const noexcept; bool cacheIntersect(const DateTimeRange &range) const noexcept; bool cacheIsInside(const DateTimeRange &range) const noexcept; -) -DEPRECATE( QVector provideNotInCacheRangeList(const DateTimeRange &range) const noexcept; QVector provideInCacheRangeList(const DateTimeRange &range) const noexcept; -) -DEPRECATE( void mergeDataSeries(std::shared_ptr dataSeries) noexcept; - ) - - void updateData(const std::vector& dataSeries, - const DateTimeRange& newRange, const DateTimeRange& newCacheRange, - bool notify=true); - -DEPRECATE( static QVector provideNotInCacheRangeList(const DateTimeRange &oldRange, - const DateTimeRange &nextRange); + const DateTimeRange &nextRange); static QVector provideInCacheRangeList(const DateTimeRange &oldRange, - const DateTimeRange &nextRange); -) + const DateTimeRange &nextRange); + ) + operator QUuid() {return _uuid;} QUuid ID(){return _uuid;} signals: void updated(); -DEPRECATE( - /// Signal emitted when when the data series of the variable is loaded for the first time - void dataInitialized(); + DEPRECATE( + /// Signal emitted when when the data series of the variable is loaded for the first time + void dataInitialized(); ) -private: - class VariablePrivate; + private: + class VariablePrivate; spimpl::unique_impl_ptr impl; QUuid _uuid; QReadWriteLock m_lock; diff --git a/include/Variable/VariableController2.h b/include/Variable/VariableController2.h index 4861c4e..dd48ed2 100644 --- a/include/Variable/VariableController2.h +++ b/include/Variable/VariableController2.h @@ -31,6 +31,8 @@ public: void asyncChangeRange(const std::shared_ptr& variable, const DateTimeRange& r); const std::set> variables(); + bool isReady(const std::shared_ptr& variable); + void synchronize(const std::shared_ptr& var, const std::shared_ptr& with); diff --git a/include/Variable/VariableSynchronizationGroup2.h b/include/Variable/VariableSynchronizationGroup2.h index d613f8d..85d2e50 100644 --- a/include/Variable/VariableSynchronizationGroup2.h +++ b/include/Variable/VariableSynchronizationGroup2.h @@ -64,8 +64,12 @@ public: return this->_variables; } + inline QUuid ID(){return _ID;} + + operator QUuid() {return _ID;} private: std::set _variables; + QUuid _ID = QUuid::createUuid(); }; #endif // SCIQLOP_VARIABLESYNCHRONIZATIONGROUP2_H diff --git a/include/Variable/private/VCTransaction.h b/include/Variable/private/VCTransaction.h new file mode 100644 index 0000000..a03c690 --- /dev/null +++ b/include/Variable/private/VCTransaction.h @@ -0,0 +1,53 @@ +#include +#include +#include +#include + +#include "Variable/VariableSynchronizationGroup2.h" +#include +#include +#include +#include +#include +#include +#include + +struct VCTransaction +{ + VCTransaction(QUuid refVar, DateTimeRange range, int ready) + :refVar{refVar},range{range},ready{ready} + {} + QUuid refVar; + DateTimeRange range; + int ready; + QReadWriteLock lock; +}; + +class TransactionExe:public QObject,public QRunnable +{ + Q_OBJECT + std::shared_ptr _variable; + std::shared_ptr _provider; + std::vector _ranges; + DateTimeRange _range; + DateTimeRange _cacheRange; +public: + TransactionExe(const std::shared_ptr& variable, const std::shared_ptr& provider, + const std::vector& ranges, DateTimeRange range, DateTimeRange cacheRange) + :_variable{variable}, _provider{provider},_ranges{ranges},_range{range},_cacheRange{cacheRange} + { + setAutoDelete(true); + } + void run()override + { + std::vector data; + for(auto range:_ranges) + { + data.push_back(_provider->getData(DataProviderParameters{{range}, _variable->metadata()})); + } + _variable->updateData(data, _range, _cacheRange, true); + emit transactionComplete(); + } +signals: + void transactionComplete(); +}; diff --git a/src/Variable/VariableController2.cpp b/src/Variable/VariableController2.cpp index 1e47a27..772925f 100644 --- a/src/Variable/VariableController2.cpp +++ b/src/Variable/VariableController2.cpp @@ -1,33 +1,181 @@ +#include +#include +#include +#include + #include "Variable/VariableController2.h" #include "Variable/VariableSynchronizationGroup2.h" #include #include #include #include +#include #include +#include + +class Transactions +{ + QReadWriteLock _mutex{QReadWriteLock::Recursive}; + std::map>> _nextTransactions; + std::map>> _pendingTransactions; +public: + void addEntry(QUuid id) + { + QWriteLocker lock{&_mutex}; + _nextTransactions[id] = std::nullopt; + _pendingTransactions[id] = std::nullopt; + } + + void removeEntry(QUuid id) + { + QWriteLocker lock{&_mutex}; + _nextTransactions.erase(id); + _pendingTransactions.erase(id); + } + + std::map>> pendingTransactions() + { + QReadLocker lock{&_mutex}; + return _pendingTransactions; + } + + std::map>> nextTransactions() + { + QReadLocker lock{&_mutex}; + return _nextTransactions; + } + + std::optional> start(QUuid id) + { + QWriteLocker lock{&_mutex}; + _pendingTransactions[id] = _nextTransactions[id]; + _nextTransactions[id] = std::nullopt; + return _pendingTransactions[id]; + } + + void enqueue(QUuid id, std::shared_ptr transaction) + { + QWriteLocker lock{&_mutex}; + _nextTransactions[id] = transaction; + } + + void complete(QUuid id) + { + QWriteLocker lock{&_mutex}; + _pendingTransactions[id] = std::nullopt; + } + + bool active(QUuid id) + { + QReadLocker lock{&_mutex}; + return _nextTransactions[id].has_value() && _pendingTransactions[id].has_value(); + } +}; class VariableController2::VariableController2Private { + QThreadPool _ThreadPool; QMap> _variables; QMap> _providers; QMap> _synchronizationGroups; + Transactions _transactions; + QReadWriteLock _lock{QReadWriteLock::Recursive}; std::unique_ptr _cacheStrategy; + bool p_contains(const std::shared_ptr& variable) { - return _providers.contains(variable->ID()); + QReadLocker lock{&_lock}; + return _providers.contains(*variable); } bool v_contains(const std::shared_ptr& variable) { + QReadLocker lock{&_lock}; return SciQLop::containers::contains(this->_variables, variable); } bool sg_contains(const std::shared_ptr& variable) { - return _synchronizationGroups.contains(variable->ID()); + QReadLocker lock{&_lock}; + return _synchronizationGroups.contains(*variable); } - void _changeRange(const std::shared_ptr& var, DateTimeRange r) + void _transactionComplete(QUuid group, std::shared_ptr transaction) + { + { + QWriteLocker lock{&transaction->lock}; + transaction->ready -=1; + } + if(transaction->ready==0) + { + _transactions.complete(group); + } + this->_processTransactions(); + } + void _processTransactions() + { + QWriteLocker lock{&_lock}; + auto nextTransactions = _transactions.nextTransactions(); + auto pendingTransactions = _transactions.pendingTransactions(); + for( auto [groupID, newTransaction] : nextTransactions) + { + if(newTransaction.has_value() && !pendingTransactions[groupID].has_value()) + { + _transactions.start(groupID); + auto refVar = _variables[newTransaction.value()->refVar]; + auto ranges = _computeAllRangesInGroup(refVar,newTransaction.value()->range); + for( auto const& [ID, range] : ranges) + { + auto provider = _providers[ID]; + auto variable = _variables[ID]; + auto [missingRanges, newCacheRange] = _computeMissingRanges(variable,range); + auto exe = new TransactionExe(_variables[ID], provider, missingRanges, range, newCacheRange); + QObject::connect(exe, + &TransactionExe::transactionComplete, + [groupID=groupID,transaction=newTransaction.value(),this]() + { + this->_transactionComplete(groupID, transaction); + } + ); + _ThreadPool.start(exe); + } + } + } + } + + std::map _computeAllRangesInGroup(const std::shared_ptr& refVar, DateTimeRange r) + { + std::map ranges; + if(!DateTimeRangeHelper::hasnan(r)) + { + auto group = _synchronizationGroups[*refVar]; + if(auto transformation = DateTimeRangeHelper::computeTransformation(refVar->range(),r); + transformation.has_value()) + { + for(auto varId:group->variables()) + { + auto var = _variables[varId]; + auto newRange = var->range().transform(transformation.value()); + ranges[varId] = newRange; + } + } + else // force new range to all variables -> may be weird if more than one var in the group + // @TODO ensure that there is no side effects + { + for(auto varId:group->variables()) + { + auto var = _variables[varId]; + ranges[varId] = r; + } + } + } + else + { + SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN"); + } + return ranges; + } + + std::pair,DateTimeRange> _computeMissingRanges(const std::shared_ptr& var, DateTimeRange r) { - auto provider = _providers[var->ID()]; DateTimeRange newCacheRange; std::vector missingRanges; if(DateTimeRangeHelper::hasnan(var->cacheRange())) @@ -40,31 +188,62 @@ class VariableController2::VariableController2Private newCacheRange = _cacheStrategy->computeRange(var->cacheRange(),r); missingRanges = newCacheRange - var->cacheRange(); } + return {missingRanges,newCacheRange}; + } + + void _changeRange(QUuid id, DateTimeRange r) + { + _lock.lockForRead(); + auto var = _variables[id]; + _lock.unlock(); + _changeRange(var,r); + } + void _changeRange(const std::shared_ptr& var, DateTimeRange r) + { + auto provider = _providers[*var]; + auto [missingRanges, newCacheRange] = _computeMissingRanges(var,r); std::vector data; for(auto range:missingRanges) { - data.push_back(provider->getData(DataProviderParameters{{range},var->metadata()})); + data.push_back(provider->getData(DataProviderParameters{{range}, var->metadata()})); } - var->updateData(data,r,newCacheRange,true); + var->updateData(data, r, newCacheRange, true); } public: VariableController2Private(QObject* parent=Q_NULLPTR) :_cacheStrategy(VariableCacheStrategyFactory::createCacheStrategy(CacheStrategy::SingleThreshold)) { Q_UNUSED(parent); + this->_ThreadPool.setMaxThreadCount(32); } - ~VariableController2Private() = default; + ~VariableController2Private() + { + while (this->_ThreadPool.activeThreadCount()) + { + this->_ThreadPool.waitForDone(100); + } + } std::shared_ptr createVariable(const QString &name, const QVariantHash &metadata, std::shared_ptr provider) { + QWriteLocker lock{&_lock}; auto newVar = std::make_shared(name,metadata); - this->_variables[newVar->ID()] = newVar; - this->_providers[newVar->ID()] = std::move(provider); - this->_synchronizationGroups[newVar->ID()] = std::make_shared(newVar->ID()); + this->_variables[*newVar] = newVar; + this->_providers[*newVar] = std::move(provider); + auto group = std::make_shared(newVar->ID()); + this->_synchronizationGroups[*newVar] = group; + this->_transactions.addEntry(*group); return newVar; } + bool hasPendingTransactions(const std::shared_ptr& variable) + { + QReadLocker lock{&_lock}; + auto group = _synchronizationGroups[*variable]; + return _transactions.active(*group); + } + void deleteVariable(const std::shared_ptr& variable) { /* @@ -72,44 +251,32 @@ public: * this means we got the var controller in an inconsistent state */ if(v_contains(variable)) - this->_variables.remove(variable->ID()); + { + QWriteLocker lock{&_lock}; + this->_variables.remove(*variable); + } if(p_contains(variable)) - this->_providers.remove(variable->ID()); + { + QWriteLocker lock{&_lock}; + this->_providers.remove(*variable); + } else SCIQLOP_ERROR(VariableController2Private, "No provider found for given variable"); } void asyncChangeRange(const std::shared_ptr& variable, const DateTimeRange& r) { - - } - - void changeRange(const std::shared_ptr& variable, DateTimeRange r) - { if(p_contains(variable)) { if(!DateTimeRangeHelper::hasnan(r)) { - auto group = _synchronizationGroups[variable->ID()]; - if(auto transformation = DateTimeRangeHelper::computeTransformation(variable->range(),r); - transformation.has_value()) - { - for(auto varId:group->variables()) - { - auto var = _variables[varId]; - auto newRange = var->range().transform(transformation.value()); - _changeRange(var,newRange); - } - } - else // force new range to all variables -> may be weird if more than one var in the group - // @TODO ensure that there is no side effects + auto group = _synchronizationGroups[*variable]; + // Just overwrite next transaction { - for(auto varId:group->variables()) - { - auto var = _variables[varId]; - _changeRange(var,r); - } + QWriteLocker lock{&_lock}; + _transactions.enqueue(*group,std::make_shared(variable->ID(), r, static_cast(group->variables().size()))); } + _processTransactions(); } else { @@ -122,16 +289,25 @@ public: } } + void changeRange(const std::shared_ptr& variable, DateTimeRange r) + { + auto ranges = _computeAllRangesInGroup(variable,r); + for( auto const& [ID, range] : ranges) + { + _changeRange(ID,range); + } + } + void synchronize(const std::shared_ptr& var, const std::shared_ptr& with) { if(v_contains(var) && v_contains(with)) { if(sg_contains(var) && sg_contains(with)) { - + QWriteLocker lock{&_lock}; auto dest_group = this->_synchronizationGroups[with->ID()]; - this->_synchronizationGroups[var->ID()] = dest_group; - dest_group->addVariable(var->ID()); + this->_synchronizationGroups[*var] = dest_group; + dest_group->addVariable(*var); } else { @@ -147,6 +323,7 @@ public: const std::set> variables() { std::set> vars; + QReadLocker lock{&_lock}; for(const auto &var:_variables) { vars.insert(var); @@ -192,8 +369,12 @@ const std::set > VariableController2::variables() return impl->variables(); } +bool VariableController2::isReady(const std::shared_ptr &variable) +{ + return impl->hasPendingTransactions(variable); +} + void VariableController2::synchronize(const std::shared_ptr &var, const std::shared_ptr &with) { impl->synchronize(var, with); } - diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d1e18c2..cdc3c6f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -46,4 +46,5 @@ declare_test(TestDownloader TestDownloader Network/TestDownloader.cpp "sciqlopco declare_test(TestVariableController2 TestVariableController2 Variable/TestVariableController2.cpp "sciqlopcore;TestUtils;Qt5::Test") +declare_test(TestVariableController2Async TestVariableController2Async Variable/TestVariableController2Async.cpp "sciqlopcore;TestUtils;Qt5::Test") declare_test(TestVariableController2WithSync TestVariableController2WithSync Variable/TestVariableController2WithSync.cpp "sciqlopcore;TestUtils;Qt5::Test") diff --git a/tests/Variable/TestVariableController2Async.cpp b/tests/Variable/TestVariableController2Async.cpp new file mode 100644 index 0000000..c3f686a --- /dev/null +++ b/tests/Variable/TestVariableController2Async.cpp @@ -0,0 +1,67 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#define TEST_VC2_FIXTURE(slope) \ + VariableController2 vc; \ + auto provider = std::make_shared>();\ + +#define TEST_VC2_CREATE_DEFAULT_VARS(name1, name2, name3)\ + auto range = DateTimeRange::fromDateTime(QDate(2018,8,7),QTime(14,00),\ + QDate(2018,8,7),QTime(16,00));\ + auto name1 = vc.createVariable("name1", {}, provider, range);\ + auto name2 = vc.createVariable("name1", {}, provider, range);\ + auto name3 = vc.createVariable("name1", {}, provider, range);\ + vc.synchronize(name1,name2);\ + + +class TestVariableController2Async : public QObject +{ + Q_OBJECT +public: + explicit TestVariableController2Async(QObject *parent = nullptr) : QObject(parent){} +signals: + +private slots: + void initTestCase(){} + void cleanupTestCase(){} + + void testSimplePan() + { + TEST_VC2_FIXTURE(2); + auto range = DateTimeRange::fromDateTime(QDate(2018,8,7),QTime(14,00), + QDate(2018,8,7),QTime(16,00)); + int variableUpdated=0; + auto var1 = vc.createVariable("var1", {}, provider, range); + auto var2 = vc.createVariable("var2", {}, provider, range); + auto var3 = vc.createVariable("var3", {}, provider, range); + connect(&(*var2),&Variable::updated, [&variableUpdated](){variableUpdated+=1;}); + vc.synchronize(var1,var2); + vc.asyncChangeRange(var1,range+Seconds{10000.}); + vc.asyncChangeRange(var1,range+Seconds{50000.}); + vc.asyncChangeRange(var1,range+Seconds{100000.}); + vc.asyncChangeRange(var1,range+Seconds{150000.}); + while(!vc.isReady(var1) || !vc.isReady(var2)) + { + QCoreApplication::processEvents(); + } + } + + +}; + + +QTEST_MAIN(TestVariableController2Async) + +#include "TestVariableController2Async.moc" +