##// END OF EJS Templates
Improve synchro robustness.
perrinel -
r815:e8f1dd84704e
parent child
Show More
@@ -1,422 +1,436
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 75 << varRequestId;
76 76 acqRequest.m_VarRequestId = varRequestId;
77 77 acqRequest.m_vIdentifier = vIdentifier;
78 78 acqRequest.m_DataProviderParameters = parameters;
79 79 acqRequest.m_RangeRequested = rangeRequested;
80 80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 81 acqRequest.m_Size = parameters.m_Times.size();
82 82 acqRequest.m_Provider = provider;
83 83
84 84
85 85 // Register request
86 86 impl->lockWrite();
87 87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 89
90 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 92 // A current request already exists, we can replace the next one
93 93 auto oldAcqId = it->second.second;
94 94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 98 }
99 99
100 100 it->second.second = acqRequest.m_AcqIdentifier;
101 101 impl->unlock();
102 102
103 103 // remove old acqIdentifier from the worker
104 impl->cancelVarRequest(oldAcqId);
104 impl->cancelVarRequest(varRequestIdCanceled);
105 105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 106 }
107 107 else {
108 108 // First request for the variable, it must be stored and executed
109 109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 111 impl->unlock();
112 112
113 113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 115 }
116 116
117 117 return varRequestIdCanceled;
118 118 }
119 119
120 120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 121 {
122 122 impl->lockRead();
123 123
124 124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 126 auto currentAcqId = it->second.first;
127 127
128 128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 130 auto request = it->second;
131 131 impl->unlock();
132 132
133 133 // Remove the current request from the worker
134 134 impl->updateToNextRequest(vIdentifier);
135 135
136 136 // notify the request aborting to the provider
137 137 request.m_Provider->requestDataAborting(currentAcqId);
138 138 }
139 139 else {
140 140 impl->unlock();
141 141 qCWarning(LOG_VariableAcquisitionWorker())
142 142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 143 }
144 144 }
145 145 else {
146 146 impl->unlock();
147 147 }
148 148 }
149 149
150 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 151 double progress)
152 152 {
153 153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
154 154 << acqIdentifier << progress;
155 155 impl->lockRead();
156 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159 159
160 160 auto currentPartProgress
161 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163 163
164 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 166 qCDebug(LOG_VariableAcquisitionWorker())
167 167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 170 if (finalProgression == 100.0) {
171 171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 172 }
173 173 }
174 174 impl->unlock();
175 175 }
176 176
177 177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 178 {
179 179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 180 << QThread::currentThread();
181 181 impl->lockRead();
182 182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 184 auto request = it->second;
185 185 impl->unlock();
186 186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 187 << acqIdentifier << request.m_vIdentifier
188 188 << QThread::currentThread();
189 189 emit variableCanceledRequested(request.m_vIdentifier);
190 190 }
191 191 else {
192 192 impl->unlock();
193 193 // TODO log no acqIdentifier recognized
194 194 }
195 195 }
196 196
197 197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 198 std::shared_ptr<IDataSeries> dataSeries,
199 199 SqpRange dataRangeAcquired)
200 200 {
201 201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 202 << acqIdentifier << dataRangeAcquired;
203 203 impl->lockWrite();
204 204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 206 // Store the result
207 207 auto dataPacket = AcquisitionDataPacket{};
208 208 dataPacket.m_Range = dataRangeAcquired;
209 209 dataPacket.m_DateSeries = dataSeries;
210 210
211 211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 213 // A current request result already exists, we can update it
214 214 aIdToADPVit->second.push_back(dataPacket);
215 215 }
216 216 else {
217 217 // First request result for the variable, it must be stored
218 218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 220 }
221 221
222 222
223 223 // Decrement the counter of the request
224 224 auto &acqRequest = aIdToARit->second;
225 225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226 226
227 227 // if the counter is 0, we can return data then run the next request if it exists and
228 228 // removed the finished request
229 229 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 230 auto varId = acqRequest.m_vIdentifier;
231 231 auto rangeRequested = acqRequest.m_RangeRequested;
232 232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 233 // Return the data
234 234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 237 }
238 238 impl->unlock();
239 239
240 240 // Update to the next request
241 241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 242 }
243 243 else {
244 244 impl->unlock();
245 245 }
246 246 }
247 247 else {
248 248 impl->unlock();
249 249 qCWarning(LOG_VariableAcquisitionWorker())
250 250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 251 }
252 252 }
253 253
254 254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 255 {
256 256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 257 impl->lockRead();
258 258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 260 auto request = it->second;
261 261 impl->unlock();
262 262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
263 263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 264 }
265 265 else {
266 266 impl->unlock();
267 267 // TODO log no acqIdentifier recognized
268 268 }
269 269 }
270 270
271 271 void VariableAcquisitionWorker::initialize()
272 272 {
273 273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 274 << QThread::currentThread();
275 275 impl->m_WorkingMutex.lock();
276 276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 277 }
278 278
279 279 void VariableAcquisitionWorker::finalize()
280 280 {
281 281 impl->m_WorkingMutex.unlock();
282 282 }
283 283
284 284 void VariableAcquisitionWorker::waitForFinish()
285 285 {
286 286 QMutexLocker locker{&impl->m_WorkingMutex};
287 287 }
288 288
289 289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 290 QUuid vIdentifier)
291 291 {
292 292 lockWrite();
293 293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294 294
295 295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 296 // A current request already exists, we can replace the next one
297 297
298 298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300 300
301 301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 303 }
304 304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 305 unlock();
306 306 }
307 307
308 308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 309 QUuid vIdentifier)
310 310 {
311 311 lockRead();
312 312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 314 if (it->second.second.isNull()) {
315 315 unlock();
316 316 // There is no next request, we can remove the variable request
317 317 removeVariableRequest(vIdentifier);
318 318 }
319 319 else {
320 320 auto acqIdentifierToRemove = it->second.first;
321 321 // Move the next request to the current request
322 322 auto nextRequestId = it->second.second;
323 323 it->second.first = nextRequestId;
324 324 it->second.second = QUuid();
325 325 unlock();
326 326 // Remove AcquisitionRequest and results;
327 327 lockWrite();
328 328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 330 unlock();
331 331 // Execute the current request
332 332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 333 Q_ARG(QUuid, nextRequestId));
334 334 }
335 335 }
336 336 else {
337 337 unlock();
338 338 qCCritical(LOG_VariableAcquisitionWorker())
339 339 << tr("Impossible to execute the acquisition on an unfound variable ");
340 340 }
341 341 }
342 342
343 343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 344 QUuid varRequestId)
345 345 {
346 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
346 qCCritical(LOG_VariableAcquisitionWorker())
347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
347 348 lockRead();
348 349 // get all AcqIdentifier in link with varRequestId
349 350 QVector<QUuid> acqIdsToRm;
350 351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 353 if (it->second.m_VarRequestId == varRequestId) {
353 354 acqIdsToRm << it->first;
354 355 }
355 356 }
356 357 unlock();
357 358 // run aborting or removing of acqIdsToRm
358 359
359 360 for (auto acqId : acqIdsToRm) {
360 361 removeAcqRequest(acqId);
361 362 }
362 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
363 qCCritical(LOG_VariableAcquisitionWorker())
364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
363 365 }
364 366
365 367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
366 368 QUuid acqRequestId)
367 369 {
368 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
370 qCDebug(LOG_VariableAcquisitionWorker())
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
369 372 QUuid vIdentifier;
370 373 std::shared_ptr<IDataProvider> provider;
371 374 lockRead();
372 375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
373 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
376 qCDebug(LOG_VariableAcquisitionWorker())
377 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
374 378 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
375 379 vIdentifier = acqIt->second.m_vIdentifier;
376 380 provider = acqIt->second.m_Provider;
377 381
378 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
382 qCDebug(LOG_VariableAcquisitionWorker())
383 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
379 384 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
380 385 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
381 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
386 qCDebug(LOG_VariableAcquisitionWorker())
387 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
382 388 if (it->second.first == acqRequestId) {
383 389 // acqRequest is currently running -> let's aborting it
384 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
390 qCDebug(LOG_VariableAcquisitionWorker())
391 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
385 392 unlock();
386 393
387 394 // Remove the current request from the worker
388 395 updateToNextRequest(vIdentifier);
389 396
390 397 // notify the request aborting to the provider
391 398 provider->requestDataAborting(acqRequestId);
392 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
399 qCDebug(LOG_VariableAcquisitionWorker())
400 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
393 401 }
394 402 else if (it->second.second == acqRequestId) {
395 403 it->second.second = QUuid();
396 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
404 qCDebug(LOG_VariableAcquisitionWorker())
405 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
397 406 unlock();
398 407 }
399 408 else {
400 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
409 qCDebug(LOG_VariableAcquisitionWorker())
410 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
401 411 unlock();
402 412 }
403 413 }
404 414 else {
405 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
415 qCDebug(LOG_VariableAcquisitionWorker())
416 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
406 417 unlock();
407 418 }
408 419 }
409 420 else {
410 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
421 qCDebug(LOG_VariableAcquisitionWorker())
422 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
411 423 unlock();
412 424 }
413 425
414 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
426 qCDebug(LOG_VariableAcquisitionWorker())
427 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
415 428 lockWrite();
416 429
417 430 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
418 431 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
419 432
420 433 unlock();
421 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
434 qCDebug(LOG_VariableAcquisitionWorker())
435 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
422 436 }
@@ -1,901 +1,901
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableCacheStrategyFactory.h>
5 5 #include <Variable/VariableController.h>
6 6 #include <Variable/VariableModel.h>
7 7 #include <Variable/VariableSynchronizationGroup.h>
8 8
9 9 #include <Data/DataProviderParameters.h>
10 10 #include <Data/IDataProvider.h>
11 11 #include <Data/IDataSeries.h>
12 12 #include <Data/VariableRequest.h>
13 13 #include <Time/TimeController.h>
14 14
15 15 #include <QMutex>
16 16 #include <QThread>
17 17 #include <QUuid>
18 18 #include <QtCore/QItemSelectionModel>
19 19
20 20 #include <deque>
21 21 #include <set>
22 22 #include <unordered_map>
23 23
24 24 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
25 25
26 26 namespace {
27 27
28 28 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
29 29 const SqpRange &oldGraphRange)
30 30 {
31 31 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
32 32
33 33 auto varRangeRequested = varRange;
34 34 switch (zoomType) {
35 35 case AcquisitionZoomType::ZoomIn: {
36 36 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
37 37 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
38 38 varRangeRequested.m_TStart += deltaLeft;
39 39 varRangeRequested.m_TEnd -= deltaRight;
40 40 break;
41 41 }
42 42
43 43 case AcquisitionZoomType::ZoomOut: {
44 44 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
45 45 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
46 46 varRangeRequested.m_TStart -= deltaLeft;
47 47 varRangeRequested.m_TEnd += deltaRight;
48 48 break;
49 49 }
50 50 case AcquisitionZoomType::PanRight: {
51 51 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
52 52 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
53 53 varRangeRequested.m_TStart += deltaLeft;
54 54 varRangeRequested.m_TEnd += deltaRight;
55 55 break;
56 56 }
57 57 case AcquisitionZoomType::PanLeft: {
58 58 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
59 59 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
60 60 varRangeRequested.m_TStart -= deltaLeft;
61 61 varRangeRequested.m_TEnd -= deltaRight;
62 62 break;
63 63 }
64 64 case AcquisitionZoomType::Unknown: {
65 65 qCCritical(LOG_VariableController())
66 66 << VariableController::tr("Impossible to synchronize: zoom type unknown");
67 67 break;
68 68 }
69 69 default:
70 70 qCCritical(LOG_VariableController()) << VariableController::tr(
71 71 "Impossible to synchronize: zoom type not take into account");
72 72 // No action
73 73 break;
74 74 }
75 75
76 76 return varRangeRequested;
77 77 }
78 78 }
79 79
80 80 struct VariableController::VariableControllerPrivate {
81 81 explicit VariableControllerPrivate(VariableController *parent)
82 82 : m_WorkingMutex{},
83 83 m_VariableModel{new VariableModel{parent}},
84 84 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
85 85 // m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
86 86 m_VariableCacheStrategy{VariableCacheStrategyFactory::createCacheStrategy(
87 87 CacheStrategy::SingleThreshold)},
88 88 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
89 89 q{parent}
90 90 {
91 91
92 92 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
93 93 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
94 94 }
95 95
96 96
97 97 virtual ~VariableControllerPrivate()
98 98 {
99 99 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
100 100 m_VariableAcquisitionWorkerThread.quit();
101 101 m_VariableAcquisitionWorkerThread.wait();
102 102 }
103 103
104 104
105 105 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
106 106 QUuid varRequestId);
107 107
108 108 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
109 109 const SqpRange &dateTime);
110 110
111 111 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
112 112 std::shared_ptr<IDataSeries>
113 113 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
114 114
115 115 void registerProvider(std::shared_ptr<IDataProvider> provider);
116 116
117 117 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
118 118 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
119 119 void updateVariableRequest(QUuid varRequestId);
120 120 void cancelVariableRequest(QUuid varRequestId);
121 121
122 122 SqpRange getLastRequestedRange(QUuid varId);
123 123
124 124 QMutex m_WorkingMutex;
125 125 /// Variable model. The VariableController has the ownership
126 126 VariableModel *m_VariableModel;
127 127 QItemSelectionModel *m_VariableSelectionModel;
128 128
129 129
130 130 TimeController *m_TimeController{nullptr};
131 131 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
132 132 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
133 133 QThread m_VariableAcquisitionWorkerThread;
134 134
135 135 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
136 136 m_VariableToProviderMap;
137 137 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
138 138 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
139 139 m_GroupIdToVariableSynchronizationGroupMap;
140 140 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
141 141 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
142 142
143 143 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
144 144
145 145 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
146 146
147 147
148 148 VariableController *q;
149 149 };
150 150
151 151
152 152 VariableController::VariableController(QObject *parent)
153 153 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
154 154 {
155 155 qCDebug(LOG_VariableController()) << tr("VariableController construction")
156 156 << QThread::currentThread();
157 157
158 158 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
159 159 &VariableController::onAbortProgressRequested);
160 160
161 161 connect(impl->m_VariableAcquisitionWorker.get(),
162 162 &VariableAcquisitionWorker::variableCanceledRequested, this,
163 163 &VariableController::onAbortAcquisitionRequested);
164 164
165 165 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
166 166 &VariableController::onDataProvided);
167 167 connect(impl->m_VariableAcquisitionWorker.get(),
168 168 &VariableAcquisitionWorker::variableRequestInProgress, this,
169 169 &VariableController::onVariableRetrieveDataInProgress);
170 170
171 171
172 172 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
173 173 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
174 174 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
175 175 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
176 176
177 177
178 178 impl->m_VariableAcquisitionWorkerThread.start();
179 179 }
180 180
181 181 VariableController::~VariableController()
182 182 {
183 183 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
184 184 << QThread::currentThread();
185 185 this->waitForFinish();
186 186 }
187 187
188 188 VariableModel *VariableController::variableModel() noexcept
189 189 {
190 190 return impl->m_VariableModel;
191 191 }
192 192
193 193 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
194 194 {
195 195 return impl->m_VariableSelectionModel;
196 196 }
197 197
198 198 void VariableController::setTimeController(TimeController *timeController) noexcept
199 199 {
200 200 impl->m_TimeController = timeController;
201 201 }
202 202
203 203 std::shared_ptr<Variable>
204 204 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
205 205 {
206 206 if (impl->m_VariableModel->containsVariable(variable)) {
207 207 // Clones variable
208 208 auto duplicate = variable->clone();
209 209
210 210 // Adds clone to model
211 211 impl->m_VariableModel->addVariable(duplicate);
212 212
213 213 // Generates clone identifier
214 214 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
215 215
216 216 // Registers provider
217 217 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
218 218 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
219 219
220 220 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
221 221 if (duplicateProvider) {
222 222 impl->registerProvider(duplicateProvider);
223 223 }
224 224
225 225 return duplicate;
226 226 }
227 227 else {
228 228 qCCritical(LOG_VariableController())
229 229 << tr("Can't create duplicate of variable %1: variable not registered in the model")
230 230 .arg(variable->name());
231 231 return nullptr;
232 232 }
233 233 }
234 234
235 235 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
236 236 {
237 237 if (!variable) {
238 238 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
239 239 return;
240 240 }
241 241
242 242 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
243 243 // make some treatments before the deletion
244 244 emit variableAboutToBeDeleted(variable);
245 245
246 246 // Deletes identifier
247 247 impl->m_VariableToIdentifierMap.erase(variable);
248 248
249 249 // Deletes provider
250 250 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
251 251 qCDebug(LOG_VariableController())
252 252 << tr("Number of providers deleted for variable %1: %2")
253 253 .arg(variable->name(), QString::number(nbProvidersDeleted));
254 254
255 255
256 256 // Deletes from model
257 257 impl->m_VariableModel->deleteVariable(variable);
258 258 }
259 259
260 260 void VariableController::deleteVariables(
261 261 const QVector<std::shared_ptr<Variable> > &variables) noexcept
262 262 {
263 263 for (auto variable : qAsConst(variables)) {
264 264 deleteVariable(variable);
265 265 }
266 266 }
267 267
268 268 std::shared_ptr<Variable>
269 269 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
270 270 std::shared_ptr<IDataProvider> provider) noexcept
271 271 {
272 272 if (!impl->m_TimeController) {
273 273 qCCritical(LOG_VariableController())
274 274 << tr("Impossible to create variable: The time controller is null");
275 275 return nullptr;
276 276 }
277 277
278 278 auto range = impl->m_TimeController->dateTime();
279 279
280 280 if (auto newVariable = impl->m_VariableModel->createVariable(name, metadata)) {
281 281 auto identifier = QUuid::createUuid();
282 282
283 283 // store the provider
284 284 impl->registerProvider(provider);
285 285
286 286 // Associate the provider
287 287 impl->m_VariableToProviderMap[newVariable] = provider;
288 288 qCInfo(LOG_VariableController()) << "createVariable: " << identifier;
289 289 impl->m_VariableToIdentifierMap[newVariable] = identifier;
290 290
291 291
292 292 auto varRequestId = QUuid::createUuid();
293 293 impl->processRequest(newVariable, range, varRequestId);
294 294 impl->updateVariableRequest(varRequestId);
295 295
296 296 return newVariable;
297 297 }
298 298 }
299 299
300 300 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
301 301 {
302 302 // NOTE: Even if acquisition request is aborting, the graphe range will be changed
303 303 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
304 304 << QThread::currentThread()->objectName();
305 305 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
306 306 auto variables = QVector<std::shared_ptr<Variable> >{};
307 307
308 308 for (const auto &selectedRow : qAsConst(selectedRows)) {
309 309 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
310 310 variables << selectedVariable;
311 311
312 312 // notify that rescale operation has to be done
313 313 emit rangeChanged(selectedVariable, dateTime);
314 314 }
315 315 }
316 316
317 317 if (!variables.isEmpty()) {
318 318 this->onRequestDataLoading(variables, dateTime, true);
319 319 }
320 320 }
321 321
322 322 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
323 323 const SqpRange &cacheRangeRequested,
324 324 QVector<AcquisitionDataPacket> dataAcquired)
325 325 {
326 326 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
327 327 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
328 328 if (!varRequestId.isNull()) {
329 329 impl->updateVariableRequest(varRequestId);
330 330 }
331 331 }
332 332
333 333 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
334 334 {
335 335 qCDebug(LOG_VariableController())
336 336 << "TORM: variableController::onVariableRetrieveDataInProgress"
337 337 << QThread::currentThread()->objectName() << progress;
338 338 if (auto var = impl->findVariable(identifier)) {
339 339 impl->m_VariableModel->setDataProgress(var, progress);
340 340 }
341 341 else {
342 342 qCCritical(LOG_VariableController())
343 343 << tr("Impossible to notify progression of a null variable");
344 344 }
345 345 }
346 346
347 347 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
348 348 {
349 349 auto it = impl->m_VariableToIdentifierMap.find(variable);
350 350 if (it != impl->m_VariableToIdentifierMap.cend()) {
351 351 impl->m_VariableAcquisitionWorker->abortProgressRequested(it->second);
352 352
353 353 QUuid varRequestId;
354 354 auto varIdToVarRequestIdQueueMapIt = impl->m_VarIdToVarRequestIdQueueMap.find(it->second);
355 355 if (varIdToVarRequestIdQueueMapIt != impl->m_VarIdToVarRequestIdQueueMap.cend()) {
356 356 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
357 357 varRequestId = varRequestIdQueue.front();
358 358 impl->cancelVariableRequest(varRequestId);
359 359
360 360 // Finish the progression for the request
361 361 impl->m_VariableModel->setDataProgress(variable, 0.0);
362 362 }
363 363 else {
364 364 qCWarning(LOG_VariableController())
365 365 << tr("Aborting progression of inexistant variable request detected !!!")
366 366 << QThread::currentThread()->objectName();
367 367 }
368 368 }
369 369 else {
370 370 qCWarning(LOG_VariableController())
371 371 << tr("Aborting progression of inexistant variable detected !!!")
372 372 << QThread::currentThread()->objectName();
373 373 }
374 374 }
375 375
376 376 void VariableController::onAbortAcquisitionRequested(QUuid vIdentifier)
377 377 {
378 378 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortAcquisitionRequested"
379 379 << QThread::currentThread()->objectName() << vIdentifier;
380 380
381 381 if (auto var = impl->findVariable(vIdentifier)) {
382 382 this->onAbortProgressRequested(var);
383 383 }
384 384 else {
385 385 qCCritical(LOG_VariableController())
386 386 << tr("Impossible to abort Acquisition Requestof a null variable");
387 387 }
388 388 }
389 389
390 390 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
391 391 {
392 392 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
393 393 << QThread::currentThread()->objectName()
394 394 << synchronizationGroupId;
395 395 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
396 396 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
397 397 std::make_pair(synchronizationGroupId, vSynchroGroup));
398 398 }
399 399
400 400 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
401 401 {
402 402 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
403 403 }
404 404
405 405 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
406 406 QUuid synchronizationGroupId)
407 407
408 408 {
409 409 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
410 410 << synchronizationGroupId;
411 411 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
412 412 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
413 413 auto groupIdToVSGIt
414 414 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
415 415 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
416 416 impl->m_VariableIdGroupIdMap.insert(
417 417 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
418 418 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
419 419 }
420 420 else {
421 421 qCCritical(LOG_VariableController())
422 422 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
423 423 << variable->name();
424 424 }
425 425 }
426 426 else {
427 427 qCCritical(LOG_VariableController())
428 428 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
429 429 }
430 430 }
431 431
432 432 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
433 433 QUuid synchronizationGroupId)
434 434 {
435 435 // Gets variable id
436 436 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
437 437 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
438 438 qCCritical(LOG_VariableController())
439 439 << tr("Can't desynchronize variable %1: variable identifier not found")
440 440 .arg(variable->name());
441 441 return;
442 442 }
443 443
444 444 // Gets synchronization group
445 445 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
446 446 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
447 447 qCCritical(LOG_VariableController())
448 448 << tr("Can't desynchronize variable %1: unknown synchronization group")
449 449 .arg(variable->name());
450 450 return;
451 451 }
452 452
453 453 auto variableId = variableIt->second;
454 454
455 455 // Removes variable from synchronization group
456 456 auto synchronizationGroup = groupIt->second;
457 457 synchronizationGroup->removeVariableId(variableId);
458 458
459 459 // Removes link between variable and synchronization group
460 460 impl->m_VariableIdGroupIdMap.erase(variableId);
461 461 }
462 462
463 463 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
464 464 const SqpRange &range, bool synchronise)
465 465 {
466 466 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
467 467
468 468 // we want to load data of the variable for the dateTime.
469 469 // First we check if the cache contains some of them.
470 470 // For the other, we ask the provider to give them.
471 471
472 472 auto varRequestId = QUuid::createUuid();
473 473 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
474 474 << QThread::currentThread()->objectName() << varRequestId;
475 475
476 476 for (const auto &var : variables) {
477 477 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
478 478 impl->processRequest(var, range, varRequestId);
479 479 }
480 480
481 481 if (synchronise) {
482 482 // Get the group ids
483 483 qCDebug(LOG_VariableController())
484 484 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
485 485 auto groupIds = std::set<QUuid>{};
486 486 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
487 487 for (const auto &var : variables) {
488 488 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
489 489 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
490 490 auto vId = varToVarIdIt->second;
491 491 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
492 492 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
493 493 auto gId = varIdToGroupIdIt->second;
494 494 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
495 495 if (groupIds.find(gId) == groupIds.cend()) {
496 496 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
497 497 groupIds.insert(gId);
498 498 }
499 499 }
500 500 }
501 501 }
502 502
503 503 // We assume here all group ids exist
504 504 for (const auto &gId : groupIds) {
505 505 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
506 506 auto vSyncIds = vSynchronizationGroup->getIds();
507 507 qCDebug(LOG_VariableController()) << "Var in synchro group ";
508 508 for (auto vId : vSyncIds) {
509 509 auto var = impl->findVariable(vId);
510 510
511 511 // Don't process already processed var
512 512 if (!variables.contains(var)) {
513 513 if (var != nullptr) {
514 514 qCDebug(LOG_VariableController()) << "processRequest synchro for"
515 515 << var->name();
516 516 auto vSyncRangeRequested = computeSynchroRangeRequested(
517 517 var->range(), range, groupIdToOldRangeMap.at(gId));
518 518 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
519 519 impl->processRequest(var, vSyncRangeRequested, varRequestId);
520 520 }
521 521 else {
522 522 qCCritical(LOG_VariableController())
523 523
524 524 << tr("Impossible to synchronize a null variable");
525 525 }
526 526 }
527 527 }
528 528 }
529 529 }
530 530
531 531 impl->updateVariableRequest(varRequestId);
532 532 }
533 533
534 534
535 535 void VariableController::initialize()
536 536 {
537 537 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
538 538 impl->m_WorkingMutex.lock();
539 539 qCDebug(LOG_VariableController()) << tr("VariableController init END");
540 540 }
541 541
542 542 void VariableController::finalize()
543 543 {
544 544 impl->m_WorkingMutex.unlock();
545 545 }
546 546
547 547 void VariableController::waitForFinish()
548 548 {
549 549 QMutexLocker locker{&impl->m_WorkingMutex};
550 550 }
551 551
552 552 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
553 553 {
554 554 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
555 555 auto zoomType = AcquisitionZoomType::Unknown;
556 556 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
557 qCCritical(LOG_VariableController()) << "zoomtype: ZoomOut";
557 qCDebug(LOG_VariableController()) << "zoomtype: ZoomOut";
558 558 zoomType = AcquisitionZoomType::ZoomOut;
559 559 }
560 560 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
561 qCCritical(LOG_VariableController()) << "zoomtype: PanRight";
561 qCDebug(LOG_VariableController()) << "zoomtype: PanRight";
562 562 zoomType = AcquisitionZoomType::PanRight;
563 563 }
564 564 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
565 qCCritical(LOG_VariableController()) << "zoomtype: PanLeft";
565 qCDebug(LOG_VariableController()) << "zoomtype: PanLeft";
566 566 zoomType = AcquisitionZoomType::PanLeft;
567 567 }
568 568 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
569 qCCritical(LOG_VariableController()) << "zoomtype: ZoomIn";
569 qCDebug(LOG_VariableController()) << "zoomtype: ZoomIn";
570 570 zoomType = AcquisitionZoomType::ZoomIn;
571 571 }
572 572 else {
573 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
573 qCDebug(LOG_VariableController()) << "getZoomType: Unknown type detected";
574 574 }
575 575 return zoomType;
576 576 }
577 577
578 578 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
579 579 const SqpRange &rangeRequested,
580 580 QUuid varRequestId)
581 581 {
582 582 auto varRequest = VariableRequest{};
583 583
584 584 auto it = m_VariableToIdentifierMap.find(var);
585 585 if (it != m_VariableToIdentifierMap.cend()) {
586 586
587 587 auto varId = it->second;
588 588
589 589 auto oldRange = getLastRequestedRange(varId);
590 590
591 591 // check for update oldRange to the last request range.
592 592 if (oldRange == INVALID_RANGE) {
593 593 oldRange = var->range();
594 594 }
595 595
596 596 auto varStrategyRangesRequested
597 597 = m_VariableCacheStrategy->computeRange(oldRange, rangeRequested);
598 598
599 599 auto notInCacheRangeList = QVector<SqpRange>{varStrategyRangesRequested.second};
600 600 auto inCacheRangeList = QVector<SqpRange>{};
601 601 if (m_VarIdToVarRequestIdQueueMap.find(varId) == m_VarIdToVarRequestIdQueueMap.cend()) {
602 602 notInCacheRangeList
603 603 = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
604 604 inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
605 605 }
606 606
607 607 if (!notInCacheRangeList.empty()) {
608 608 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
609 609 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
610 610
611 611 // store VarRequest
612 612 storeVariableRequest(varId, varRequestId, varRequest);
613 613
614 614 auto varProvider = m_VariableToProviderMap.at(var);
615 615 if (varProvider != nullptr) {
616 616 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
617 617 varRequestId, varId, varStrategyRangesRequested.first,
618 618 varStrategyRangesRequested.second,
619 619 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
620 620 varProvider);
621 621
622 622 if (!varRequestIdCanceled.isNull()) {
623 623 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
624 624 << varRequestIdCanceled;
625 625 cancelVariableRequest(varRequestIdCanceled);
626 626 }
627 627 }
628 628 else {
629 629 qCCritical(LOG_VariableController())
630 630 << "Impossible to provide data with a null provider";
631 631 }
632 632
633 633 if (!inCacheRangeList.empty()) {
634 634 emit q->updateVarDisplaying(var, inCacheRangeList.first());
635 635 }
636 636 }
637 637 else {
638 638 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
639 639 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
640 640 // store VarRequest
641 641 storeVariableRequest(varId, varRequestId, varRequest);
642 642 acceptVariableRequest(
643 643 varId, var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
644 644 }
645 645 }
646 646 }
647 647
648 648 std::shared_ptr<Variable>
649 649 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
650 650 {
651 651 std::shared_ptr<Variable> var;
652 652 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
653 653
654 654 auto end = m_VariableToIdentifierMap.cend();
655 655 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
656 656 if (it != end) {
657 657 var = it->first;
658 658 }
659 659 else {
660 660 qCCritical(LOG_VariableController())
661 661 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
662 662 }
663 663
664 664 return var;
665 665 }
666 666
667 667 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
668 668 const QVector<AcquisitionDataPacket> acqDataPacketVector)
669 669 {
670 670 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
671 671 << acqDataPacketVector.size();
672 672 std::shared_ptr<IDataSeries> dataSeries;
673 673 if (!acqDataPacketVector.isEmpty()) {
674 674 dataSeries = acqDataPacketVector[0].m_DateSeries;
675 675 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
676 676 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
677 677 }
678 678 }
679 679 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
680 680 << acqDataPacketVector.size();
681 681 return dataSeries;
682 682 }
683 683
684 684 void VariableController::VariableControllerPrivate::registerProvider(
685 685 std::shared_ptr<IDataProvider> provider)
686 686 {
687 687 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
688 688 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
689 689 << provider->objectName();
690 690 m_ProviderSet.insert(provider);
691 691 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
692 692 &VariableAcquisitionWorker::onVariableDataAcquired);
693 693 connect(provider.get(), &IDataProvider::dataProvidedProgress,
694 694 m_VariableAcquisitionWorker.get(),
695 695 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
696 696 connect(provider.get(), &IDataProvider::dataProvidedFailed,
697 697 m_VariableAcquisitionWorker.get(),
698 698 &VariableAcquisitionWorker::onVariableAcquisitionFailed);
699 699 }
700 700 else {
701 701 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
702 702 }
703 703 }
704 704
705 705 void VariableController::VariableControllerPrivate::storeVariableRequest(
706 706 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
707 707 {
708 708 // First request for the variable. we can create an entry for it
709 709 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
710 710 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
711 711 auto varRequestIdQueue = std::deque<QUuid>{};
712 712 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
713 713 varRequestIdQueue.push_back(varRequestId);
714 714 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
715 715 }
716 716 else {
717 717 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
718 718 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
719 719 varRequestIdQueue.push_back(varRequestId);
720 720 }
721 721
722 722 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
723 723 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
724 724 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
725 725 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
726 726 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
727 727 m_VarRequestIdToVarIdVarRequestMap.insert(
728 728 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
729 729 }
730 730 else {
731 731 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
732 732 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
733 733 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
734 734 }
735 735 }
736 736
737 737 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
738 738 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
739 739 {
740 740 QUuid varRequestId;
741 741 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
742 742 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
743 743 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
744 744 varRequestId = varRequestIdQueue.front();
745 745 auto varRequestIdToVarIdVarRequestMapIt
746 746 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
747 747 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
748 748 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
749 749 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
750 750 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
751 751 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
752 752 auto &varRequest = varIdToVarRequestMapIt->second;
753 753 varRequest.m_DataSeries = dataSeries;
754 754 varRequest.m_CanUpdate = true;
755 755 }
756 756 else {
757 757 qCDebug(LOG_VariableController())
758 758 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
759 759 "to a variableRequestId")
760 760 << varRequestId << varId;
761 761 }
762 762 }
763 763 else {
764 764 qCCritical(LOG_VariableController())
765 765 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
766 766 << varRequestId;
767 767 }
768 768
769 769 varRequestIdQueue.pop_front();
770 770 if (varRequestIdQueue.empty()) {
771 qCCritical(LOG_VariableController())
771 qCDebug(LOG_VariableController())
772 772 << tr("TORM Erase REQUEST because it has been accepted") << varId;
773 773 m_VarIdToVarRequestIdQueueMap.erase(varId);
774 774 }
775 775 }
776 776 else {
777 777 qCCritical(LOG_VariableController())
778 778 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
779 779 }
780 780
781 781 return varRequestId;
782 782 }
783 783
784 784 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
785 785 {
786 786
787 787 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
788 788 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
789 789 bool processVariableUpdate = true;
790 790 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
791 791 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
792 792 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
793 793 ++varIdToVarRequestMapIt) {
794 794 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
795 795 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
796 796 << processVariableUpdate;
797 797 }
798 798
799 799 if (processVariableUpdate) {
800 800 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
801 801 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
802 802 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
803 803 auto &varRequest = varIdToVarRequestMapIt->second;
804 804 var->setRange(varRequest.m_RangeRequested);
805 805 var->setCacheRange(varRequest.m_CacheRangeRequested);
806 806 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
807 807 << varRequest.m_RangeRequested;
808 808 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
809 809 << varRequest.m_CacheRangeRequested;
810 810 var->mergeDataSeries(varRequest.m_DataSeries);
811 811 qCDebug(LOG_VariableController()) << tr("3: onDataProvided");
812 812
813 813 /// @todo MPL: confirm
814 814 // Variable update is notified only if there is no pending request for it
815 815 // if
816 816 // (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first)
817 817 // == 0) {
818 818 emit var->updated();
819 819 // }
820 820 }
821 821 else {
822 822 qCCritical(LOG_VariableController())
823 823 << tr("Impossible to update data to a null variable");
824 824 }
825 825 }
826 826
827 827 // cleaning varRequestId
828 828 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
829 829 << m_VarRequestIdToVarIdVarRequestMap.size();
830 830 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
831 831 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
832 832 << m_VarRequestIdToVarIdVarRequestMap.size();
833 833 }
834 834 }
835 835 else {
836 836 qCCritical(LOG_VariableController())
837 837 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
838 838 }
839 839 }
840 840
841 841 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
842 842 {
843 843 // cleaning varRequestId
844 844 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
845 845
846 846 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
847 847 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
848 848 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
849 849 varRequestIdQueue.erase(
850 850 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
851 851 varRequestIdQueue.end());
852 852 if (varRequestIdQueue.empty()) {
853 853
854 854 qCCritical(LOG_VariableController())
855 855 << tr("VariableControllerPrivate::cancelVariableRequest")
856 856 << varIdToVarRequestIdQueueMapIt->first;
857 857 varIdToVarRequestIdQueueMapIt
858 858 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
859 859 }
860 860 else {
861 861 ++varIdToVarRequestIdQueueMapIt;
862 862 }
863 863 }
864 864 }
865 865
866 866 SqpRange VariableController::VariableControllerPrivate::getLastRequestedRange(QUuid varId)
867 867 {
868 868 auto lastRangeRequested = SqpRange{INVALID_RANGE};
869 869 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
870 870 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
871 871 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
872 872 auto varRequestId = varRequestIdQueue.back();
873 873 auto varRequestIdToVarIdVarRequestMapIt
874 874 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
875 875 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
876 876 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
877 877 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
878 878 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
879 879 auto &varRequest = varIdToVarRequestMapIt->second;
880 880 lastRangeRequested = varRequest.m_RangeRequested;
881 881 }
882 882 else {
883 883 qCDebug(LOG_VariableController())
884 884 << tr("Impossible to getLastRequestedRange of a unknown variable id attached "
885 885 "to a variableRequestId")
886 886 << varRequestId << varId;
887 887 }
888 888 }
889 889 else {
890 890 qCCritical(LOG_VariableController())
891 891 << tr("Impossible to getLastRequestedRange of a unknown variableRequestId")
892 892 << varRequestId;
893 893 }
894 894 }
895 895 else {
896 896 qDebug(LOG_VariableController())
897 897 << tr("Impossible to getLastRequestedRange of a unknown variable id") << varId;
898 898 }
899 899
900 900 return lastRangeRequested;
901 901 }
General Comments 0
You need to be logged in to leave comments. Login now