##// END OF EJS Templates
Basic asynchronous variable update, still a lot to do...
jeandet -
r17:895ab1d87afd
parent child
Show More
@@ -0,0 +1,53
1 #include <QThreadPool>
2 #include <QRunnable>
3 #include <QObject>
4 #include <QReadWriteLock>
5
6 #include "Variable/VariableSynchronizationGroup2.h"
7 #include <Variable/Variable.h>
8 #include <Common/containers.h>
9 #include <Common/debug.h>
10 #include <Data/DataProviderParameters.h>
11 #include <Data/DateTimeRangeHelper.h>
12 #include <Data/DateTimeRange.h>
13 #include <Data/IDataProvider.h>
14
15 struct VCTransaction
16 {
17 VCTransaction(QUuid refVar, DateTimeRange range, int ready)
18 :refVar{refVar},range{range},ready{ready}
19 {}
20 QUuid refVar;
21 DateTimeRange range;
22 int ready;
23 QReadWriteLock lock;
24 };
25
26 class TransactionExe:public QObject,public QRunnable
27 {
28 Q_OBJECT
29 std::shared_ptr<Variable> _variable;
30 std::shared_ptr<IDataProvider> _provider;
31 std::vector<DateTimeRange> _ranges;
32 DateTimeRange _range;
33 DateTimeRange _cacheRange;
34 public:
35 TransactionExe(const std::shared_ptr<Variable>& variable, const std::shared_ptr<IDataProvider>& provider,
36 const std::vector<DateTimeRange>& ranges, DateTimeRange range, DateTimeRange cacheRange)
37 :_variable{variable}, _provider{provider},_ranges{ranges},_range{range},_cacheRange{cacheRange}
38 {
39 setAutoDelete(true);
40 }
41 void run()override
42 {
43 std::vector<IDataSeries*> data;
44 for(auto range:_ranges)
45 {
46 data.push_back(_provider->getData(DataProviderParameters{{range}, _variable->metadata()}));
47 }
48 _variable->updateData(data, _range, _cacheRange, true);
49 emit transactionComplete();
50 }
51 signals:
52 void transactionComplete();
53 };
@@ -0,0 +1,67
1 #include <cmath>
2 #include <algorithm>
3 #include <numeric>
4 #include <QtTest>
5 #include <QObject>
6 #include <Variable/VariableController2.h>
7 #include <Data/DateTimeRange.h>
8 #include <Data/IDataProvider.h>
9 #include <Data/ScalarSeries.h>
10 #include <Data/DataProviderParameters.h>
11 #include <Common/containers.h>
12
13 #include <TestUtils/TestProviders.h>
14
15 #define TEST_VC2_FIXTURE(slope) \
16 VariableController2 vc; \
17 auto provider = std::make_shared<SimpleRange<slope>>();\
18
19 #define TEST_VC2_CREATE_DEFAULT_VARS(name1, name2, name3)\
20 auto range = DateTimeRange::fromDateTime(QDate(2018,8,7),QTime(14,00),\
21 QDate(2018,8,7),QTime(16,00));\
22 auto name1 = vc.createVariable("name1", {}, provider, range);\
23 auto name2 = vc.createVariable("name1", {}, provider, range);\
24 auto name3 = vc.createVariable("name1", {}, provider, range);\
25 vc.synchronize(name1,name2);\
26
27
28 class TestVariableController2Async : public QObject
29 {
30 Q_OBJECT
31 public:
32 explicit TestVariableController2Async(QObject *parent = nullptr) : QObject(parent){}
33 signals:
34
35 private slots:
36 void initTestCase(){}
37 void cleanupTestCase(){}
38
39 void testSimplePan()
40 {
41 TEST_VC2_FIXTURE(2);
42 auto range = DateTimeRange::fromDateTime(QDate(2018,8,7),QTime(14,00),
43 QDate(2018,8,7),QTime(16,00));
44 int variableUpdated=0;
45 auto var1 = vc.createVariable("var1", {}, provider, range);
46 auto var2 = vc.createVariable("var2", {}, provider, range);
47 auto var3 = vc.createVariable("var3", {}, provider, range);
48 connect(&(*var2),&Variable::updated, [&variableUpdated](){variableUpdated+=1;});
49 vc.synchronize(var1,var2);
50 vc.asyncChangeRange(var1,range+Seconds<double>{10000.});
51 vc.asyncChangeRange(var1,range+Seconds<double>{50000.});
52 vc.asyncChangeRange(var1,range+Seconds<double>{100000.});
53 vc.asyncChangeRange(var1,range+Seconds<double>{150000.});
54 while(!vc.isReady(var1) || !vc.isReady(var2))
55 {
56 QCoreApplication::processEvents();
57 }
58 }
59
60
61 };
62
63
64 QTEST_MAIN(TestVariableController2Async)
65
66 #include "TestVariableController2Async.moc"
67
@@ -91,6 +91,7 FILE (GLOB_RECURSE core_SRCS
91 91 ./include/Variable/VariableCacheController.h
92 92 ./include/Variable/VariableController.h
93 93 ./include/Variable/VariableController2.h
94 ./include/Variable/private/VCTransaction.h
94 95 ./include/Time/TimeController.h
95 96 ./include/Settings/ISqpSettingsBindable.h
96 97 ./include/Settings/SqpSettingsDefs.h
@@ -36,6 +36,10 struct DateTimeRangeTransformation
36 36 return SciQLop::numeric::almost_equal(zoom, other.zoom, 1) &&
37 37 SciQLop::numeric::almost_equal<double>(shift, other.shift, 1);
38 38 }
39 DateTimeRangeTransformation merge(const DateTimeRangeTransformation& other) const
40 {
41 return DateTimeRangeTransformation{zoom*other.zoom,shift+other.shift};
42 }
39 43 };
40 44
41 45 /**
@@ -155,6 +159,7 template <class T>
155 159 DateTimeRange& operator-=(DateTimeRange&r, Seconds<T> offset)
156 160 {
157 161 shift(r,-offset);
162 return r;
158 163 }
159 164
160 165 template <class T>
@@ -62,7 +62,13 public:
62 62 DataSeriesType type() const noexcept;
63 63
64 64 QVariantHash metadata() const noexcept;
65
66 void updateData(const std::vector<IDataSeries*>& dataSeries,
67 const DateTimeRange& newRange, const DateTimeRange& newCacheRange,
68 bool notify=true);
69
65 70 DEPRECATE(
71
66 72 bool contains(const DateTimeRange &range) const noexcept;
67 73 bool intersect(const DateTimeRange &range) const noexcept;
68 74 bool isInside(const DateTimeRange &range) const noexcept;
@@ -70,20 +76,9 DEPRECATE(
70 76 bool cacheContains(const DateTimeRange &range) const noexcept;
71 77 bool cacheIntersect(const DateTimeRange &range) const noexcept;
72 78 bool cacheIsInside(const DateTimeRange &range) const noexcept;
73 )
74 DEPRECATE(
75 79 QVector<DateTimeRange> provideNotInCacheRangeList(const DateTimeRange &range) const noexcept;
76 80 QVector<DateTimeRange> provideInCacheRangeList(const DateTimeRange &range) const noexcept;
77 )
78 DEPRECATE(
79 81 void mergeDataSeries(std::shared_ptr<IDataSeries> dataSeries) noexcept;
80 )
81
82 void updateData(const std::vector<IDataSeries*>& dataSeries,
83 const DateTimeRange& newRange, const DateTimeRange& newCacheRange,
84 bool notify=true);
85
86 DEPRECATE(
87 82 static QVector<DateTimeRange> provideNotInCacheRangeList(const DateTimeRange &oldRange,
88 83 const DateTimeRange &nextRange);
89 84
@@ -91,6 +86,7 DEPRECATE(
91 86 const DateTimeRange &nextRange);
92 87 )
93 88
89 operator QUuid() {return _uuid;}
94 90 QUuid ID(){return _uuid;}
95 91 signals:
96 92 void updated();
@@ -31,6 +31,8 public:
31 31 void asyncChangeRange(const std::shared_ptr<Variable>& variable, const DateTimeRange& r);
32 32 const std::set<std::shared_ptr<Variable>> variables();
33 33
34 bool isReady(const std::shared_ptr<Variable>& variable);
35
34 36 void synchronize(const std::shared_ptr<Variable>& var, const std::shared_ptr<Variable>& with);
35 37
36 38
@@ -64,8 +64,12 public:
64 64 return this->_variables;
65 65 }
66 66
67 inline QUuid ID(){return _ID;}
68
69 operator QUuid() {return _ID;}
67 70 private:
68 71 std::set<QUuid> _variables;
72 QUuid _ID = QUuid::createUuid();
69 73 };
70 74
71 75 #endif // SCIQLOP_VARIABLESYNCHRONIZATIONGROUP2_H
@@ -1,33 +1,181
1 #include <QQueue>
2 #include <QThreadPool>
3 #include <QRunnable>
4 #include <QObject>
5
1 6 #include "Variable/VariableController2.h"
2 7 #include "Variable/VariableSynchronizationGroup2.h"
3 8 #include <Common/containers.h>
4 9 #include <Common/debug.h>
5 10 #include <Data/DataProviderParameters.h>
6 11 #include <Data/DateTimeRangeHelper.h>
12 #include <Data/DateTimeRange.h>
7 13 #include <Variable/VariableCacheStrategyFactory.h>
14 #include <Variable/private/VCTransaction.h>
15
16 class Transactions
17 {
18 QReadWriteLock _mutex{QReadWriteLock::Recursive};
19 std::map<QUuid,std::optional<std::shared_ptr<VCTransaction>>> _nextTransactions;
20 std::map<QUuid,std::optional<std::shared_ptr<VCTransaction>>> _pendingTransactions;
21 public:
22 void addEntry(QUuid id)
23 {
24 QWriteLocker lock{&_mutex};
25 _nextTransactions[id] = std::nullopt;
26 _pendingTransactions[id] = std::nullopt;
27 }
28
29 void removeEntry(QUuid id)
30 {
31 QWriteLocker lock{&_mutex};
32 _nextTransactions.erase(id);
33 _pendingTransactions.erase(id);
34 }
35
36 std::map<QUuid,std::optional<std::shared_ptr<VCTransaction>>> pendingTransactions()
37 {
38 QReadLocker lock{&_mutex};
39 return _pendingTransactions;
40 }
41
42 std::map<QUuid,std::optional<std::shared_ptr<VCTransaction>>> nextTransactions()
43 {
44 QReadLocker lock{&_mutex};
45 return _nextTransactions;
46 }
47
48 std::optional<std::shared_ptr<VCTransaction>> start(QUuid id)
49 {
50 QWriteLocker lock{&_mutex};
51 _pendingTransactions[id] = _nextTransactions[id];
52 _nextTransactions[id] = std::nullopt;
53 return _pendingTransactions[id];
54 }
55
56 void enqueue(QUuid id, std::shared_ptr<VCTransaction> transaction)
57 {
58 QWriteLocker lock{&_mutex};
59 _nextTransactions[id] = transaction;
60 }
61
62 void complete(QUuid id)
63 {
64 QWriteLocker lock{&_mutex};
65 _pendingTransactions[id] = std::nullopt;
66 }
67
68 bool active(QUuid id)
69 {
70 QReadLocker lock{&_mutex};
71 return _nextTransactions[id].has_value() && _pendingTransactions[id].has_value();
72 }
73 };
8 74
9 75 class VariableController2::VariableController2Private
10 76 {
77 QThreadPool _ThreadPool;
11 78 QMap<QUuid,std::shared_ptr<Variable>> _variables;
12 79 QMap<QUuid,std::shared_ptr<IDataProvider>> _providers;
13 80 QMap<QUuid,std::shared_ptr<VariableSynchronizationGroup2>> _synchronizationGroups;
81 Transactions _transactions;
82 QReadWriteLock _lock{QReadWriteLock::Recursive};
14 83 std::unique_ptr<VariableCacheStrategy> _cacheStrategy;
84
15 85 bool p_contains(const std::shared_ptr<Variable>& variable)
16 86 {
17 return _providers.contains(variable->ID());
87 QReadLocker lock{&_lock};
88 return _providers.contains(*variable);
18 89 }
19 90 bool v_contains(const std::shared_ptr<Variable>& variable)
20 91 {
92 QReadLocker lock{&_lock};
21 93 return SciQLop::containers::contains(this->_variables, variable);
22 94 }
23 95 bool sg_contains(const std::shared_ptr<Variable>& variable)
24 96 {
25 return _synchronizationGroups.contains(variable->ID());
97 QReadLocker lock{&_lock};
98 return _synchronizationGroups.contains(*variable);
26 99 }
27 100
28 void _changeRange(const std::shared_ptr<Variable>& var, DateTimeRange r)
101 void _transactionComplete(QUuid group, std::shared_ptr<VCTransaction> transaction)
102 {
103 {
104 QWriteLocker lock{&transaction->lock};
105 transaction->ready -=1;
106 }
107 if(transaction->ready==0)
108 {
109 _transactions.complete(group);
110 }
111 this->_processTransactions();
112 }
113 void _processTransactions()
114 {
115 QWriteLocker lock{&_lock};
116 auto nextTransactions = _transactions.nextTransactions();
117 auto pendingTransactions = _transactions.pendingTransactions();
118 for( auto [groupID, newTransaction] : nextTransactions)
119 {
120 if(newTransaction.has_value() && !pendingTransactions[groupID].has_value())
121 {
122 _transactions.start(groupID);
123 auto refVar = _variables[newTransaction.value()->refVar];
124 auto ranges = _computeAllRangesInGroup(refVar,newTransaction.value()->range);
125 for( auto const& [ID, range] : ranges)
126 {
127 auto provider = _providers[ID];
128 auto variable = _variables[ID];
129 auto [missingRanges, newCacheRange] = _computeMissingRanges(variable,range);
130 auto exe = new TransactionExe(_variables[ID], provider, missingRanges, range, newCacheRange);
131 QObject::connect(exe,
132 &TransactionExe::transactionComplete,
133 [groupID=groupID,transaction=newTransaction.value(),this]()
134 {
135 this->_transactionComplete(groupID, transaction);
136 }
137 );
138 _ThreadPool.start(exe);
139 }
140 }
141 }
142 }
143
144 std::map<QUuid,DateTimeRange> _computeAllRangesInGroup(const std::shared_ptr<Variable>& refVar, DateTimeRange r)
145 {
146 std::map<QUuid,DateTimeRange> ranges;
147 if(!DateTimeRangeHelper::hasnan(r))
148 {
149 auto group = _synchronizationGroups[*refVar];
150 if(auto transformation = DateTimeRangeHelper::computeTransformation(refVar->range(),r);
151 transformation.has_value())
152 {
153 for(auto varId:group->variables())
154 {
155 auto var = _variables[varId];
156 auto newRange = var->range().transform(transformation.value());
157 ranges[varId] = newRange;
158 }
159 }
160 else // force new range to all variables -> may be weird if more than one var in the group
161 // @TODO ensure that there is no side effects
162 {
163 for(auto varId:group->variables())
164 {
165 auto var = _variables[varId];
166 ranges[varId] = r;
167 }
168 }
169 }
170 else
171 {
172 SCIQLOP_ERROR(VariableController2Private, "Invalid range containing NaN");
173 }
174 return ranges;
175 }
176
177 std::pair<std::vector<DateTimeRange>,DateTimeRange> _computeMissingRanges(const std::shared_ptr<Variable>& var, DateTimeRange r)
29 178 {
30 auto provider = _providers[var->ID()];
31 179 DateTimeRange newCacheRange;
32 180 std::vector<DateTimeRange> missingRanges;
33 181 if(DateTimeRangeHelper::hasnan(var->cacheRange()))
@@ -40,6 +188,20 class VariableController2::VariableController2Private
40 188 newCacheRange = _cacheStrategy->computeRange(var->cacheRange(),r);
41 189 missingRanges = newCacheRange - var->cacheRange();
42 190 }
191 return {missingRanges,newCacheRange};
192 }
193
194 void _changeRange(QUuid id, DateTimeRange r)
195 {
196 _lock.lockForRead();
197 auto var = _variables[id];
198 _lock.unlock();
199 _changeRange(var,r);
200 }
201 void _changeRange(const std::shared_ptr<Variable>& var, DateTimeRange r)
202 {
203 auto provider = _providers[*var];
204 auto [missingRanges, newCacheRange] = _computeMissingRanges(var,r);
43 205 std::vector<IDataSeries*> data;
44 206 for(auto range:missingRanges)
45 207 {
@@ -52,19 +214,36 public:
52 214 :_cacheStrategy(VariableCacheStrategyFactory::createCacheStrategy(CacheStrategy::SingleThreshold))
53 215 {
54 216 Q_UNUSED(parent);
217 this->_ThreadPool.setMaxThreadCount(32);
55 218 }
56 219
57 ~VariableController2Private() = default;
220 ~VariableController2Private()
221 {
222 while (this->_ThreadPool.activeThreadCount())
223 {
224 this->_ThreadPool.waitForDone(100);
225 }
226 }
58 227
59 228 std::shared_ptr<Variable> createVariable(const QString &name, const QVariantHash &metadata, std::shared_ptr<IDataProvider> provider)
60 229 {
230 QWriteLocker lock{&_lock};
61 231 auto newVar = std::make_shared<Variable>(name,metadata);
62 this->_variables[newVar->ID()] = newVar;
63 this->_providers[newVar->ID()] = std::move(provider);
64 this->_synchronizationGroups[newVar->ID()] = std::make_shared<VariableSynchronizationGroup2>(newVar->ID());
232 this->_variables[*newVar] = newVar;
233 this->_providers[*newVar] = std::move(provider);
234 auto group = std::make_shared<VariableSynchronizationGroup2>(newVar->ID());
235 this->_synchronizationGroups[*newVar] = group;
236 this->_transactions.addEntry(*group);
65 237 return newVar;
66 238 }
67 239
240 bool hasPendingTransactions(const std::shared_ptr<Variable>& variable)
241 {
242 QReadLocker lock{&_lock};
243 auto group = _synchronizationGroups[*variable];
244 return _transactions.active(*group);
245 }
246
68 247 void deleteVariable(const std::shared_ptr<Variable>& variable)
69 248 {
70 249 /*
@@ -72,44 +251,32 public:
72 251 * this means we got the var controller in an inconsistent state
73 252 */
74 253 if(v_contains(variable))
75 this->_variables.remove(variable->ID());
254 {
255 QWriteLocker lock{&_lock};
256 this->_variables.remove(*variable);
257 }
76 258 if(p_contains(variable))
77 this->_providers.remove(variable->ID());
259 {
260 QWriteLocker lock{&_lock};
261 this->_providers.remove(*variable);
262 }
78 263 else
79 264 SCIQLOP_ERROR(VariableController2Private, "No provider found for given variable");
80 265 }
81 266
82 267 void asyncChangeRange(const std::shared_ptr<Variable>& variable, const DateTimeRange& r)
83 268 {
84
85 }
86
87 void changeRange(const std::shared_ptr<Variable>& variable, DateTimeRange r)
88 {
89 269 if(p_contains(variable))
90 270 {
91 271 if(!DateTimeRangeHelper::hasnan(r))
92 272 {
93 auto group = _synchronizationGroups[variable->ID()];
94 if(auto transformation = DateTimeRangeHelper::computeTransformation(variable->range(),r);
95 transformation.has_value())
96 {
97 for(auto varId:group->variables())
98 {
99 auto var = _variables[varId];
100 auto newRange = var->range().transform(transformation.value());
101 _changeRange(var,newRange);
102 }
103 }
104 else // force new range to all variables -> may be weird if more than one var in the group
105 // @TODO ensure that there is no side effects
106 {
107 for(auto varId:group->variables())
273 auto group = _synchronizationGroups[*variable];
274 // Just overwrite next transaction
108 275 {
109 auto var = _variables[varId];
110 _changeRange(var,r);
111 }
276 QWriteLocker lock{&_lock};
277 _transactions.enqueue(*group,std::make_shared<VCTransaction>(variable->ID(), r, static_cast<int>(group->variables().size())));
112 278 }
279 _processTransactions();
113 280 }
114 281 else
115 282 {
@@ -122,16 +289,25 public:
122 289 }
123 290 }
124 291
292 void changeRange(const std::shared_ptr<Variable>& variable, DateTimeRange r)
293 {
294 auto ranges = _computeAllRangesInGroup(variable,r);
295 for( auto const& [ID, range] : ranges)
296 {
297 _changeRange(ID,range);
298 }
299 }
300
125 301 void synchronize(const std::shared_ptr<Variable>& var, const std::shared_ptr<Variable>& with)
126 302 {
127 303 if(v_contains(var) && v_contains(with))
128 304 {
129 305 if(sg_contains(var) && sg_contains(with))
130 306 {
131
307 QWriteLocker lock{&_lock};
132 308 auto dest_group = this->_synchronizationGroups[with->ID()];
133 this->_synchronizationGroups[var->ID()] = dest_group;
134 dest_group->addVariable(var->ID());
309 this->_synchronizationGroups[*var] = dest_group;
310 dest_group->addVariable(*var);
135 311 }
136 312 else
137 313 {
@@ -147,6 +323,7 public:
147 323 const std::set<std::shared_ptr<Variable>> variables()
148 324 {
149 325 std::set<std::shared_ptr<Variable>> vars;
326 QReadLocker lock{&_lock};
150 327 for(const auto &var:_variables)
151 328 {
152 329 vars.insert(var);
@@ -192,8 +369,12 const std::set<std::shared_ptr<Variable> > VariableController2::variables()
192 369 return impl->variables();
193 370 }
194 371
372 bool VariableController2::isReady(const std::shared_ptr<Variable> &variable)
373 {
374 return impl->hasPendingTransactions(variable);
375 }
376
195 377 void VariableController2::synchronize(const std::shared_ptr<Variable> &var, const std::shared_ptr<Variable> &with)
196 378 {
197 379 impl->synchronize(var, with);
198 380 }
199
@@ -46,4 +46,5 declare_test(TestDownloader TestDownloader Network/TestDownloader.cpp "sciqlopco
46 46
47 47
48 48 declare_test(TestVariableController2 TestVariableController2 Variable/TestVariableController2.cpp "sciqlopcore;TestUtils;Qt5::Test")
49 declare_test(TestVariableController2Async TestVariableController2Async Variable/TestVariableController2Async.cpp "sciqlopcore;TestUtils;Qt5::Test")
49 50 declare_test(TestVariableController2WithSync TestVariableController2WithSync Variable/TestVariableController2WithSync.cpp "sciqlopcore;TestUtils;Qt5::Test")
General Comments 0
You need to be logged in to leave comments. Login now