##// END OF EJS Templates
Remove regression for aborting request during processRequest method
perrinel -
r827:83da67fa98c8
parent child
Show More
@@ -1,415 +1,416
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 75 << varRequestId;
76 76 acqRequest.m_VarRequestId = varRequestId;
77 77 acqRequest.m_vIdentifier = vIdentifier;
78 78 acqRequest.m_DataProviderParameters = parameters;
79 79 acqRequest.m_RangeRequested = rangeRequested;
80 80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 81 acqRequest.m_Size = parameters.m_Times.size();
82 82 acqRequest.m_Provider = provider;
83 83
84 84
85 85 // Register request
86 86 impl->lockWrite();
87 87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 89
90 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 92 // A current request already exists, we can replace the next one
93 93 auto oldAcqId = it->second.second;
94 94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 98 }
99 99
100 100 it->second.second = acqRequest.m_AcqIdentifier;
101 101 impl->unlock();
102 102
103 103 // remove old acqIdentifier from the worker
104 104 impl->cancelVarRequest(varRequestIdCanceled);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
105 106 }
106 107 else {
107 108 // First request for the variable, it must be stored and executed
108 109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
109 110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
110 111 impl->unlock();
111 112
112 113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
113 114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
114 115 }
115 116
116 117 return varRequestIdCanceled;
117 118 }
118 119
119 120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
120 121 {
121 122 impl->lockRead();
122 123
123 124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
124 125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
125 126 auto currentAcqId = it->second.first;
126 127
127 128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
128 129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
129 130 auto request = it->second;
130 131 impl->unlock();
131 132
132 133 // Remove the current request from the worker
133 134 impl->updateToNextRequest(vIdentifier);
134 135
135 136 // notify the request aborting to the provider
136 137 request.m_Provider->requestDataAborting(currentAcqId);
137 138 }
138 139 else {
139 140 impl->unlock();
140 141 qCWarning(LOG_VariableAcquisitionWorker())
141 142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
142 143 }
143 144 }
144 145 else {
145 146 impl->unlock();
146 147 }
147 148 }
148 149
149 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
150 151 double progress)
151 152 {
152 153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
153 154 << acqIdentifier << progress;
154 155 impl->lockRead();
155 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
156 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
157 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
158 159
159 160 auto currentPartProgress
160 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
161 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
162 163
163 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
164 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
165 166 qCDebug(LOG_VariableAcquisitionWorker())
166 167 << tr("TORM: onVariableRetrieveDataInProgress ")
167 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
168 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
169 170 if (finalProgression == 100.0) {
170 171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
171 172 }
172 173 }
173 174 impl->unlock();
174 175 }
175 176
176 177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
177 178 {
178 179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
179 180 << QThread::currentThread();
180 181 impl->lockRead();
181 182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
182 183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
183 184 auto request = it->second;
184 185 impl->unlock();
185 186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
186 187 << acqIdentifier << request.m_vIdentifier
187 188 << QThread::currentThread();
188 189 emit variableCanceledRequested(request.m_vIdentifier);
189 190 }
190 191 else {
191 192 impl->unlock();
192 193 // TODO log no acqIdentifier recognized
193 194 }
194 195 }
195 196
196 197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
197 198 std::shared_ptr<IDataSeries> dataSeries,
198 199 SqpRange dataRangeAcquired)
199 200 {
200 201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
201 202 << acqIdentifier << dataRangeAcquired;
202 203 impl->lockWrite();
203 204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
204 205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
205 206 // Store the result
206 207 auto dataPacket = AcquisitionDataPacket{};
207 208 dataPacket.m_Range = dataRangeAcquired;
208 209 dataPacket.m_DateSeries = dataSeries;
209 210
210 211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
211 212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
212 213 // A current request result already exists, we can update it
213 214 aIdToADPVit->second.push_back(dataPacket);
214 215 }
215 216 else {
216 217 // First request result for the variable, it must be stored
217 218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
218 219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
219 220 }
220 221
221 222
222 223 // Decrement the counter of the request
223 224 auto &acqRequest = aIdToARit->second;
224 225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
225 226
226 227 // if the counter is 0, we can return data then run the next request if it exists and
227 228 // removed the finished request
228 229 if (acqRequest.m_Size == acqRequest.m_Progression) {
229 230 auto varId = acqRequest.m_vIdentifier;
230 231 auto rangeRequested = acqRequest.m_RangeRequested;
231 232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
232 233 // Return the data
233 234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
234 235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
235 236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
236 237 }
237 238 impl->unlock();
238 239
239 240 // Update to the next request
240 241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
241 242 }
242 243 else {
243 244 impl->unlock();
244 245 }
245 246 }
246 247 else {
247 248 impl->unlock();
248 249 qCWarning(LOG_VariableAcquisitionWorker())
249 250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
250 251 }
251 252 }
252 253
253 254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
254 255 {
255 256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
256 257 impl->lockRead();
257 258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
258 259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
259 260 auto request = it->second;
260 261 impl->unlock();
261 262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
262 263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
263 264 }
264 265 else {
265 266 impl->unlock();
266 267 // TODO log no acqIdentifier recognized
267 268 }
268 269 }
269 270
270 271 void VariableAcquisitionWorker::initialize()
271 272 {
272 273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
273 274 << QThread::currentThread();
274 275 impl->m_WorkingMutex.lock();
275 276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
276 277 }
277 278
278 279 void VariableAcquisitionWorker::finalize()
279 280 {
280 281 impl->m_WorkingMutex.unlock();
281 282 }
282 283
283 284 void VariableAcquisitionWorker::waitForFinish()
284 285 {
285 286 QMutexLocker locker{&impl->m_WorkingMutex};
286 287 }
287 288
288 289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
289 290 QUuid vIdentifier)
290 291 {
291 292 lockWrite();
292 293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
293 294
294 295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
295 296 // A current request already exists, we can replace the next one
296 297
297 298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
298 299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
299 300
300 301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
301 302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
302 303 }
303 304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
304 305 unlock();
305 306 }
306 307
307 308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
308 309 QUuid vIdentifier)
309 310 {
310 311 lockRead();
311 312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
312 313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
313 314 if (it->second.second.isNull()) {
314 315 unlock();
315 316 // There is no next request, we can remove the variable request
316 317 removeVariableRequest(vIdentifier);
317 318 }
318 319 else {
319 320 auto acqIdentifierToRemove = it->second.first;
320 321 // Move the next request to the current request
321 322 auto nextRequestId = it->second.second;
322 323 it->second.first = nextRequestId;
323 324 it->second.second = QUuid();
324 325 unlock();
325 326 // Remove AcquisitionRequest and results;
326 327 lockWrite();
327 328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
328 329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
329 330 unlock();
330 331 // Execute the current request
331 332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
332 333 Q_ARG(QUuid, nextRequestId));
333 334 }
334 335 }
335 336 else {
336 337 unlock();
337 338 qCCritical(LOG_VariableAcquisitionWorker())
338 339 << tr("Impossible to execute the acquisition on an unfound variable ");
339 340 }
340 341 }
341 342
342 343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
343 344 QUuid varRequestId)
344 345 {
345 346 qCDebug(LOG_VariableAcquisitionWorker())
346 347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
347 348 lockRead();
348 349 // get all AcqIdentifier in link with varRequestId
349 350 QVector<QUuid> acqIdsToRm;
350 351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 353 if (it->second.m_VarRequestId == varRequestId) {
353 354 acqIdsToRm << it->first;
354 355 }
355 356 }
356 357 unlock();
357 358 // run aborting or removing of acqIdsToRm
358 359
359 360 for (auto acqId : acqIdsToRm) {
360 361 removeAcqRequest(acqId);
361 362 }
362 363 qCDebug(LOG_VariableAcquisitionWorker())
363 364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
364 365 }
365 366
366 367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
367 368 QUuid acqRequestId)
368 369 {
369 370 qCDebug(LOG_VariableAcquisitionWorker())
370 371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
371 372 QUuid vIdentifier;
372 373 std::shared_ptr<IDataProvider> provider;
373 374 lockRead();
374 375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
375 376 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
376 377 vIdentifier = acqIt->second.m_vIdentifier;
377 378 provider = acqIt->second.m_Provider;
378 379
379 380 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
380 381 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
381 382 if (it->second.first == acqRequestId) {
382 383 // acqRequest is currently running -> let's aborting it
383 384 unlock();
384 385
385 386 // Remove the current request from the worker
386 387 updateToNextRequest(vIdentifier);
387 388
388 389 // notify the request aborting to the provider
389 390 provider->requestDataAborting(acqRequestId);
390 391 }
391 392 else if (it->second.second == acqRequestId) {
392 393 it->second.second = QUuid();
393 394 unlock();
394 395 }
395 396 else {
396 397 unlock();
397 398 }
398 399 }
399 400 else {
400 401 unlock();
401 402 }
402 403 }
403 404 else {
404 405 unlock();
405 406 }
406 407
407 408 lockWrite();
408 409
409 410 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
410 411 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
411 412
412 413 unlock();
413 414 qCDebug(LOG_VariableAcquisitionWorker())
414 415 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
415 416 }
@@ -1,956 +1,958
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableCacheStrategyFactory.h>
5 5 #include <Variable/VariableController.h>
6 6 #include <Variable/VariableModel.h>
7 7 #include <Variable/VariableSynchronizationGroup.h>
8 8
9 9 #include <Data/DataProviderParameters.h>
10 10 #include <Data/IDataProvider.h>
11 11 #include <Data/IDataSeries.h>
12 12 #include <Data/VariableRequest.h>
13 13 #include <Time/TimeController.h>
14 14
15 15 #include <QMutex>
16 16 #include <QThread>
17 17 #include <QUuid>
18 18 #include <QtCore/QItemSelectionModel>
19 19
20 20 #include <deque>
21 21 #include <set>
22 22 #include <unordered_map>
23 23
24 24 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
25 25
26 26 namespace {
27 27
28 28 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
29 29 const SqpRange &oldGraphRange)
30 30 {
31 31 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
32 32
33 33 auto varRangeRequested = varRange;
34 34 switch (zoomType) {
35 35 case AcquisitionZoomType::ZoomIn: {
36 36 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
37 37 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
38 38 varRangeRequested.m_TStart += deltaLeft;
39 39 varRangeRequested.m_TEnd -= deltaRight;
40 40 break;
41 41 }
42 42
43 43 case AcquisitionZoomType::ZoomOut: {
44 44 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
45 45 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
46 46 varRangeRequested.m_TStart -= deltaLeft;
47 47 varRangeRequested.m_TEnd += deltaRight;
48 48 break;
49 49 }
50 50 case AcquisitionZoomType::PanRight: {
51 51 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
52 52 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
53 53 varRangeRequested.m_TStart += deltaLeft;
54 54 varRangeRequested.m_TEnd += deltaRight;
55 55 break;
56 56 }
57 57 case AcquisitionZoomType::PanLeft: {
58 58 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
59 59 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
60 60 varRangeRequested.m_TStart -= deltaLeft;
61 61 varRangeRequested.m_TEnd -= deltaRight;
62 62 break;
63 63 }
64 64 case AcquisitionZoomType::Unknown: {
65 65 qCCritical(LOG_VariableController())
66 66 << VariableController::tr("Impossible to synchronize: zoom type unknown");
67 67 break;
68 68 }
69 69 default:
70 70 qCCritical(LOG_VariableController()) << VariableController::tr(
71 71 "Impossible to synchronize: zoom type not take into account");
72 72 // No action
73 73 break;
74 74 }
75 75
76 76 return varRangeRequested;
77 77 }
78 78 }
79 79
80 80 struct VariableController::VariableControllerPrivate {
81 81 explicit VariableControllerPrivate(VariableController *parent)
82 82 : m_WorkingMutex{},
83 83 m_VariableModel{new VariableModel{parent}},
84 84 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
85 85 // m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
86 86 m_VariableCacheStrategy{VariableCacheStrategyFactory::createCacheStrategy(
87 87 CacheStrategy::SingleThreshold)},
88 88 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
89 89 q{parent}
90 90 {
91 91
92 92 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
93 93 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
94 94 }
95 95
96 96
97 97 virtual ~VariableControllerPrivate()
98 98 {
99 99 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
100 100 m_VariableAcquisitionWorkerThread.quit();
101 101 m_VariableAcquisitionWorkerThread.wait();
102 102 }
103 103
104 104
105 105 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
106 106 QUuid varRequestId);
107 107
108 108 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
109 109 std::shared_ptr<IDataSeries>
110 110 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
111 111
112 112 void registerProvider(std::shared_ptr<IDataProvider> provider);
113 113
114 114 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
115 115 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
116 116 void updateVariableRequest(QUuid varRequestId);
117 117 void cancelVariableRequest(QUuid varRequestId);
118 118 void abortVariableRequest(QUuid varRequestId);
119 119 SqpRange getLastRequestedRange(QUuid varId);
120 120
121 121 QMutex m_WorkingMutex;
122 122 /// Variable model. The VariableController has the ownership
123 123 VariableModel *m_VariableModel;
124 124 QItemSelectionModel *m_VariableSelectionModel;
125 125
126 126
127 127 TimeController *m_TimeController{nullptr};
128 128 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
129 129 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
130 130 QThread m_VariableAcquisitionWorkerThread;
131 131
132 132 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
133 133 m_VariableToProviderMap;
134 134 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
135 135 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
136 136 m_GroupIdToVariableSynchronizationGroupMap;
137 137 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
138 138 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
139 139
140 140 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
141 141
142 142 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
143 143
144
145 144 VariableController *q;
146 145 };
147 146
148 147
149 148 VariableController::VariableController(QObject *parent)
150 149 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
151 150 {
152 151 qCDebug(LOG_VariableController()) << tr("VariableController construction")
153 152 << QThread::currentThread();
154 153
155 154 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
156 155 &VariableController::onAbortProgressRequested);
157 156
158 157 connect(impl->m_VariableAcquisitionWorker.get(),
159 158 &VariableAcquisitionWorker::variableCanceledRequested, this,
160 159 &VariableController::onAbortAcquisitionRequested);
161 160
162 161 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
163 162 &VariableController::onDataProvided);
164 163 connect(impl->m_VariableAcquisitionWorker.get(),
165 164 &VariableAcquisitionWorker::variableRequestInProgress, this,
166 165 &VariableController::onVariableRetrieveDataInProgress);
167 166
168 167
169 168 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
170 169 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
171 170 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
172 171 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
173 172
174 173
175 174 impl->m_VariableAcquisitionWorkerThread.start();
176 175 }
177 176
178 177 VariableController::~VariableController()
179 178 {
180 179 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
181 180 << QThread::currentThread();
182 181 this->waitForFinish();
183 182 }
184 183
185 184 VariableModel *VariableController::variableModel() noexcept
186 185 {
187 186 return impl->m_VariableModel;
188 187 }
189 188
190 189 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
191 190 {
192 191 return impl->m_VariableSelectionModel;
193 192 }
194 193
195 194 void VariableController::setTimeController(TimeController *timeController) noexcept
196 195 {
197 196 impl->m_TimeController = timeController;
198 197 }
199 198
200 199 std::shared_ptr<Variable>
201 200 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
202 201 {
203 202 if (impl->m_VariableModel->containsVariable(variable)) {
204 203 // Clones variable
205 204 auto duplicate = variable->clone();
206 205
207 206 // Adds clone to model
208 207 impl->m_VariableModel->addVariable(duplicate);
209 208
210 209 // Generates clone identifier
211 210 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
212 211
213 212 // Registers provider
214 213 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
215 214 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
216 215
217 216 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
218 217 if (duplicateProvider) {
219 218 impl->registerProvider(duplicateProvider);
220 219 }
221 220
222 221 return duplicate;
223 222 }
224 223 else {
225 224 qCCritical(LOG_VariableController())
226 225 << tr("Can't create duplicate of variable %1: variable not registered in the model")
227 226 .arg(variable->name());
228 227 return nullptr;
229 228 }
230 229 }
231 230
232 231 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
233 232 {
234 233 if (!variable) {
235 234 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
236 235 return;
237 236 }
238 237
239 238 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
240 239 // make some treatments before the deletion
241 240 emit variableAboutToBeDeleted(variable);
242 241
243 242 // Deletes identifier
244 243 impl->m_VariableToIdentifierMap.erase(variable);
245 244
246 245 // Deletes provider
247 246 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
248 247 qCDebug(LOG_VariableController())
249 248 << tr("Number of providers deleted for variable %1: %2")
250 249 .arg(variable->name(), QString::number(nbProvidersDeleted));
251 250
252 251
253 252 // Deletes from model
254 253 impl->m_VariableModel->deleteVariable(variable);
255 254 }
256 255
257 256 void VariableController::deleteVariables(
258 257 const QVector<std::shared_ptr<Variable> > &variables) noexcept
259 258 {
260 259 for (auto variable : qAsConst(variables)) {
261 260 deleteVariable(variable);
262 261 }
263 262 }
264 263
265 264 std::shared_ptr<Variable>
266 265 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
267 266 std::shared_ptr<IDataProvider> provider) noexcept
268 267 {
269 268 if (!impl->m_TimeController) {
270 269 qCCritical(LOG_VariableController())
271 270 << tr("Impossible to create variable: The time controller is null");
272 271 return nullptr;
273 272 }
274 273
275 274 auto range = impl->m_TimeController->dateTime();
276 275
277 276 if (auto newVariable = impl->m_VariableModel->createVariable(name, metadata)) {
278 277 auto identifier = QUuid::createUuid();
279 278
280 279 // store the provider
281 280 impl->registerProvider(provider);
282 281
283 282 // Associate the provider
284 283 impl->m_VariableToProviderMap[newVariable] = provider;
285 qCInfo(LOG_VariableController()) << "createVariable: " << identifier;
286 284 impl->m_VariableToIdentifierMap[newVariable] = identifier;
287 285
288 286
289 287 auto varRequestId = QUuid::createUuid();
288 qCInfo(LOG_VariableController()) << "createVariable: " << identifier << varRequestId;
290 289 impl->processRequest(newVariable, range, varRequestId);
291 290 impl->updateVariableRequest(varRequestId);
292 291
293 292 return newVariable;
294 293 }
295 294 }
296 295
297 296 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
298 297 {
299 298 // NOTE: Even if acquisition request is aborting, the graphe range will be changed
300 299 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
301 300 << QThread::currentThread()->objectName();
302 301 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
303 302 auto variables = QVector<std::shared_ptr<Variable> >{};
304 303
305 304 for (const auto &selectedRow : qAsConst(selectedRows)) {
306 305 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
307 306 variables << selectedVariable;
308 307
309 308 // notify that rescale operation has to be done
310 309 emit rangeChanged(selectedVariable, dateTime);
311 310 }
312 311 }
313 312
314 313 if (!variables.isEmpty()) {
315 314 this->onRequestDataLoading(variables, dateTime, true);
316 315 }
317 316 }
318 317
319 318 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
320 319 const SqpRange &cacheRangeRequested,
321 320 QVector<AcquisitionDataPacket> dataAcquired)
322 321 {
323 322 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
324 323 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
325 324 if (!varRequestId.isNull()) {
326 325 impl->updateVariableRequest(varRequestId);
327 326 }
328 327 }
329 328
330 329 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
331 330 {
332 331 qCDebug(LOG_VariableController())
333 332 << "TORM: variableController::onVariableRetrieveDataInProgress"
334 333 << QThread::currentThread()->objectName() << progress;
335 334 if (auto var = impl->findVariable(identifier)) {
336 335 impl->m_VariableModel->setDataProgress(var, progress);
337 336 }
338 337 else {
339 338 qCCritical(LOG_VariableController())
340 339 << tr("Impossible to notify progression of a null variable");
341 340 }
342 341 }
343 342
344 343 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
345 344 {
346 345 auto it = impl->m_VariableToIdentifierMap.find(variable);
347 346 if (it != impl->m_VariableToIdentifierMap.cend()) {
348 347 impl->m_VariableAcquisitionWorker->abortProgressRequested(it->second);
349 348
350 349 QUuid varRequestId;
351 350 auto varIdToVarRequestIdQueueMapIt = impl->m_VarIdToVarRequestIdQueueMap.find(it->second);
352 351 if (varIdToVarRequestIdQueueMapIt != impl->m_VarIdToVarRequestIdQueueMap.cend()) {
353 352 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
354 353 varRequestId = varRequestIdQueue.front();
355 354 impl->cancelVariableRequest(varRequestId);
356 355
357 356 // Finish the progression for the request
358 357 impl->m_VariableModel->setDataProgress(variable, 0.0);
359 358 }
360 359 else {
361 360 qCWarning(LOG_VariableController())
362 361 << tr("Aborting progression of inexistant variable request detected !!!")
363 362 << QThread::currentThread()->objectName();
364 363 }
365 364 }
366 365 else {
367 366 qCWarning(LOG_VariableController())
368 367 << tr("Aborting progression of inexistant variable detected !!!")
369 368 << QThread::currentThread()->objectName();
370 369 }
371 370 }
372 371
373 372 void VariableController::onAbortAcquisitionRequested(QUuid vIdentifier)
374 373 {
375 374 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortAcquisitionRequested"
376 375 << QThread::currentThread()->objectName() << vIdentifier;
377 376
378 377 if (auto var = impl->findVariable(vIdentifier)) {
379 378 this->onAbortProgressRequested(var);
380 379 }
381 380 else {
382 381 qCCritical(LOG_VariableController())
383 382 << tr("Impossible to abort Acquisition Requestof a null variable");
384 383 }
385 384 }
386 385
387 386 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
388 387 {
389 388 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
390 389 << QThread::currentThread()->objectName()
391 390 << synchronizationGroupId;
392 391 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
393 392 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
394 393 std::make_pair(synchronizationGroupId, vSynchroGroup));
395 394 }
396 395
397 396 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
398 397 {
399 398 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
400 399 }
401 400
402 401 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
403 402 QUuid synchronizationGroupId)
404 403
405 404 {
406 405 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
407 406 << synchronizationGroupId;
408 407 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
409 408 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
410 409 auto groupIdToVSGIt
411 410 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
412 411 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
413 412 impl->m_VariableIdGroupIdMap.insert(
414 413 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
415 414 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
416 415 }
417 416 else {
418 417 qCCritical(LOG_VariableController())
419 418 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
420 419 << variable->name();
421 420 }
422 421 }
423 422 else {
424 423 qCCritical(LOG_VariableController())
425 424 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
426 425 }
427 426 }
428 427
429 428 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
430 429 QUuid synchronizationGroupId)
431 430 {
432 431 // Gets variable id
433 432 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
434 433 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
435 434 qCCritical(LOG_VariableController())
436 435 << tr("Can't desynchronize variable %1: variable identifier not found")
437 436 .arg(variable->name());
438 437 return;
439 438 }
440 439
441 440 // Gets synchronization group
442 441 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
443 442 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
444 443 qCCritical(LOG_VariableController())
445 444 << tr("Can't desynchronize variable %1: unknown synchronization group")
446 445 .arg(variable->name());
447 446 return;
448 447 }
449 448
450 449 auto variableId = variableIt->second;
451 450
452 451 // Removes variable from synchronization group
453 452 auto synchronizationGroup = groupIt->second;
454 453 synchronizationGroup->removeVariableId(variableId);
455 454
456 455 // Removes link between variable and synchronization group
457 456 impl->m_VariableIdGroupIdMap.erase(variableId);
458 457 }
459 458
460 459 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
461 460 const SqpRange &range, bool synchronise)
462 461 {
463 462 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
464 463
465 464 // we want to load data of the variable for the dateTime.
466 465 // First we check if the cache contains some of them.
467 466 // For the other, we ask the provider to give them.
468 467
469 468 auto varRequestId = QUuid::createUuid();
470 469 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
471 470 << QThread::currentThread()->objectName() << varRequestId;
472 471
473 472 for (const auto &var : variables) {
474 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
473 qCInfo(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
475 474 impl->processRequest(var, range, varRequestId);
476 475 }
477 476
478 477 if (synchronise) {
479 478 // Get the group ids
480 479 qCDebug(LOG_VariableController())
481 480 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
482 481 auto groupIds = std::set<QUuid>{};
483 482 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
484 483 for (const auto &var : variables) {
485 484 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
486 485 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
487 486 auto vId = varToVarIdIt->second;
488 487 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
489 488 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
490 489 auto gId = varIdToGroupIdIt->second;
491 490 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
492 491 if (groupIds.find(gId) == groupIds.cend()) {
493 492 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
494 493 groupIds.insert(gId);
495 494 }
496 495 }
497 496 }
498 497 }
499 498
500 499 // We assume here all group ids exist
501 500 for (const auto &gId : groupIds) {
502 501 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
503 502 auto vSyncIds = vSynchronizationGroup->getIds();
504 503 qCDebug(LOG_VariableController()) << "Var in synchro group ";
505 504 for (auto vId : vSyncIds) {
506 505 auto var = impl->findVariable(vId);
507 506
508 507 // Don't process already processed var
509 508 if (!variables.contains(var)) {
510 509 if (var != nullptr) {
511 qCDebug(LOG_VariableController()) << "processRequest synchro for"
512 << var->name();
510 qCInfo(LOG_VariableController()) << "processRequest synchro for"
511 << var->name() << varRequestId;
513 512 auto vSyncRangeRequested = computeSynchroRangeRequested(
514 513 var->range(), range, groupIdToOldRangeMap.at(gId));
515 514 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
516 515 impl->processRequest(var, vSyncRangeRequested, varRequestId);
517 516 }
518 517 else {
519 518 qCCritical(LOG_VariableController())
520 519
521 520 << tr("Impossible to synchronize a null variable");
522 521 }
523 522 }
524 523 }
525 524 }
526 525 }
527 526
528 527 impl->updateVariableRequest(varRequestId);
529 528 }
530 529
531 530
532 531 void VariableController::initialize()
533 532 {
534 533 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
535 534 impl->m_WorkingMutex.lock();
536 535 qCDebug(LOG_VariableController()) << tr("VariableController init END");
537 536 }
538 537
539 538 void VariableController::finalize()
540 539 {
541 540 impl->m_WorkingMutex.unlock();
542 541 }
543 542
544 543 void VariableController::waitForFinish()
545 544 {
546 545 QMutexLocker locker{&impl->m_WorkingMutex};
547 546 }
548 547
549 548 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
550 549 {
551 550 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
552 551 auto zoomType = AcquisitionZoomType::Unknown;
553 552 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
554 553 qCDebug(LOG_VariableController()) << "zoomtype: ZoomOut";
555 554 zoomType = AcquisitionZoomType::ZoomOut;
556 555 }
557 556 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
558 557 qCDebug(LOG_VariableController()) << "zoomtype: PanRight";
559 558 zoomType = AcquisitionZoomType::PanRight;
560 559 }
561 560 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
562 561 qCDebug(LOG_VariableController()) << "zoomtype: PanLeft";
563 562 zoomType = AcquisitionZoomType::PanLeft;
564 563 }
565 564 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
566 565 qCDebug(LOG_VariableController()) << "zoomtype: ZoomIn";
567 566 zoomType = AcquisitionZoomType::ZoomIn;
568 567 }
569 568 else {
570 569 qCDebug(LOG_VariableController()) << "getZoomType: Unknown type detected";
571 570 }
572 571 return zoomType;
573 572 }
574 573
575 574 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
576 575 const SqpRange &rangeRequested,
577 576 QUuid varRequestId)
578 577 {
579 578 auto varRequest = VariableRequest{};
580 579
581 580 auto it = m_VariableToIdentifierMap.find(var);
582 581 if (it != m_VariableToIdentifierMap.cend()) {
583 582
584 583 auto varId = it->second;
585 584
586 585 auto oldRange = getLastRequestedRange(varId);
587 586
588 587 // check for update oldRange to the last request range.
589 588 if (oldRange == INVALID_RANGE) {
590 589 oldRange = var->range();
591 590 }
592 591
593 592 auto varStrategyRangesRequested
594 593 = m_VariableCacheStrategy->computeRange(oldRange, rangeRequested);
595 594
596 595 // Use commented lines to remove the cache (run time)
597 596 // auto notInCacheRangeList = QVector<SqpRange>{varStrategyRangesRequested.second};
598 597 // auto inCacheRangeList = QVector<SqpRange>{};
599 598 auto notInCacheRangeList
600 599 = Variable::provideNotInCacheRangeList(oldRange, varStrategyRangesRequested.second);
601 600 auto inCacheRangeList
602 601 = Variable::provideInCacheRangeList(oldRange, varStrategyRangesRequested.second);
603 602
604 603 if (!notInCacheRangeList.empty()) {
605 604 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
606 605 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
607 606
608 607 // store VarRequest
609 608 storeVariableRequest(varId, varRequestId, varRequest);
610 609
611 610 auto varProvider = m_VariableToProviderMap.at(var);
612 611 if (varProvider != nullptr) {
613 612 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
614 613 varRequestId, varId, varStrategyRangesRequested.first,
615 614 varStrategyRangesRequested.second,
616 615 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
617 616 varProvider);
618 617
619 if (!varRequestIdCanceled.isNull()) {
620 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
621 << varRequestIdCanceled;
622 // cancelVariableRequest(varRequestIdCanceled);
623 abortVariableRequest(varRequestIdCanceled);
624 }
625 618 }
626 619 else {
627 620 qCCritical(LOG_VariableController())
628 621 << "Impossible to provide data with a null provider";
629 622 }
630 623
631 624 if (!inCacheRangeList.empty()) {
632 625 emit q->updateVarDisplaying(var, inCacheRangeList.first());
633 626 }
634 627 }
635 628 else {
636 629 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
637 630 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
638 631 // store VarRequest
639 632 storeVariableRequest(varId, varRequestId, varRequest);
640 633 acceptVariableRequest(
641 634 varId, var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
642 635 }
643 636 }
644 637 }
645 638
646 639 std::shared_ptr<Variable>
647 640 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
648 641 {
649 642 std::shared_ptr<Variable> var;
650 643 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
651 644
652 645 auto end = m_VariableToIdentifierMap.cend();
653 646 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
654 647 if (it != end) {
655 648 var = it->first;
656 649 }
657 650 else {
658 651 qCCritical(LOG_VariableController())
659 652 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
660 653 }
661 654
662 655 return var;
663 656 }
664 657
665 658 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
666 659 const QVector<AcquisitionDataPacket> acqDataPacketVector)
667 660 {
668 661 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
669 662 << acqDataPacketVector.size();
670 663 std::shared_ptr<IDataSeries> dataSeries;
671 664 if (!acqDataPacketVector.isEmpty()) {
672 665 dataSeries = acqDataPacketVector[0].m_DateSeries;
673 666 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
674 667 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
675 668 }
676 669 }
677 670 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
678 671 << acqDataPacketVector.size();
679 672 return dataSeries;
680 673 }
681 674
682 675 void VariableController::VariableControllerPrivate::registerProvider(
683 676 std::shared_ptr<IDataProvider> provider)
684 677 {
685 678 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
686 679 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
687 680 << provider->objectName();
688 681 m_ProviderSet.insert(provider);
689 682 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
690 683 &VariableAcquisitionWorker::onVariableDataAcquired);
691 684 connect(provider.get(), &IDataProvider::dataProvidedProgress,
692 685 m_VariableAcquisitionWorker.get(),
693 686 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
694 687 connect(provider.get(), &IDataProvider::dataProvidedFailed,
695 688 m_VariableAcquisitionWorker.get(),
696 689 &VariableAcquisitionWorker::onVariableAcquisitionFailed);
697 690 }
698 691 else {
699 692 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
700 693 }
701 694 }
702 695
703 696 void VariableController::VariableControllerPrivate::storeVariableRequest(
704 697 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
705 698 {
706 699 // First request for the variable. we can create an entry for it
707 700 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
708 701 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
709 702 auto varRequestIdQueue = std::deque<QUuid>{};
710 703 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
711 704 varRequestIdQueue.push_back(varRequestId);
712 705 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
713 706 }
714 707 else {
715 708 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
716 709 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
717 710 varRequestIdQueue.push_back(varRequestId);
718 711 }
719 712
720 713 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
721 714 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
722 715 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
723 716 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
724 717 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
725 718 m_VarRequestIdToVarIdVarRequestMap.insert(
726 719 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
727 720 }
728 721 else {
729 722 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
730 723 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
731 724 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
732 725 }
733 726 }
734 727
735 728 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
736 729 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
737 730 {
738 731 QUuid varRequestId;
739 732 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
740 733 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
741 734 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
742 735 varRequestId = varRequestIdQueue.front();
743 736 auto varRequestIdToVarIdVarRequestMapIt
744 737 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
745 738 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
746 739 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
747 740 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
748 741 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
749 742 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
750 743 auto &varRequest = varIdToVarRequestMapIt->second;
751 744 varRequest.m_DataSeries = dataSeries;
752 745 varRequest.m_CanUpdate = true;
753 746 }
754 747 else {
755 748 qCDebug(LOG_VariableController())
756 749 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
757 750 "to a variableRequestId")
758 751 << varRequestId << varId;
759 752 }
760 753 }
761 754 else {
762 755 qCCritical(LOG_VariableController())
763 756 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
764 757 << varRequestId;
765 758 }
766 759
767 760 varRequestIdQueue.pop_front();
768 761 if (varRequestIdQueue.empty()) {
769 762 qCDebug(LOG_VariableController())
770 763 << tr("TORM Erase REQUEST because it has been accepted") << varId;
771 764 m_VarIdToVarRequestIdQueueMap.erase(varId);
772 765 }
773 766 }
774 767 else {
775 768 qCCritical(LOG_VariableController())
776 769 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
777 770 }
778 771
779 772 return varRequestId;
780 773 }
781 774
782 775 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
783 776 {
784 777
785 778 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
786 779 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
787 780 bool processVariableUpdate = true;
788 781 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
789 782 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
790 783 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
791 784 ++varIdToVarRequestMapIt) {
792 785 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
793 786 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
794 787 << processVariableUpdate;
795 788 }
796 789
797 790 if (processVariableUpdate) {
798 791 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
799 792 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
800 793 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
801 794 auto &varRequest = varIdToVarRequestMapIt->second;
802 795 var->setRange(varRequest.m_RangeRequested);
803 796 var->setCacheRange(varRequest.m_CacheRangeRequested);
804 797 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
805 798 << varRequest.m_RangeRequested
806 799 << varRequest.m_CacheRangeRequested;
807 800 qCDebug(LOG_VariableController()) << tr("2: onDataProvided var points before")
808 801 << var->nbPoints()
809 802 << varRequest.m_DataSeries->nbPoints();
810 803 var->mergeDataSeries(varRequest.m_DataSeries);
811 804 qCDebug(LOG_VariableController()) << tr("3: onDataProvided var points after")
812 805 << var->nbPoints();
813 806
814 807 emit var->updated();
815 808 }
816 809 else {
817 810 qCCritical(LOG_VariableController())
818 811 << tr("Impossible to update data to a null variable");
819 812 }
820 813 }
821 814 // cleaning varRequestId
822 815 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
823 816 << m_VarRequestIdToVarIdVarRequestMap.size();
824 817 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
825 818 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
826 819 << m_VarRequestIdToVarIdVarRequestMap.size();
827 820 }
828 821 }
829 822 else {
830 823 qCCritical(LOG_VariableController())
831 824 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
832 825 }
833 826 }
834 827
835 828 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
836 829 {
837 830 // cleaning varRequestId
838 831 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
839 832
840 833 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
841 834 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
842 835 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
843 836 varRequestIdQueue.erase(
844 837 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
845 838 varRequestIdQueue.end());
846 839 if (varRequestIdQueue.empty()) {
847 840 varIdToVarRequestIdQueueMapIt
848 841 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
849 842
850 843 // Recompute if there is any next request based on the removed request.
851 844 }
852 845 else {
853 846 ++varIdToVarRequestIdQueueMapIt;
854 847 }
855 848 }
856 849 }
857 850
858 851 void VariableController::VariableControllerPrivate::abortVariableRequest(QUuid varRequestId)
859 852 {
860 853 auto varRequestIdsMap = std::map<QUuid, VariableRequest>{};
861 854 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
862 855 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
863 856 varRequestIdsMap = varRequestIdToVarIdVarRequestMapIt->second;
864 857 }
858 qCCritical(LOG_VariableController())
859 << tr("abortVariableRequest : varRequestId and count of aborting") << varRequestId
860 << varRequestIdsMap.size();
865 861
866 862 auto nextUuidToRemove = QSet<QUuid>{};
867 863 auto varIdEnd = varRequestIdsMap.end();
868 864 for (auto varIdIt = varRequestIdsMap.begin(); varIdIt != varIdEnd;) {
869 865 auto currentVarId = varIdIt->first;
870 866 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(currentVarId);
871 867 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
872 868
873 869 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
874 870
875 871 auto varReqIdQueueEnd = varRequestIdQueue.end();
876 872 for (auto varReqIdQueueIt = varRequestIdQueue.begin();
877 873 varReqIdQueueIt != varRequestIdQueue.end(); ++varReqIdQueueIt) {
878 874 if (*varReqIdQueueIt == varRequestId) {
879 875 auto nextVarRequestIdToRm = varReqIdQueueIt;
880 876 ++nextVarRequestIdToRm;
881 877
882 878 if (nextVarRequestIdToRm == varReqIdQueueEnd) {
883 879 // The varRequestId is in progress for the current var, let's aborting it.
884 880 m_VariableAcquisitionWorker->abortProgressRequested(currentVarId);
885 881 }
886 882 else {
887 883 // There is at least one Request after
888 884 // let's add only new id to remove
889 885 if (!nextUuidToRemove.contains(*nextVarRequestIdToRm)) {
890 886 nextUuidToRemove << *nextVarRequestIdToRm;
891 887 }
892 888 }
893 889
890 qCCritical(LOG_VariableController()) << tr("remove the request : aborting")
891 << currentVarId;
892
894 893 varReqIdQueueIt = varRequestIdQueue.erase(varReqIdQueueIt);
895 894 // break is necessary here, we don"t need to iterate on the dequeue anymore and
896 895 // the iteration is broken because of the erase
897 896 break;
898 897 }
899 898 }
900 899
901 900 // beacause the process can modify the map under the iteration, we need to update the
902 901 // iterator only if no erase has been done
903 902 if (varRequestIdQueue.empty()) {
904 903 varIdIt = varRequestIdsMap.erase(varIdIt);
905 904 }
906 905 else {
907 906 ++varIdIt;
908 907 }
909 908 }
910 909 }
911 910
912 911 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
912 qCCritical(LOG_VariableController()) << tr("remove the varRequestId");
913 913 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestIdToVarIdVarRequestMapIt);
914 914 }
915 915
916 916 for (auto nextVarRequestIdToRm : nextUuidToRemove) {
917 qCCritical(LOG_VariableController()) << tr("Cancel automaticaly the next request:")
918 << nextVarRequestIdToRm;
917 919 abortVariableRequest(nextVarRequestIdToRm);
918 920 }
919 921 }
920 922
921 923 SqpRange VariableController::VariableControllerPrivate::getLastRequestedRange(QUuid varId)
922 924 {
923 925 auto lastRangeRequested = SqpRange{INVALID_RANGE};
924 926 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
925 927 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
926 928 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
927 929 auto varRequestId = varRequestIdQueue.back();
928 930 auto varRequestIdToVarIdVarRequestMapIt
929 931 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
930 932 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
931 933 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
932 934 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
933 935 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
934 936 auto &varRequest = varIdToVarRequestMapIt->second;
935 937 lastRangeRequested = varRequest.m_RangeRequested;
936 938 }
937 939 else {
938 940 qCDebug(LOG_VariableController())
939 941 << tr("Impossible to getLastRequestedRange of a unknown variable id attached "
940 942 "to a variableRequestId")
941 943 << varRequestId << varId;
942 944 }
943 945 }
944 946 else {
945 947 qCCritical(LOG_VariableController())
946 948 << tr("Impossible to getLastRequestedRange of a unknown variableRequestId")
947 949 << varRequestId;
948 950 }
949 951 }
950 952 else {
951 953 qDebug(LOG_VariableController())
952 954 << tr("Impossible to getLastRequestedRange of a unknown variable id") << varId;
953 955 }
954 956
955 957 return lastRangeRequested;
956 958 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Merge lasted acquisition developpement on main Sciqlop branch

You need to be logged in to leave comments. Login now