##// END OF EJS Templates
Add implementation of the Var Acquisition waiting
perrinel -
r626:fa45b8492b4d
parent child
Show More
@@ -1,552 +1,734
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheController.h>
4 4 #include <Variable/VariableCacheStrategy.h>
5 5 #include <Variable/VariableController.h>
6 6 #include <Variable/VariableModel.h>
7 7 #include <Variable/VariableSynchronizationGroup.h>
8 8
9 9 #include <Data/DataProviderParameters.h>
10 10 #include <Data/IDataProvider.h>
11 11 #include <Data/IDataSeries.h>
12 #include <Data/VariableRequest.h>
12 13 #include <Time/TimeController.h>
13 14
14 15 #include <QMutex>
15 16 #include <QThread>
16 17 #include <QUuid>
17 18 #include <QtCore/QItemSelectionModel>
18 19
20 #include <deque>
19 21 #include <set>
20 22 #include <unordered_map>
21 23
22 24 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
23 25
24 26 namespace {
25 27
26 28 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
27 29 const SqpRange &oldGraphRange)
28 30 {
29 31 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
30 32
31 33 auto varRangeRequested = varRange;
32 34 switch (zoomType) {
33 35 case AcquisitionZoomType::ZoomIn: {
34 36 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
35 37 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
36 38 varRangeRequested.m_TStart += deltaLeft;
37 39 varRangeRequested.m_TEnd -= deltaRight;
38 40 break;
39 41 }
40 42
41 43 case AcquisitionZoomType::ZoomOut: {
42 44 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
43 45 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
44 46 varRangeRequested.m_TStart -= deltaLeft;
45 47 varRangeRequested.m_TEnd += deltaRight;
46 48 break;
47 49 }
48 50 case AcquisitionZoomType::PanRight: {
49 51 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
50 52 varRangeRequested.m_TStart += deltaRight;
51 53 varRangeRequested.m_TEnd += deltaRight;
52 54 break;
53 55 }
54 56 case AcquisitionZoomType::PanLeft: {
55 57 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
56 58 varRangeRequested.m_TStart -= deltaLeft;
57 59 varRangeRequested.m_TEnd -= deltaLeft;
58 60 break;
59 61 }
60 62 case AcquisitionZoomType::Unknown: {
61 63 qCCritical(LOG_VariableController())
62 64 << VariableController::tr("Impossible to synchronize: zoom type unknown");
63 65 break;
64 66 }
65 67 default:
66 68 qCCritical(LOG_VariableController()) << VariableController::tr(
67 69 "Impossible to synchronize: zoom type not take into account");
68 70 // No action
69 71 break;
70 72 }
71 73
72 74 return varRangeRequested;
73 75 }
74 76 }
75 77
76 78 struct VariableController::VariableControllerPrivate {
77 79 explicit VariableControllerPrivate(VariableController *parent)
78 80 : m_WorkingMutex{},
79 81 m_VariableModel{new VariableModel{parent}},
80 82 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
81 83 m_VariableCacheController{std::make_unique<VariableCacheController>()},
82 84 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 85 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 86 q{parent}
85 87 {
86 88
87 89 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 90 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 91 }
90 92
91 93
92 94 virtual ~VariableControllerPrivate()
93 95 {
94 96 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 97 m_VariableAcquisitionWorkerThread.quit();
96 98 m_VariableAcquisitionWorkerThread.wait();
97 99 }
98 100
99 101
100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested);
102 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
103 QUuid varRequestId);
101 104
102 105 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
103 106 const SqpRange &dateTime);
104 107
105 108 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
106 109 std::shared_ptr<IDataSeries>
107 110 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
108 111
109 112 void registerProvider(std::shared_ptr<IDataProvider> provider);
110 113
114 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
115 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
116 void updateVariableRequest(QUuid varRequestId);
117 void cancelVariableRequest(QUuid varRequestId);
118
111 119 QMutex m_WorkingMutex;
112 120 /// Variable model. The VariableController has the ownership
113 121 VariableModel *m_VariableModel;
114 122 QItemSelectionModel *m_VariableSelectionModel;
115 123
116 124
117 125 TimeController *m_TimeController{nullptr};
118 126 std::unique_ptr<VariableCacheController> m_VariableCacheController;
119 127 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
120 128 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
121 129 QThread m_VariableAcquisitionWorkerThread;
122 130
123 131 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
124 132 m_VariableToProviderMap;
125 133 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
126 134 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
127 135 m_GroupIdToVariableSynchronizationGroupMap;
128 136 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
129 137 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
130 138
139 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
140
141 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
142
131 143
132 144 VariableController *q;
133 145 };
134 146
135 147
136 148 VariableController::VariableController(QObject *parent)
137 149 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
138 150 {
139 151 qCDebug(LOG_VariableController()) << tr("VariableController construction")
140 152 << QThread::currentThread();
141 153
142 154 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
143 155 &VariableController::onAbortProgressRequested);
144 156
145 157 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
146 158 &VariableController::onDataProvided);
147 159 connect(impl->m_VariableAcquisitionWorker.get(),
148 160 &VariableAcquisitionWorker::variableRequestInProgress, this,
149 161 &VariableController::onVariableRetrieveDataInProgress);
150 162
151 163 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
152 164 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
153 165 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
154 166 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
155 167
156 168
157 169 impl->m_VariableAcquisitionWorkerThread.start();
158 170 }
159 171
160 172 VariableController::~VariableController()
161 173 {
162 174 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
163 175 << QThread::currentThread();
164 176 this->waitForFinish();
165 177 }
166 178
167 179 VariableModel *VariableController::variableModel() noexcept
168 180 {
169 181 return impl->m_VariableModel;
170 182 }
171 183
172 184 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
173 185 {
174 186 return impl->m_VariableSelectionModel;
175 187 }
176 188
177 189 void VariableController::setTimeController(TimeController *timeController) noexcept
178 190 {
179 191 impl->m_TimeController = timeController;
180 192 }
181 193
182 194 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
183 195 {
184 196 if (!variable) {
185 197 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
186 198 return;
187 199 }
188 200
189 201 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
190 202 // make some treatments before the deletion
191 203 emit variableAboutToBeDeleted(variable);
192 204
193 205 // Deletes identifier
194 206 impl->m_VariableToIdentifierMap.erase(variable);
195 207
196 208 // Deletes provider
197 209 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
198 210 qCDebug(LOG_VariableController())
199 211 << tr("Number of providers deleted for variable %1: %2")
200 212 .arg(variable->name(), QString::number(nbProvidersDeleted));
201 213
202 214 // Clears cache
203 215 impl->m_VariableCacheController->clear(variable);
204 216
205 217 // Deletes from model
206 218 impl->m_VariableModel->deleteVariable(variable);
207 219 }
208 220
209 221 void VariableController::deleteVariables(
210 222 const QVector<std::shared_ptr<Variable> > &variables) noexcept
211 223 {
212 224 for (auto variable : qAsConst(variables)) {
213 225 deleteVariable(variable);
214 226 }
215 227 }
216 228
217 229 void VariableController::abortProgress(std::shared_ptr<Variable> variable)
218 230 {
219 231 }
220 232
221 233 std::shared_ptr<Variable>
222 234 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
223 235 std::shared_ptr<IDataProvider> provider) noexcept
224 236 {
225 237 if (!impl->m_TimeController) {
226 238 qCCritical(LOG_VariableController())
227 239 << tr("Impossible to create variable: The time controller is null");
228 240 return nullptr;
229 241 }
230 242
231 243 auto range = impl->m_TimeController->dateTime();
232 244
233 245 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
234 246 auto identifier = QUuid::createUuid();
235 247
236 248 // store the provider
237 249 impl->registerProvider(provider);
238 250
239 251 // Associate the provider
240 252 impl->m_VariableToProviderMap[newVariable] = provider;
241 253 impl->m_VariableToIdentifierMap[newVariable] = identifier;
242 254
243 255
244 impl->processRequest(newVariable, range);
256 auto varRequestId = QUuid::createUuid();
257 qCInfo(LOG_VariableController()) << "processRequest for" << name << varRequestId;
258 impl->processRequest(newVariable, range, varRequestId);
259 impl->updateVariableRequest(varRequestId);
245 260
246 261 return newVariable;
247 262 }
248 263 }
249 264
250 265 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
251 266 {
252 // TODO check synchronisation
267 // TODO check synchronisation and Rescale
253 268 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
254 269 << QThread::currentThread()->objectName();
255 270 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
271 auto varRequestId = QUuid::createUuid();
256 272
257 273 for (const auto &selectedRow : qAsConst(selectedRows)) {
258 274 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
259 275 selectedVariable->setRange(dateTime);
260 impl->processRequest(selectedVariable, dateTime);
276 impl->processRequest(selectedVariable, dateTime, varRequestId);
261 277
262 278 // notify that rescale operation has to be done
263 279 emit rangeChanged(selectedVariable, dateTime);
264 280 }
265 281 }
282 impl->updateVariableRequest(varRequestId);
266 283 }
267 284
268 285 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
269 286 const SqpRange &cacheRangeRequested,
270 287 QVector<AcquisitionDataPacket> dataAcquired)
271 288 {
272 if (auto var = impl->findVariable(vIdentifier)) {
273 var->setRange(rangeRequested);
274 var->setCacheRange(cacheRangeRequested);
275 qCDebug(LOG_VariableController()) << tr("1: onDataProvided") << rangeRequested;
276 qCDebug(LOG_VariableController()) << tr("2: onDataProvided") << cacheRangeRequested;
277
278 289 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
279 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
280 << retrievedDataSeries->range();
281 var->mergeDataSeries(retrievedDataSeries);
282 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
283 emit var->updated();
284 }
285 else {
286 qCCritical(LOG_VariableController()) << tr("Impossible to provide data to a null variable");
290 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
291 if (!varRequestId.isNull()) {
292 impl->updateVariableRequest(varRequestId);
287 293 }
288 294 }
289 295
290 296 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
291 297 {
292 298 if (auto var = impl->findVariable(identifier)) {
293 299 impl->m_VariableModel->setDataProgress(var, progress);
294 300 }
295 301 else {
296 302 qCCritical(LOG_VariableController())
297 303 << tr("Impossible to notify progression of a null variable");
298 304 }
299 305 }
300 306
301 307 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
302 308 {
303 309 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAbortProgressRequested"
304 310 << QThread::currentThread()->objectName();
305 311
306 312 auto it = impl->m_VariableToIdentifierMap.find(variable);
307 313 if (it != impl->m_VariableToIdentifierMap.cend()) {
308 314 impl->m_VariableToProviderMap.at(variable)->requestDataAborting(it->second);
309 315 }
310 316 else {
311 317 qCWarning(LOG_VariableController())
312 318 << tr("Aborting progression of inexistant variable detected !!!")
313 319 << QThread::currentThread()->objectName();
314 320 }
315 321 }
316 322
317 323 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
318 324 {
319 325 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
320 326 << QThread::currentThread()->objectName()
321 327 << synchronizationGroupId;
322 328 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
323 329 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
324 330 std::make_pair(synchronizationGroupId, vSynchroGroup));
325 331 }
326 332
327 333 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
328 334 {
329 335 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
330 336 }
331 337
332 338 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
333 339 QUuid synchronizationGroupId)
334 340
335 341 {
336 342 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
337 343 << synchronizationGroupId;
338 344 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
339 345 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
340 346 auto groupIdToVSGIt
341 347 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
342 348 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
343 349 impl->m_VariableIdGroupIdMap.insert(
344 350 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
345 351 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
346 352 }
347 353 else {
348 354 qCCritical(LOG_VariableController())
349 355 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
350 356 << variable->name();
351 357 }
352 358 }
353 359 else {
354 360 qCCritical(LOG_VariableController())
355 361 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
356 362 }
357 363 }
358 364
359 365
360 366 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
361 367 const SqpRange &range, const SqpRange &oldRange,
362 368 bool synchronise)
363 369 {
364 370 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
365 371
366 372 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
367 373 << QThread::currentThread()->objectName();
368 374 // we want to load data of the variable for the dateTime.
369 375 // First we check if the cache contains some of them.
370 376 // For the other, we ask the provider to give them.
371 377
378 auto varRequestId = QUuid::createUuid();
379
372 380 for (const auto &var : variables) {
373 qCDebug(LOG_VariableController()) << "processRequest for" << var->name();
374 impl->processRequest(var, range);
381 qCInfo(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
382 impl->processRequest(var, range, varRequestId);
375 383 }
376 384
377 385 if (synchronise) {
378 386 // Get the group ids
379 387 qCDebug(LOG_VariableController())
380 388 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
381 389 auto groupIds = std::set<QUuid>();
382 390 for (const auto &var : variables) {
383 391 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
384 392 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
385 393 auto vId = varToVarIdIt->second;
386 394 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
387 395 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
388 396 auto gId = varIdToGroupIdIt->second;
389 397 if (groupIds.find(gId) == groupIds.cend()) {
390 398 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
391 399 groupIds.insert(gId);
392 400 }
393 401 }
394 402 }
395 403 }
396 404
397 405 // We assume here all group ids exist
398 406 for (const auto &gId : groupIds) {
399 407 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
400 408 auto vSyncIds = vSynchronizationGroup->getIds();
401 409 qCDebug(LOG_VariableController()) << "Var in synchro group ";
402 410 for (auto vId : vSyncIds) {
403 411 auto var = impl->findVariable(vId);
404 412
405 413 // Don't process already processed var
406 414 if (!variables.contains(var)) {
407 415 if (var != nullptr) {
408 416 qCDebug(LOG_VariableController()) << "processRequest synchro for"
409 417 << var->name();
410 418 auto vSyncRangeRequested
411 419 = computeSynchroRangeRequested(var->range(), range, oldRange);
412 impl->processRequest(var, vSyncRangeRequested);
420 impl->processRequest(var, vSyncRangeRequested, varRequestId);
413 421 }
414 422 else {
415 423 qCCritical(LOG_VariableController())
416 424
417 425 << tr("Impossible to synchronize a null variable");
418 426 }
419 427 }
420 428 }
421 429 }
422 430 }
431
432 impl->updateVariableRequest(varRequestId);
423 433 }
424 434
425 435
426 436 void VariableController::initialize()
427 437 {
428 438 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
429 439 impl->m_WorkingMutex.lock();
430 440 qCDebug(LOG_VariableController()) << tr("VariableController init END");
431 441 }
432 442
433 443 void VariableController::finalize()
434 444 {
435 445 impl->m_WorkingMutex.unlock();
436 446 }
437 447
438 448 void VariableController::waitForFinish()
439 449 {
440 450 QMutexLocker locker{&impl->m_WorkingMutex};
441 451 }
442 452
443 453 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
444 454 {
445 455 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
446 456 auto zoomType = AcquisitionZoomType::Unknown;
447 457 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
448 458 zoomType = AcquisitionZoomType::ZoomOut;
449 459 }
450 460 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
451 461 zoomType = AcquisitionZoomType::PanRight;
452 462 }
453 463 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
454 464 zoomType = AcquisitionZoomType::PanLeft;
455 465 }
456 466 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
457 467 zoomType = AcquisitionZoomType::ZoomIn;
458 468 }
459 469 else {
460 470 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
461 471 }
462 472 return zoomType;
463 473 }
464 474
465 475 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
466 const SqpRange &rangeRequested)
476 const SqpRange &rangeRequested,
477 QUuid varRequestId)
467 478 {
468 479
469 auto varRangesRequested
470 = m_VariableCacheStrategy->computeCacheRange(var->range(), rangeRequested);
471 auto notInCacheRangeList = var->provideNotInCacheRangeList(varRangesRequested.second);
472 auto inCacheRangeList = var->provideInCacheRangeList(varRangesRequested.second);
480 // TODO: protect at
481 auto varRequest = VariableRequest{};
482 auto varId = m_VariableToIdentifierMap.at(var);
483
484
485 auto varStrategyRangesRequested
486 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
487 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
488 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
473 489
474 490 if (!notInCacheRangeList.empty()) {
475 auto identifier = m_VariableToIdentifierMap.at(var);
491 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
492 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
493 // store VarRequest
494 storeVariableRequest(varId, varRequestId, varRequest);
495
476 496 auto varProvider = m_VariableToProviderMap.at(var);
477 497 if (varProvider != nullptr) {
478 m_VariableAcquisitionWorker->pushVariableRequest(
479 identifier, varRangesRequested.first, varRangesRequested.second,
498 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
499 varRequestId, varId, varStrategyRangesRequested.first,
500 varStrategyRangesRequested.second,
480 501 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
481 502 varProvider);
503
504 if (!varRequestIdCanceled.isNull()) {
505 cancelVariableRequest(varRequestIdCanceled);
506 }
482 507 }
483 508 else {
484 509 qCCritical(LOG_VariableController())
485 510 << "Impossible to provide data with a null provider";
486 511 }
487 512
488 513 if (!inCacheRangeList.empty()) {
489 514 emit q->updateVarDisplaying(var, inCacheRangeList.first());
490 515 }
491 516 }
492 517 else {
493 var->setRange(rangeRequested);
494 var->setCacheRange(varRangesRequested.second);
495 var->setDataSeries(var->dataSeries()->subDataSeries(varRangesRequested.second));
496 emit var->updated();
518
519 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
520 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
521 // store VarRequest
522 storeVariableRequest(varId, varRequestId, varRequest);
523 acceptVariableRequest(varId,
524 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
497 525 }
498 526 }
499 527
500 528 std::shared_ptr<Variable>
501 529 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
502 530 {
503 531 std::shared_ptr<Variable> var;
504 532 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
505 533
506 534 auto end = m_VariableToIdentifierMap.cend();
507 535 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
508 536 if (it != end) {
509 537 var = it->first;
510 538 }
511 539 else {
512 540 qCCritical(LOG_VariableController())
513 541 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
514 542 }
515 543
516 544 return var;
517 545 }
518 546
519 547 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
520 548 const QVector<AcquisitionDataPacket> acqDataPacketVector)
521 549 {
522 550 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
523 551 << acqDataPacketVector.size();
524 552 std::shared_ptr<IDataSeries> dataSeries;
525 553 if (!acqDataPacketVector.isEmpty()) {
526 554 dataSeries = acqDataPacketVector[0].m_DateSeries;
527 555 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
528 556 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
529 557 }
530 558 }
531 559 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
532 560 << acqDataPacketVector.size();
533 561 return dataSeries;
534 562 }
535 563
536 564 void VariableController::VariableControllerPrivate::registerProvider(
537 565 std::shared_ptr<IDataProvider> provider)
538 566 {
539 567 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
540 568 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
541 569 << provider->objectName();
542 570 m_ProviderSet.insert(provider);
543 571 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
544 572 &VariableAcquisitionWorker::onVariableDataAcquired);
545 573 connect(provider.get(), &IDataProvider::dataProvidedProgress,
546 574 m_VariableAcquisitionWorker.get(),
547 575 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
548 576 }
549 577 else {
550 578 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
551 579 }
552 580 }
581
582 void VariableController::VariableControllerPrivate::storeVariableRequest(
583 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
584 {
585 // First request for the variable. we can create an entry for it
586 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
587 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
588 auto varRequestIdQueue = std::deque<QUuid>{};
589 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
590 varRequestIdQueue.push_back(varRequestId);
591 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
592 }
593 else {
594 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
595 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
596 varRequestIdQueue.push_back(varRequestId);
597 }
598
599 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
600 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
601 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
602 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
603 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
604 m_VarRequestIdToVarIdVarRequestMap.insert(
605 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
606 }
607 else {
608 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
609 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
610 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
611 }
612 }
613
614 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
615 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
616 {
617 QUuid varRequestId;
618 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
619 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
620 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
621 varRequestId = varRequestIdQueue.front();
622 auto varRequestIdToVarIdVarRequestMapIt
623 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
624 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
625 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
626 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
627 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
628 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
629 auto &varRequest = varIdToVarRequestMapIt->second;
630 varRequest.m_DataSeries = dataSeries;
631 varRequest.m_CanUpdate = true;
632 }
633 else {
634 qCDebug(LOG_VariableController())
635 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
636 "to a variableRequestId")
637 << varRequestId << varId;
638 }
639 }
640 else {
641 qCCritical(LOG_VariableController())
642 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
643 << varRequestId;
644 }
645
646 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
647 << varRequestIdQueue.size();
648 varRequestIdQueue.pop_front();
649 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
650 << varRequestIdQueue.size();
651 if (varRequestIdQueue.empty()) {
652 m_VarIdToVarRequestIdQueueMap.erase(varId);
653 }
654 }
655 else {
656 qCCritical(LOG_VariableController())
657 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
658 }
659
660 return varRequestId;
661 }
662
663 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
664 {
665
666 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
667 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
668 bool processVariableUpdate = true;
669 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
670 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
671 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
672 ++varIdToVarRequestMapIt) {
673 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
674 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
675 << processVariableUpdate;
676 }
677
678 if (processVariableUpdate) {
679 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
680 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
681 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
682 auto &varRequest = varIdToVarRequestMapIt->second;
683 var->setRange(varRequest.m_RangeRequested);
684 var->setCacheRange(varRequest.m_CacheRangeRequested);
685 qCInfo(LOG_VariableController()) << tr("1: onDataProvided")
686 << varRequest.m_RangeRequested;
687 qCInfo(LOG_VariableController()) << tr("2: onDataProvided")
688 << varRequest.m_CacheRangeRequested;
689 var->mergeDataSeries(varRequest.m_DataSeries);
690 qCInfo(LOG_VariableController()) << tr("3: onDataProvided")
691 << varRequest.m_DataSeries->range();
692 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
693 emit var->updated();
694 }
695 else {
696 qCCritical(LOG_VariableController())
697 << tr("Impossible to update data to a null variable");
698 }
699 }
700
701 // cleaning varRequestId
702 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
703 << m_VarRequestIdToVarIdVarRequestMap.size();
704 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
705 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
706 << m_VarRequestIdToVarIdVarRequestMap.size();
707 }
708 }
709 else {
710 qCCritical(LOG_VariableController())
711 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
712 }
713 }
714
715 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
716 {
717 // cleaning varRequestId
718 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
719
720 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
721 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
722 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
723 varRequestIdQueue.erase(
724 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
725 varRequestIdQueue.end());
726 if (varRequestIdQueue.empty()) {
727 varIdToVarRequestIdQueueMapIt
728 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
729 }
730 else {
731 ++varIdToVarRequestIdQueueMapIt;
732 }
733 }
734 }
General Comments 0
You need to be logged in to leave comments. Login now