##// END OF EJS Templates
Implementation of varRequestId cancel for the worker
perrinel -
r818:68959f479714
parent child
Show More
@@ -32,6 +32,10 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
38
35 39 QMutex m_WorkingMutex;
36 40 QReadWriteLock m_Lock;
37 41
@@ -67,7 +71,8 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
67 71
68 72 // Request creation
69 73 auto acqRequest = AcquisitionRequest{};
70 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier;
74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
75 << varRequestId;
71 76 acqRequest.m_VarRequestId = varRequestId;
72 77 acqRequest.m_vIdentifier = vIdentifier;
73 78 acqRequest.m_DataProviderParameters = parameters;
@@ -85,14 +90,19 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
85 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
86 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
87 92 // A current request already exists, we can replace the next one
88 auto nextAcqId = it->second.second;
89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
93 auto oldAcqId = it->second.second;
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
90 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
91 auto request = acqIdentifierToAcqRequestMapIt->second;
92 varRequestIdCanceled = request.m_VarRequestId;
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
93 98 }
99 // remove old acqIdentifier from the worker
100 impl->cancelVarRequest(oldAcqId);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
94 102
95 103 it->second.second = acqRequest.m_AcqIdentifier;
104
105
96 106 impl->unlock();
97 107 }
98 108 else {
@@ -122,10 +132,7 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
122 132 impl->unlock();
123 133
124 134 // Remove the current request from the worker
125
126 impl->lockWrite();
127 135 impl->updateToNextRequest(vIdentifier);
128 impl->unlock();
129 136
130 137 // notify the request aborting to the provider
131 138 request.m_Provider->requestDataAborting(currentAcqId);
@@ -221,22 +228,28 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
221 228 // if the counter is 0, we can return data then run the next request if it exists and
222 229 // removed the finished request
223 230 if (acqRequest.m_Size == acqRequest.m_Progression) {
231 auto varId = acqRequest.m_vIdentifier;
232 auto rangeRequested = acqRequest.m_RangeRequested;
233 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
224 234 // Return the data
225 235 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
226 236 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
227 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
228 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
237 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
229 238 }
239 impl->unlock();
230 240
231 241 // Update to the next request
232 242 impl->updateToNextRequest(acqRequest.m_vIdentifier);
233 243 }
244 else {
245 impl->unlock();
246 }
234 247 }
235 248 else {
249 impl->unlock();
236 250 qCWarning(LOG_VariableAcquisitionWorker())
237 251 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
238 252 }
239 impl->unlock();
240 253 }
241 254
242 255 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
@@ -296,27 +309,101 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariable
296 309 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
297 310 QUuid vIdentifier)
298 311 {
312 lockRead();
299 313 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
300 314 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
301 315 if (it->second.second.isNull()) {
316 unlock();
302 317 // There is no next request, we can remove the variable request
303 318 removeVariableRequest(vIdentifier);
304 319 }
305 320 else {
306 321 auto acqIdentifierToRemove = it->second.first;
307 322 // Move the next request to the current request
308 it->second.first = it->second.second;
323 auto nextRequestId = it->second.second;
324 it->second.first = nextRequestId;
309 325 it->second.second = QUuid();
326 unlock();
310 327 // Remove AcquisitionRequest and results;
328 lockWrite();
311 329 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
312 330 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
331 unlock();
313 332 // Execute the current request
314 333 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
315 Q_ARG(QUuid, it->second.first));
334 Q_ARG(QUuid, nextRequestId));
316 335 }
317 336 }
318 337 else {
338 unlock();
319 339 qCCritical(LOG_VariableAcquisitionWorker())
320 340 << tr("Impossible to execute the acquisition on an unfound variable ");
321 341 }
322 342 }
343
344 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
345 QUuid varRequestId)
346 {
347 lockRead();
348 // get all AcqIdentifier in link with varRequestId
349 QVector<QUuid> acqIdsToRm;
350 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 if (it->second.m_VarRequestId == varRequestId) {
353 acqIdsToRm << it->first;
354 }
355 }
356 unlock();
357 // run aborting or removing of acqIdsToRm
358
359 for (auto acqId : acqIdsToRm) {
360 removeAcqRequest(acqId);
361 }
362 }
363
364 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
365 QUuid acqRequestId)
366 {
367 QUuid vIdentifier;
368 std::shared_ptr<IDataProvider> provider;
369 lockRead();
370 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
371 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
372 vIdentifier = acqIt->second.m_vIdentifier;
373 provider = acqIt->second.m_Provider;
374
375 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
376 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
377 if (it->second.first == acqRequestId) {
378 // acqRequest is currently running -> let's aborting it
379 unlock();
380
381 // Remove the current request from the worker
382 updateToNextRequest(vIdentifier);
383
384 // notify the request aborting to the provider
385 provider->requestDataAborting(acqRequestId);
386 }
387 else if (it->second.second == acqRequestId) {
388 it->second.second = QUuid();
389 unlock();
390 }
391 else {
392 unlock();
393 }
394 }
395 else {
396 unlock();
397 }
398 }
399 else {
400 unlock();
401 }
402
403 lockWrite();
404
405 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
406 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
407
408 unlock();
409 }
@@ -620,8 +620,8 void VariableController::VariableControllerPrivate::processRequest(std::shared_p
620 620 varProvider);
621 621
622 622 if (!varRequestIdCanceled.isNull()) {
623 qCDebug(LOG_VariableAcquisitionWorker()) << tr("vsarRequestIdCanceled: ")
624 << varRequestIdCanceled;
623 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
624 << varRequestIdCanceled;
625 625 cancelVariableRequest(varRequestIdCanceled);
626 626 }
627 627 }
@@ -768,7 +768,7 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
768 768
769 769 varRequestIdQueue.pop_front();
770 770 if (varRequestIdQueue.empty()) {
771 qCDebug(LOG_VariableController())
771 qCCritical(LOG_VariableController())
772 772 << tr("TORM Erase REQUEST because it has been accepted") << varId;
773 773 m_VarIdToVarRequestIdQueueMap.erase(varId);
774 774 }
@@ -850,6 +850,10 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid
850 850 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
851 851 varRequestIdQueue.end());
852 852 if (varRequestIdQueue.empty()) {
853
854 qCCritical(LOG_VariableController())
855 << tr("VariableControllerPrivate::cancelVariableRequest")
856 << varIdToVarRequestIdQueueMapIt->first;
853 857 varIdToVarRequestIdQueueMapIt
854 858 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
855 859 }
@@ -180,7 +180,7 void CosinusProvider::requestDataLoading(QUuid acqIdentifier,
180 180 for (const auto &dateTime : qAsConst(times)) {
181 181 if (m_VariableToEnableProvider[acqIdentifier]) {
182 182 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime, parameters.m_Data);
183 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
183 qCCritical(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
184 184 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
185 185 }
186 186 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now