##// END OF EJS Templates
Fix asynchrone bug with reset of the download progress state
perrinel -
r1392:40334c320188
parent child
Show More
@@ -1,416 +1,435
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 << varRequestId;
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier;
76 75 acqRequest.m_VarRequestId = varRequestId;
77 76 acqRequest.m_vIdentifier = vIdentifier;
78 77 acqRequest.m_DataProviderParameters = parameters;
79 78 acqRequest.m_RangeRequested = rangeRequested;
80 79 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 80 acqRequest.m_Size = parameters.m_Times.size();
82 81 acqRequest.m_Provider = provider;
83 82
84 83
85 84 // Register request
86 85 impl->lockWrite();
87 86 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 87 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 88
90 89 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 90 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 91 // A current request already exists, we can replace the next one
93 92 auto oldAcqId = it->second.second;
94 93 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 94 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 95 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 96 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 97 }
99 98
100 99 it->second.second = acqRequest.m_AcqIdentifier;
101 100 impl->unlock();
102 101
103 102 // remove old acqIdentifier from the worker
104 103 impl->cancelVarRequest(varRequestIdCanceled);
105 104 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 105 }
107 106 else {
108 107 // First request for the variable, it must be stored and executed
109 108 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 109 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 110 impl->unlock();
112 111
113 112 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 113 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 114 }
116 115
117 116 return varRequestIdCanceled;
118 117 }
119 118
120 119 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 120 {
122 121 impl->lockRead();
123 122
124 123 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 124 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 125 auto currentAcqId = it->second.first;
127 126
128 127 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 128 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 129 auto request = it->second;
131 130 impl->unlock();
132 131
133 132 // Remove the current request from the worker
134 133 impl->updateToNextRequest(vIdentifier);
135 134
136 135 // notify the request aborting to the provider
137 136 request.m_Provider->requestDataAborting(currentAcqId);
138 137 }
139 138 else {
140 139 impl->unlock();
141 140 qCWarning(LOG_VariableAcquisitionWorker())
142 141 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 142 }
144 143 }
145 144 else {
146 145 impl->unlock();
147 146 }
148 147 }
149 148
150 149 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 150 double progress)
152 151 {
153 152 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
153 << QThread::currentThread()->objectName()
154 154 << acqIdentifier << progress;
155 155 impl->lockRead();
156 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159 159
160 160 auto currentPartProgress
161 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163 163
164 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 166 qCDebug(LOG_VariableAcquisitionWorker())
167 167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 170 if (finalProgression == 100.0) {
171 qCDebug(LOG_VariableAcquisitionWorker())
172 << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
171 173 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 174 }
173 175 }
174 176 impl->unlock();
175 177 }
176 178
177 179 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 180 {
179 181 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 182 << QThread::currentThread();
181 183 impl->lockRead();
182 184 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 185 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 186 auto request = it->second;
185 187 impl->unlock();
186 188 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 189 << acqIdentifier << request.m_vIdentifier
188 190 << QThread::currentThread();
189 191 emit variableCanceledRequested(request.m_vIdentifier);
190 192 }
191 193 else {
192 194 impl->unlock();
193 195 // TODO log no acqIdentifier recognized
194 196 }
195 197 }
196 198
197 199 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 200 std::shared_ptr<IDataSeries> dataSeries,
199 201 SqpRange dataRangeAcquired)
200 202 {
201 203 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 204 << acqIdentifier << dataRangeAcquired;
203 205 impl->lockWrite();
204 206 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 207 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 208 // Store the result
207 209 auto dataPacket = AcquisitionDataPacket{};
208 210 dataPacket.m_Range = dataRangeAcquired;
209 211 dataPacket.m_DateSeries = dataSeries;
210 212
211 213 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 214 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 215 // A current request result already exists, we can update it
214 216 aIdToADPVit->second.push_back(dataPacket);
215 217 }
216 218 else {
217 219 // First request result for the variable, it must be stored
218 220 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 221 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 222 }
221 223
222 224
223 225 // Decrement the counter of the request
224 226 auto &acqRequest = aIdToARit->second;
225 227 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226 228
227 229 // if the counter is 0, we can return data then run the next request if it exists and
228 230 // removed the finished request
229 231 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 232 auto varId = acqRequest.m_vIdentifier;
231 233 auto rangeRequested = acqRequest.m_RangeRequested;
232 234 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 235 // Return the data
234 236 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 237 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 238 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 239 }
238 240 impl->unlock();
239 241
240 242 // Update to the next request
241 243 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 244 }
243 245 else {
244 246 impl->unlock();
245 247 }
246 248 }
247 249 else {
248 250 impl->unlock();
249 251 qCWarning(LOG_VariableAcquisitionWorker())
250 252 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 253 }
252 254 }
253 255
254 256 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 257 {
256 258 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 259 impl->lockRead();
258 260 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 261 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 262 auto request = it->second;
261 263 impl->unlock();
262 264 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
265 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
266 << QThread::currentThread();
263 267 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 268 }
265 269 else {
266 270 impl->unlock();
267 271 // TODO log no acqIdentifier recognized
268 272 }
269 273 }
270 274
271 275 void VariableAcquisitionWorker::initialize()
272 276 {
273 277 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 278 << QThread::currentThread();
275 279 impl->m_WorkingMutex.lock();
276 280 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 281 }
278 282
279 283 void VariableAcquisitionWorker::finalize()
280 284 {
281 285 impl->m_WorkingMutex.unlock();
282 286 }
283 287
284 288 void VariableAcquisitionWorker::waitForFinish()
285 289 {
286 290 QMutexLocker locker{&impl->m_WorkingMutex};
287 291 }
288 292
289 293 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 294 QUuid vIdentifier)
291 295 {
292 296 lockWrite();
293 297 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294 298
295 299 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 300 // A current request already exists, we can replace the next one
297 301
302 qCDebug(LOG_VariableAcquisitionWorker())
303 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
304 << QThread::currentThread()->objectName() << it->second.first;
298 305 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 306 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300 307
308 qCDebug(LOG_VariableAcquisitionWorker())
309 << "VariableAcquisitionWorkerPrivate::removeVariableRequest " << it->second.second;
301 310 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 311 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 312 }
313
314 // stop any progression
315 emit q->variableRequestInProgress(vIdentifier, 0.0);
316
304 317 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 318 unlock();
306 319 }
307 320
308 321 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 322 QUuid vIdentifier)
310 323 {
311 324 lockRead();
312 325 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 326 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 327 if (it->second.second.isNull()) {
315 328 unlock();
316 329 // There is no next request, we can remove the variable request
317 330 removeVariableRequest(vIdentifier);
318 331 }
319 332 else {
320 333 auto acqIdentifierToRemove = it->second.first;
321 334 // Move the next request to the current request
322 335 auto nextRequestId = it->second.second;
323 336 it->second.first = nextRequestId;
324 337 it->second.second = QUuid();
325 338 unlock();
326 339 // Remove AcquisitionRequest and results;
327 340 lockWrite();
341 qCDebug(LOG_VariableAcquisitionWorker())
342 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removed: "
343 << acqIdentifierToRemove;
328 344 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 345 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 346 unlock();
331 347 // Execute the current request
332 348 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 349 Q_ARG(QUuid, nextRequestId));
334 350 }
335 351 }
336 352 else {
337 353 unlock();
338 354 qCCritical(LOG_VariableAcquisitionWorker())
339 355 << tr("Impossible to execute the acquisition on an unfound variable ");
340 356 }
341 357 }
342 358
343 359 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 360 QUuid varRequestId)
345 361 {
346 362 qCDebug(LOG_VariableAcquisitionWorker())
347 363 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
348 364 lockRead();
349 365 // get all AcqIdentifier in link with varRequestId
350 366 QVector<QUuid> acqIdsToRm;
351 367 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
352 368 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
353 369 if (it->second.m_VarRequestId == varRequestId) {
354 370 acqIdsToRm << it->first;
355 371 }
356 372 }
357 373 unlock();
358 374 // run aborting or removing of acqIdsToRm
359 375
360 376 for (auto acqId : acqIdsToRm) {
361 377 removeAcqRequest(acqId);
362 378 }
363 379 qCDebug(LOG_VariableAcquisitionWorker())
364 380 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
365 381 }
366 382
367 383 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
368 384 QUuid acqRequestId)
369 385 {
370 386 qCDebug(LOG_VariableAcquisitionWorker())
371 387 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
372 388 QUuid vIdentifier;
373 389 std::shared_ptr<IDataProvider> provider;
374 390 lockRead();
375 391 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
376 392 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
377 393 vIdentifier = acqIt->second.m_vIdentifier;
378 394 provider = acqIt->second.m_Provider;
379 395
380 396 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
381 397 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
382 398 if (it->second.first == acqRequestId) {
383 399 // acqRequest is currently running -> let's aborting it
384 400 unlock();
385 401
386 402 // Remove the current request from the worker
387 403 updateToNextRequest(vIdentifier);
388 404
389 405 // notify the request aborting to the provider
390 406 provider->requestDataAborting(acqRequestId);
391 407 }
392 408 else if (it->second.second == acqRequestId) {
393 409 it->second.second = QUuid();
394 410 unlock();
395 411 }
396 412 else {
397 413 unlock();
398 414 }
399 415 }
400 416 else {
401 417 unlock();
402 418 }
403 419 }
404 420 else {
405 421 unlock();
406 422 }
407 423
408 424 lockWrite();
409 425
426 qCDebug(LOG_VariableAcquisitionWorker())
427 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
428 << acqRequestId;
410 429 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
411 430 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
412 431
413 432 unlock();
414 433 qCDebug(LOG_VariableAcquisitionWorker())
415 434 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
416 435 }
General Comments 0
You need to be logged in to leave comments. Login now