##// END OF EJS Templates
Remove unused pending request of worker since it's already in the VC....
perrinel -
r1387:3f0567bfecb5 HEAD
parent child
Show More
@@ -1,430 +1,386
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
34
35 32 /// Remove and/or abort all AcqRequest in link with varRequestId
36 33 void cancelVarRequest(QUuid varRequestId);
37 34 void removeAcqRequest(QUuid acqRequestId);
38 35
39 36 QMutex m_WorkingMutex;
40 37 QReadWriteLock m_Lock;
41 38
42 39 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 40 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
41 std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
45 42 VariableAcquisitionWorker *q;
46 43 };
47 44
48 45
49 46 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 47 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 48 {
52 49 }
53 50
54 51 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 52 {
56 53 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 54 << QThread::currentThread();
58 55 this->waitForFinish();
59 56 }
60 57
61 58
62 59 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 60 DataProviderParameters parameters,
64 61 std::shared_ptr<IDataProvider> provider)
65 62 {
66 63 qCDebug(LOG_VariableAcquisitionWorker())
67 64 << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
68 65 << "vId: " << vIdentifier;
69 66 auto varRequestIdCanceled = QUuid();
70 67
71 68 // Request creation
72 69 auto acqRequest = AcquisitionRequest{};
73 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier;
74 70 acqRequest.m_VarRequestId = varRequestId;
75 71 acqRequest.m_vIdentifier = vIdentifier;
76 72 acqRequest.m_DataProviderParameters = parameters;
77 73 acqRequest.m_Size = parameters.m_Times.size();
78 74 acqRequest.m_Provider = provider;
75 qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
76 << acqRequest.m_Size;
79 77
80 78
81 79 // Register request
82 80 impl->lockWrite();
83 81 impl->m_AcqIdentifierToAcqRequestMap.insert(
84 82 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
85 83
86 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
87 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
88 // A current request already exists, we can replace the next one
89 auto oldAcqId = it->second.second;
84 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
85 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
86 // A current request already exists, we can cancel it
87 // remove old acqIdentifier from the worker
88 auto oldAcqId = it->second;
90 89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
91 90 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
92 91 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
93 92 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
94 93 }
95
96 it->second.second = acqRequest.m_AcqIdentifier;
97 94 impl->unlock();
98
99 // remove old acqIdentifier from the worker
100 95 impl->cancelVarRequest(varRequestIdCanceled);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
102 96 }
103 97 else {
104 // First request for the variable, it must be stored and executed
105 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
106 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
107 98 impl->unlock();
108
109 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
110 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
111 99 }
112 100
101 // Request for the variable, it must be stored and executed
102 impl->lockWrite();
103 impl->m_VIdentifierToCurrrentAcqIdMap.insert(
104 std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
105 impl->unlock();
106
107 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
108 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
109
113 110 return varRequestIdCanceled;
114 111 }
115 112
116 113 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
117 114 {
118 115 impl->lockRead();
119 116
120 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
121 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
122 auto currentAcqId = it->second.first;
117 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
118 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
119 auto currentAcqId = it->second;
123 120
124 121 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
125 122 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
126 123 auto request = it->second;
127 124 impl->unlock();
128 125
129 // Remove the current request from the worker
130 impl->updateToNextRequest(vIdentifier);
131
132 126 // notify the request aborting to the provider
133 127 request.m_Provider->requestDataAborting(currentAcqId);
134 128 }
135 129 else {
136 130 impl->unlock();
137 131 qCWarning(LOG_VariableAcquisitionWorker())
138 132 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
139 133 }
140 134 }
141 135 else {
142 136 impl->unlock();
143 137 }
144 138 }
145 139
146 140 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
147 141 double progress)
148 142 {
149 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
150 << QThread::currentThread()->objectName()
151 << acqIdentifier << progress;
143 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
144 << QThread::currentThread()->objectName()
145 << acqIdentifier << progress;
152 146 impl->lockRead();
153 147 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
154 148 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
155 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
149 auto progressPartSize
150 = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
156 151
157 152 auto currentPartProgress
158 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
159 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
153 = std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;
154
155 // We can only give an approximation of the currentProgression since its upgrade is async.
156 qCInfo(LOG_VariableAcquisitionWorker())
157 << tr("Progression: ") << aIdToARit->second.m_Progression << aIdToARit->second.m_Size;
158 auto currentProgression = aIdToARit->second.m_Progression;
159 if (currentProgression == aIdToARit->second.m_Size) {
160 currentProgression = aIdToARit->second.m_Size - 1;
161 }
162
163 auto currentAlreadyProgress = progressPartSize * currentProgression;
164
160 165
161 166 auto finalProgression = currentAlreadyProgress + currentPartProgress;
162 167 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
163 qCDebug(LOG_VariableAcquisitionWorker())
168 qCInfo(LOG_VariableAcquisitionWorker())
164 169 << tr("TORM: onVariableRetrieveDataInProgress ")
165 170 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
166 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
171 << progressPartSize << currentAlreadyProgress << currentPartProgress
172 << finalProgression;
173
167 174 if (finalProgression == 100.0) {
168 qCDebug(LOG_VariableAcquisitionWorker())
175 qCInfo(LOG_VariableAcquisitionWorker())
169 176 << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
170 177 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
171 178 }
172 179 }
173 180 impl->unlock();
174 181 }
175 182
176 183 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
177 184 {
178 185 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
179 186 << QThread::currentThread();
180 187 impl->lockRead();
181 188 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
182 189 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
183 190 auto request = it->second;
184 191 impl->unlock();
185 192 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
186 193 << acqIdentifier << request.m_vIdentifier
187 194 << QThread::currentThread();
188 195 emit variableCanceledRequested(request.m_vIdentifier);
189 196 }
190 197 else {
191 198 impl->unlock();
192 199 // TODO log no acqIdentifier recognized
193 200 }
194 201 }
195 202
196 203 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
197 204 std::shared_ptr<IDataSeries> dataSeries,
198 205 SqpRange dataRangeAcquired)
199 206 {
200 207 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
201 208 << acqIdentifier << dataRangeAcquired;
202 209 impl->lockWrite();
203 210 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
204 211 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
205 212 // Store the result
206 213 auto dataPacket = AcquisitionDataPacket{};
207 214 dataPacket.m_Range = dataRangeAcquired;
208 215 dataPacket.m_DateSeries = dataSeries;
209 216
210 217 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
211 218 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
212 219 // A current request result already exists, we can update it
213 220 aIdToADPVit->second.push_back(dataPacket);
214 221 }
215 222 else {
216 223 // First request result for the variable, it must be stored
217 224 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
218 225 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
219 226 }
220 227
221 228
222 229 // Decrement the counter of the request
223 230 auto &acqRequest = aIdToARit->second;
231 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: +1 update progresson ")
232 << acqIdentifier;
224 233 acqRequest.m_Progression = acqRequest.m_Progression + 1;
225 234
226 235 // if the counter is 0, we can return data then run the next request if it exists and
227 236 // removed the finished request
228 237 if (acqRequest.m_Size == acqRequest.m_Progression) {
229 238 auto varId = acqRequest.m_vIdentifier;
230 239 // Return the data
231 240 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
232 241 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
233 242 emit dataProvided(varId, aIdToADPVit->second);
234 243 }
235 244 impl->unlock();
236
237 // Update to the next request
238 impl->updateToNextRequest(acqRequest.m_vIdentifier);
239 245 }
240 246 else {
241 247 impl->unlock();
242 248 }
243 249 }
244 250 else {
245 251 impl->unlock();
246 252 qCWarning(LOG_VariableAcquisitionWorker())
247 253 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
248 254 }
249 255 }
250 256
251 257 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
252 258 {
253 259 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
254 260 impl->lockRead();
255 261 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
256 262 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
257 263 auto request = it->second;
258 264 impl->unlock();
259 265 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
260 266 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
261 267 << QThread::currentThread();
262 268 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
263 269 }
264 270 else {
265 271 impl->unlock();
266 272 // TODO log no acqIdentifier recognized
267 273 }
268 274 }
269 275
270 276 void VariableAcquisitionWorker::initialize()
271 277 {
272 278 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
273 279 << QThread::currentThread();
274 280 impl->m_WorkingMutex.lock();
275 281 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
276 282 }
277 283
278 284 void VariableAcquisitionWorker::finalize()
279 285 {
280 286 impl->m_WorkingMutex.unlock();
281 287 }
282 288
283 289 void VariableAcquisitionWorker::waitForFinish()
284 290 {
285 291 QMutexLocker locker{&impl->m_WorkingMutex};
286 292 }
287 293
288 294 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
289 295 QUuid vIdentifier)
290 296 {
291 297 lockWrite();
292 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
298 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
293 299
294 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
300 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
295 301 // A current request already exists, we can replace the next one
296 302
297 303 qCDebug(LOG_VariableAcquisitionWorker())
298 304 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
299 << QThread::currentThread()->objectName() << it->second.first;
300 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
301 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
302
303 qCDebug(LOG_VariableAcquisitionWorker())
304 << "VariableAcquisitionWorkerPrivate::removeVariableRequest " << it->second.second;
305 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
306 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
305 << QThread::currentThread()->objectName() << it->second;
306 m_AcqIdentifierToAcqRequestMap.erase(it->second);
307 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second);
307 308 }
308 309
309 310 // stop any progression
310 311 emit q->variableRequestInProgress(vIdentifier, 0.0);
311 312
312 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
313 m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
313 314 unlock();
314 315 }
315 316
316 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
317 QUuid vIdentifier)
318 {
319 lockRead();
320 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
321 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
322 if (it->second.second.isNull()) {
323 unlock();
324 // There is no next request, we can remove the variable request
325 removeVariableRequest(vIdentifier);
326 }
327 else {
328 auto acqIdentifierToRemove = it->second.first;
329 // Move the next request to the current request
330 auto nextRequestId = it->second.second;
331 it->second.first = nextRequestId;
332 it->second.second = QUuid();
333 unlock();
334 // Remove AcquisitionRequest and results;
335 lockWrite();
336 qCDebug(LOG_VariableAcquisitionWorker())
337 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removed: "
338 << acqIdentifierToRemove;
339 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
340 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
341 unlock();
342 // Execute the current request
343 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
344 Q_ARG(QUuid, nextRequestId));
345 }
346 }
347 else {
348 unlock();
349 qCCritical(LOG_VariableAcquisitionWorker())
350 << tr("Impossible to execute the acquisition on an unfound variable ");
351 }
352 }
353
354 317 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
355 318 QUuid varRequestId)
356 319 {
357 320 qCDebug(LOG_VariableAcquisitionWorker())
358 321 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
359 322 lockRead();
360 323 // get all AcqIdentifier in link with varRequestId
361 324 QVector<QUuid> acqIdsToRm;
362 325 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
363 326 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
364 327 if (it->second.m_VarRequestId == varRequestId) {
365 328 acqIdsToRm << it->first;
366 329 }
367 330 }
368 331 unlock();
369 332 // run aborting or removing of acqIdsToRm
370 333
371 334 for (auto acqId : acqIdsToRm) {
372 335 removeAcqRequest(acqId);
373 336 }
374 337 qCDebug(LOG_VariableAcquisitionWorker())
375 338 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
376 339 }
377 340
378 341 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
379 342 QUuid acqRequestId)
380 343 {
381 344 qCDebug(LOG_VariableAcquisitionWorker())
382 345 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
383 346 QUuid vIdentifier;
384 347 std::shared_ptr<IDataProvider> provider;
385 348 lockRead();
386 349 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
387 350 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
388 351 vIdentifier = acqIt->second.m_vIdentifier;
389 352 provider = acqIt->second.m_Provider;
390 353
391 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
392 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
393 if (it->second.first == acqRequestId) {
354 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
355 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
356 if (it->second == acqRequestId) {
394 357 // acqRequest is currently running -> let's aborting it
395 358 unlock();
396 359
397 // Remove the current request from the worker
398 updateToNextRequest(vIdentifier);
399
400 360 // notify the request aborting to the provider
401 361 provider->requestDataAborting(acqRequestId);
402 362 }
403 else if (it->second.second == acqRequestId) {
404 it->second.second = QUuid();
405 unlock();
406 }
407 363 else {
408 364 unlock();
409 365 }
410 366 }
411 367 else {
412 368 unlock();
413 369 }
414 370 }
415 371 else {
416 372 unlock();
417 373 }
418 374
419 375 lockWrite();
420 376
421 377 qCDebug(LOG_VariableAcquisitionWorker())
422 378 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
423 379 << acqRequestId;
424 380 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
425 381 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
426 382
427 383 unlock();
428 384 qCDebug(LOG_VariableAcquisitionWorker())
429 385 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
430 386 }
General Comments 0
You need to be logged in to leave comments. Login now