##// END OF EJS Templates
Fix lock bug
perrinel -
r819:baf81ae414e4
parent child
Show More
@@ -1,409 +1,422
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
75 75 << varRequestId;
76 76 acqRequest.m_VarRequestId = varRequestId;
77 77 acqRequest.m_vIdentifier = vIdentifier;
78 78 acqRequest.m_DataProviderParameters = parameters;
79 79 acqRequest.m_RangeRequested = rangeRequested;
80 80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 81 acqRequest.m_Size = parameters.m_Times.size();
82 82 acqRequest.m_Provider = provider;
83 83
84 84
85 85 // Register request
86 86 impl->lockWrite();
87 87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 89
90 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 92 // A current request already exists, we can replace the next one
93 93 auto oldAcqId = it->second.second;
94 94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 98 }
99 // remove old acqIdentifier from the worker
100 impl->cancelVarRequest(oldAcqId);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
102 99
103 100 it->second.second = acqRequest.m_AcqIdentifier;
104
105
106 101 impl->unlock();
102
103 // remove old acqIdentifier from the worker
104 impl->cancelVarRequest(oldAcqId);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
107 106 }
108 107 else {
109 108 // First request for the variable, it must be stored and executed
110 109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
111 110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
112 111 impl->unlock();
113 112
114 113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
115 114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
116 115 }
117 116
118 117 return varRequestIdCanceled;
119 118 }
120 119
121 120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
122 121 {
123 122 impl->lockRead();
124 123
125 124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
126 125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
127 126 auto currentAcqId = it->second.first;
128 127
129 128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
130 129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
131 130 auto request = it->second;
132 131 impl->unlock();
133 132
134 133 // Remove the current request from the worker
135 134 impl->updateToNextRequest(vIdentifier);
136 135
137 136 // notify the request aborting to the provider
138 137 request.m_Provider->requestDataAborting(currentAcqId);
139 138 }
140 139 else {
141 140 impl->unlock();
142 141 qCWarning(LOG_VariableAcquisitionWorker())
143 142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
144 143 }
145 144 }
146 145 else {
147 146 impl->unlock();
148 147 }
149 148 }
150 149
151 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
152 151 double progress)
153 152 {
154 153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
155 154 << acqIdentifier << progress;
156 155 impl->lockRead();
157 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
158 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
159 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
160 159
161 160 auto currentPartProgress
162 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
163 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
164 163
165 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
166 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
167 166 qCDebug(LOG_VariableAcquisitionWorker())
168 167 << tr("TORM: onVariableRetrieveDataInProgress ")
169 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
170 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
171 170 if (finalProgression == 100.0) {
172 171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
173 172 }
174 173 }
175 174 impl->unlock();
176 175 }
177 176
178 177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
179 178 {
180 179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
181 180 << QThread::currentThread();
182 181 impl->lockRead();
183 182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
184 183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
185 184 auto request = it->second;
186 185 impl->unlock();
187 186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
188 187 << acqIdentifier << request.m_vIdentifier
189 188 << QThread::currentThread();
190 189 emit variableCanceledRequested(request.m_vIdentifier);
191 190 }
192 191 else {
193 192 impl->unlock();
194 193 // TODO log no acqIdentifier recognized
195 194 }
196 195 }
197 196
198 197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
199 198 std::shared_ptr<IDataSeries> dataSeries,
200 199 SqpRange dataRangeAcquired)
201 200 {
202 201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
203 202 << acqIdentifier << dataRangeAcquired;
204 203 impl->lockWrite();
205 204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
206 205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
207 206 // Store the result
208 207 auto dataPacket = AcquisitionDataPacket{};
209 208 dataPacket.m_Range = dataRangeAcquired;
210 209 dataPacket.m_DateSeries = dataSeries;
211 210
212 211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
213 212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
214 213 // A current request result already exists, we can update it
215 214 aIdToADPVit->second.push_back(dataPacket);
216 215 }
217 216 else {
218 217 // First request result for the variable, it must be stored
219 218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
220 219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
221 220 }
222 221
223 222
224 223 // Decrement the counter of the request
225 224 auto &acqRequest = aIdToARit->second;
226 225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
227 226
228 227 // if the counter is 0, we can return data then run the next request if it exists and
229 228 // removed the finished request
230 229 if (acqRequest.m_Size == acqRequest.m_Progression) {
231 230 auto varId = acqRequest.m_vIdentifier;
232 231 auto rangeRequested = acqRequest.m_RangeRequested;
233 232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
234 233 // Return the data
235 234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
236 235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
237 236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
238 237 }
239 238 impl->unlock();
240 239
241 240 // Update to the next request
242 241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
243 242 }
244 243 else {
245 244 impl->unlock();
246 245 }
247 246 }
248 247 else {
249 248 impl->unlock();
250 249 qCWarning(LOG_VariableAcquisitionWorker())
251 250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
252 251 }
253 252 }
254 253
255 254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
256 255 {
257 256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
258 257 impl->lockRead();
259 258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
260 259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
261 260 auto request = it->second;
262 261 impl->unlock();
263 262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
264 263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
265 264 }
266 265 else {
267 266 impl->unlock();
268 267 // TODO log no acqIdentifier recognized
269 268 }
270 269 }
271 270
272 271 void VariableAcquisitionWorker::initialize()
273 272 {
274 273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
275 274 << QThread::currentThread();
276 275 impl->m_WorkingMutex.lock();
277 276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
278 277 }
279 278
280 279 void VariableAcquisitionWorker::finalize()
281 280 {
282 281 impl->m_WorkingMutex.unlock();
283 282 }
284 283
285 284 void VariableAcquisitionWorker::waitForFinish()
286 285 {
287 286 QMutexLocker locker{&impl->m_WorkingMutex};
288 287 }
289 288
290 289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
291 290 QUuid vIdentifier)
292 291 {
293 292 lockWrite();
294 293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
295 294
296 295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
297 296 // A current request already exists, we can replace the next one
298 297
299 298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
300 299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
301 300
302 301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
303 302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
304 303 }
305 304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
306 305 unlock();
307 306 }
308 307
309 308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
310 309 QUuid vIdentifier)
311 310 {
312 311 lockRead();
313 312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
314 313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
315 314 if (it->second.second.isNull()) {
316 315 unlock();
317 316 // There is no next request, we can remove the variable request
318 317 removeVariableRequest(vIdentifier);
319 318 }
320 319 else {
321 320 auto acqIdentifierToRemove = it->second.first;
322 321 // Move the next request to the current request
323 322 auto nextRequestId = it->second.second;
324 323 it->second.first = nextRequestId;
325 324 it->second.second = QUuid();
326 325 unlock();
327 326 // Remove AcquisitionRequest and results;
328 327 lockWrite();
329 328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
330 329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
331 330 unlock();
332 331 // Execute the current request
333 332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
334 333 Q_ARG(QUuid, nextRequestId));
335 334 }
336 335 }
337 336 else {
338 337 unlock();
339 338 qCCritical(LOG_VariableAcquisitionWorker())
340 339 << tr("Impossible to execute the acquisition on an unfound variable ");
341 340 }
342 341 }
343 342
344 343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
345 344 QUuid varRequestId)
346 345 {
346 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
347 347 lockRead();
348 348 // get all AcqIdentifier in link with varRequestId
349 349 QVector<QUuid> acqIdsToRm;
350 350 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 351 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 352 if (it->second.m_VarRequestId == varRequestId) {
353 353 acqIdsToRm << it->first;
354 354 }
355 355 }
356 356 unlock();
357 357 // run aborting or removing of acqIdsToRm
358 358
359 359 for (auto acqId : acqIdsToRm) {
360 360 removeAcqRequest(acqId);
361 361 }
362 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
362 363 }
363 364
364 365 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
365 366 QUuid acqRequestId)
366 367 {
368 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
367 369 QUuid vIdentifier;
368 370 std::shared_ptr<IDataProvider> provider;
369 371 lockRead();
370 372 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
373 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
371 374 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
372 375 vIdentifier = acqIt->second.m_vIdentifier;
373 376 provider = acqIt->second.m_Provider;
374 377
378 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
375 379 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
376 380 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
381 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
377 382 if (it->second.first == acqRequestId) {
378 383 // acqRequest is currently running -> let's aborting it
384 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
379 385 unlock();
380 386
381 387 // Remove the current request from the worker
382 388 updateToNextRequest(vIdentifier);
383 389
384 390 // notify the request aborting to the provider
385 391 provider->requestDataAborting(acqRequestId);
392 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
386 393 }
387 394 else if (it->second.second == acqRequestId) {
388 395 it->second.second = QUuid();
396 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
389 397 unlock();
390 398 }
391 399 else {
400 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
392 401 unlock();
393 402 }
394 403 }
395 404 else {
405 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
396 406 unlock();
397 407 }
398 408 }
399 409 else {
410 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
400 411 unlock();
401 412 }
402 413
414 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
403 415 lockWrite();
404 416
405 417 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
406 418 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
407 419
408 420 unlock();
421 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
409 422 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now