##// END OF EJS Templates
Remove old log
perrinel -
r822:b0ea6b620c8b
parent child
Show More
@@ -1,436 +1,416
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 75 << varRequestId;
76 76 acqRequest.m_VarRequestId = varRequestId;
77 77 acqRequest.m_vIdentifier = vIdentifier;
78 78 acqRequest.m_DataProviderParameters = parameters;
79 79 acqRequest.m_RangeRequested = rangeRequested;
80 80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 81 acqRequest.m_Size = parameters.m_Times.size();
82 82 acqRequest.m_Provider = provider;
83 83
84 84
85 85 // Register request
86 86 impl->lockWrite();
87 87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 89
90 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 92 // A current request already exists, we can replace the next one
93 93 auto oldAcqId = it->second.second;
94 94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 98 }
99 99
100 100 it->second.second = acqRequest.m_AcqIdentifier;
101 101 impl->unlock();
102 102
103 103 // remove old acqIdentifier from the worker
104 104 impl->cancelVarRequest(varRequestIdCanceled);
105 105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 106 }
107 107 else {
108 108 // First request for the variable, it must be stored and executed
109 109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 111 impl->unlock();
112 112
113 113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 115 }
116 116
117 117 return varRequestIdCanceled;
118 118 }
119 119
120 120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 121 {
122 122 impl->lockRead();
123 123
124 124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 126 auto currentAcqId = it->second.first;
127 127
128 128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 130 auto request = it->second;
131 131 impl->unlock();
132 132
133 133 // Remove the current request from the worker
134 134 impl->updateToNextRequest(vIdentifier);
135 135
136 136 // notify the request aborting to the provider
137 137 request.m_Provider->requestDataAborting(currentAcqId);
138 138 }
139 139 else {
140 140 impl->unlock();
141 141 qCWarning(LOG_VariableAcquisitionWorker())
142 142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 143 }
144 144 }
145 145 else {
146 146 impl->unlock();
147 147 }
148 148 }
149 149
150 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 151 double progress)
152 152 {
153 153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
154 154 << acqIdentifier << progress;
155 155 impl->lockRead();
156 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159 159
160 160 auto currentPartProgress
161 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163 163
164 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 166 qCDebug(LOG_VariableAcquisitionWorker())
167 167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 170 if (finalProgression == 100.0) {
171 171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 172 }
173 173 }
174 174 impl->unlock();
175 175 }
176 176
177 177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 178 {
179 179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 180 << QThread::currentThread();
181 181 impl->lockRead();
182 182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 184 auto request = it->second;
185 185 impl->unlock();
186 186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 187 << acqIdentifier << request.m_vIdentifier
188 188 << QThread::currentThread();
189 189 emit variableCanceledRequested(request.m_vIdentifier);
190 190 }
191 191 else {
192 192 impl->unlock();
193 193 // TODO log no acqIdentifier recognized
194 194 }
195 195 }
196 196
197 197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 198 std::shared_ptr<IDataSeries> dataSeries,
199 199 SqpRange dataRangeAcquired)
200 200 {
201 201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 202 << acqIdentifier << dataRangeAcquired;
203 203 impl->lockWrite();
204 204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 206 // Store the result
207 207 auto dataPacket = AcquisitionDataPacket{};
208 208 dataPacket.m_Range = dataRangeAcquired;
209 209 dataPacket.m_DateSeries = dataSeries;
210 210
211 211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 213 // A current request result already exists, we can update it
214 214 aIdToADPVit->second.push_back(dataPacket);
215 215 }
216 216 else {
217 217 // First request result for the variable, it must be stored
218 218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 220 }
221 221
222 222
223 223 // Decrement the counter of the request
224 224 auto &acqRequest = aIdToARit->second;
225 225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226 226
227 227 // if the counter is 0, we can return data then run the next request if it exists and
228 228 // removed the finished request
229 229 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 230 auto varId = acqRequest.m_vIdentifier;
231 231 auto rangeRequested = acqRequest.m_RangeRequested;
232 232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 233 // Return the data
234 234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 237 }
238 238 impl->unlock();
239 239
240 240 // Update to the next request
241 241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 242 }
243 243 else {
244 244 impl->unlock();
245 245 }
246 246 }
247 247 else {
248 248 impl->unlock();
249 249 qCWarning(LOG_VariableAcquisitionWorker())
250 250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 251 }
252 252 }
253 253
254 254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 255 {
256 256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 257 impl->lockRead();
258 258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 260 auto request = it->second;
261 261 impl->unlock();
262 262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
263 263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 264 }
265 265 else {
266 266 impl->unlock();
267 267 // TODO log no acqIdentifier recognized
268 268 }
269 269 }
270 270
271 271 void VariableAcquisitionWorker::initialize()
272 272 {
273 273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 274 << QThread::currentThread();
275 275 impl->m_WorkingMutex.lock();
276 276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 277 }
278 278
279 279 void VariableAcquisitionWorker::finalize()
280 280 {
281 281 impl->m_WorkingMutex.unlock();
282 282 }
283 283
284 284 void VariableAcquisitionWorker::waitForFinish()
285 285 {
286 286 QMutexLocker locker{&impl->m_WorkingMutex};
287 287 }
288 288
289 289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 290 QUuid vIdentifier)
291 291 {
292 292 lockWrite();
293 293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294 294
295 295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 296 // A current request already exists, we can replace the next one
297 297
298 298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300 300
301 301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 303 }
304 304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 305 unlock();
306 306 }
307 307
308 308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 309 QUuid vIdentifier)
310 310 {
311 311 lockRead();
312 312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 314 if (it->second.second.isNull()) {
315 315 unlock();
316 316 // There is no next request, we can remove the variable request
317 317 removeVariableRequest(vIdentifier);
318 318 }
319 319 else {
320 320 auto acqIdentifierToRemove = it->second.first;
321 321 // Move the next request to the current request
322 322 auto nextRequestId = it->second.second;
323 323 it->second.first = nextRequestId;
324 324 it->second.second = QUuid();
325 325 unlock();
326 326 // Remove AcquisitionRequest and results;
327 327 lockWrite();
328 328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 330 unlock();
331 331 // Execute the current request
332 332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 333 Q_ARG(QUuid, nextRequestId));
334 334 }
335 335 }
336 336 else {
337 337 unlock();
338 338 qCCritical(LOG_VariableAcquisitionWorker())
339 339 << tr("Impossible to execute the acquisition on an unfound variable ");
340 340 }
341 341 }
342 342
343 343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 344 QUuid varRequestId)
345 345 {
346 qCCritical(LOG_VariableAcquisitionWorker())
346 qCDebug(LOG_VariableAcquisitionWorker())
347 347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
348 348 lockRead();
349 349 // get all AcqIdentifier in link with varRequestId
350 350 QVector<QUuid> acqIdsToRm;
351 351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
352 352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
353 353 if (it->second.m_VarRequestId == varRequestId) {
354 354 acqIdsToRm << it->first;
355 355 }
356 356 }
357 357 unlock();
358 358 // run aborting or removing of acqIdsToRm
359 359
360 360 for (auto acqId : acqIdsToRm) {
361 361 removeAcqRequest(acqId);
362 362 }
363 qCCritical(LOG_VariableAcquisitionWorker())
363 qCDebug(LOG_VariableAcquisitionWorker())
364 364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
365 365 }
366 366
367 367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
368 368 QUuid acqRequestId)
369 369 {
370 370 qCDebug(LOG_VariableAcquisitionWorker())
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
372 372 QUuid vIdentifier;
373 373 std::shared_ptr<IDataProvider> provider;
374 374 lockRead();
375 375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
376 qCDebug(LOG_VariableAcquisitionWorker())
377 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
378 376 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
379 377 vIdentifier = acqIt->second.m_vIdentifier;
380 378 provider = acqIt->second.m_Provider;
381 379
382 qCDebug(LOG_VariableAcquisitionWorker())
383 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
384 380 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
385 381 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
386 qCDebug(LOG_VariableAcquisitionWorker())
387 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
388 382 if (it->second.first == acqRequestId) {
389 383 // acqRequest is currently running -> let's aborting it
390 qCDebug(LOG_VariableAcquisitionWorker())
391 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
392 384 unlock();
393 385
394 386 // Remove the current request from the worker
395 387 updateToNextRequest(vIdentifier);
396 388
397 389 // notify the request aborting to the provider
398 390 provider->requestDataAborting(acqRequestId);
399 qCDebug(LOG_VariableAcquisitionWorker())
400 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
401 391 }
402 392 else if (it->second.second == acqRequestId) {
403 393 it->second.second = QUuid();
404 qCDebug(LOG_VariableAcquisitionWorker())
405 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
406 394 unlock();
407 395 }
408 396 else {
409 qCDebug(LOG_VariableAcquisitionWorker())
410 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
411 397 unlock();
412 398 }
413 399 }
414 400 else {
415 qCDebug(LOG_VariableAcquisitionWorker())
416 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
417 401 unlock();
418 402 }
419 403 }
420 404 else {
421 qCDebug(LOG_VariableAcquisitionWorker())
422 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
423 405 unlock();
424 406 }
425 407
426 qCDebug(LOG_VariableAcquisitionWorker())
427 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
428 408 lockWrite();
429 409
430 410 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
431 411 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
432 412
433 413 unlock();
434 414 qCDebug(LOG_VariableAcquisitionWorker())
435 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
415 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
436 416 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now