##// END OF EJS Templates
Implementation of varRequestId cancel for the worker
perrinel -
r818:68959f479714
parent child
Show More
@@ -32,6 +32,10 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
32 /// Remove the current request and execute the next one if exist
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
33 void updateToNextRequest(QUuid vIdentifier);
34
34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
38
35 QMutex m_WorkingMutex;
39 QMutex m_WorkingMutex;
36 QReadWriteLock m_Lock;
40 QReadWriteLock m_Lock;
37
41
@@ -67,7 +71,8 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
67
71
68 // Request creation
72 // Request creation
69 auto acqRequest = AcquisitionRequest{};
73 auto acqRequest = AcquisitionRequest{};
70 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier;
74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
75 << varRequestId;
71 acqRequest.m_VarRequestId = varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
72 acqRequest.m_vIdentifier = vIdentifier;
77 acqRequest.m_vIdentifier = vIdentifier;
73 acqRequest.m_DataProviderParameters = parameters;
78 acqRequest.m_DataProviderParameters = parameters;
@@ -85,14 +90,19 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v
85 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
86 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
87 // A current request already exists, we can replace the next one
92 // A current request already exists, we can replace the next one
88 auto nextAcqId = it->second.second;
93 auto oldAcqId = it->second.second;
89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
90 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
91 auto request = acqIdentifierToAcqRequestMapIt->second;
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
92 varRequestIdCanceled = request.m_VarRequestId;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
93 }
98 }
99 // remove old acqIdentifier from the worker
100 impl->cancelVarRequest(oldAcqId);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
94
102
95 it->second.second = acqRequest.m_AcqIdentifier;
103 it->second.second = acqRequest.m_AcqIdentifier;
104
105
96 impl->unlock();
106 impl->unlock();
97 }
107 }
98 else {
108 else {
@@ -122,10 +132,7 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
122 impl->unlock();
132 impl->unlock();
123
133
124 // Remove the current request from the worker
134 // Remove the current request from the worker
125
126 impl->lockWrite();
127 impl->updateToNextRequest(vIdentifier);
135 impl->updateToNextRequest(vIdentifier);
128 impl->unlock();
129
136
130 // notify the request aborting to the provider
137 // notify the request aborting to the provider
131 request.m_Provider->requestDataAborting(currentAcqId);
138 request.m_Provider->requestDataAborting(currentAcqId);
@@ -221,22 +228,28 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
221 // if the counter is 0, we can return data then run the next request if it exists and
228 // if the counter is 0, we can return data then run the next request if it exists and
222 // removed the finished request
229 // removed the finished request
223 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 if (acqRequest.m_Size == acqRequest.m_Progression) {
231 auto varId = acqRequest.m_vIdentifier;
232 auto rangeRequested = acqRequest.m_RangeRequested;
233 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
224 // Return the data
234 // Return the data
225 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
226 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
227 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
237 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
228 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
229 }
238 }
239 impl->unlock();
230
240
231 // Update to the next request
241 // Update to the next request
232 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 impl->updateToNextRequest(acqRequest.m_vIdentifier);
233 }
243 }
244 else {
245 impl->unlock();
246 }
234 }
247 }
235 else {
248 else {
249 impl->unlock();
236 qCWarning(LOG_VariableAcquisitionWorker())
250 qCWarning(LOG_VariableAcquisitionWorker())
237 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
238 }
252 }
239 impl->unlock();
240 }
253 }
241
254
242 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
@@ -296,27 +309,101 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariable
296 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
297 QUuid vIdentifier)
310 QUuid vIdentifier)
298 {
311 {
312 lockRead();
299 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
300 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
301 if (it->second.second.isNull()) {
315 if (it->second.second.isNull()) {
316 unlock();
302 // There is no next request, we can remove the variable request
317 // There is no next request, we can remove the variable request
303 removeVariableRequest(vIdentifier);
318 removeVariableRequest(vIdentifier);
304 }
319 }
305 else {
320 else {
306 auto acqIdentifierToRemove = it->second.first;
321 auto acqIdentifierToRemove = it->second.first;
307 // Move the next request to the current request
322 // Move the next request to the current request
308 it->second.first = it->second.second;
323 auto nextRequestId = it->second.second;
324 it->second.first = nextRequestId;
309 it->second.second = QUuid();
325 it->second.second = QUuid();
326 unlock();
310 // Remove AcquisitionRequest and results;
327 // Remove AcquisitionRequest and results;
328 lockWrite();
311 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
312 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
331 unlock();
313 // Execute the current request
332 // Execute the current request
314 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
315 Q_ARG(QUuid, it->second.first));
334 Q_ARG(QUuid, nextRequestId));
316 }
335 }
317 }
336 }
318 else {
337 else {
338 unlock();
319 qCCritical(LOG_VariableAcquisitionWorker())
339 qCCritical(LOG_VariableAcquisitionWorker())
320 << tr("Impossible to execute the acquisition on an unfound variable ");
340 << tr("Impossible to execute the acquisition on an unfound variable ");
321 }
341 }
322 }
342 }
343
344 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
345 QUuid varRequestId)
346 {
347 lockRead();
348 // get all AcqIdentifier in link with varRequestId
349 QVector<QUuid> acqIdsToRm;
350 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 if (it->second.m_VarRequestId == varRequestId) {
353 acqIdsToRm << it->first;
354 }
355 }
356 unlock();
357 // run aborting or removing of acqIdsToRm
358
359 for (auto acqId : acqIdsToRm) {
360 removeAcqRequest(acqId);
361 }
362 }
363
364 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
365 QUuid acqRequestId)
366 {
367 QUuid vIdentifier;
368 std::shared_ptr<IDataProvider> provider;
369 lockRead();
370 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
371 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
372 vIdentifier = acqIt->second.m_vIdentifier;
373 provider = acqIt->second.m_Provider;
374
375 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
376 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
377 if (it->second.first == acqRequestId) {
378 // acqRequest is currently running -> let's aborting it
379 unlock();
380
381 // Remove the current request from the worker
382 updateToNextRequest(vIdentifier);
383
384 // notify the request aborting to the provider
385 provider->requestDataAborting(acqRequestId);
386 }
387 else if (it->second.second == acqRequestId) {
388 it->second.second = QUuid();
389 unlock();
390 }
391 else {
392 unlock();
393 }
394 }
395 else {
396 unlock();
397 }
398 }
399 else {
400 unlock();
401 }
402
403 lockWrite();
404
405 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
406 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
407
408 unlock();
409 }
@@ -620,8 +620,8 void VariableController::VariableControllerPrivate::processRequest(std::shared_p
620 varProvider);
620 varProvider);
621
621
622 if (!varRequestIdCanceled.isNull()) {
622 if (!varRequestIdCanceled.isNull()) {
623 qCDebug(LOG_VariableAcquisitionWorker()) << tr("vsarRequestIdCanceled: ")
623 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
624 << varRequestIdCanceled;
624 << varRequestIdCanceled;
625 cancelVariableRequest(varRequestIdCanceled);
625 cancelVariableRequest(varRequestIdCanceled);
626 }
626 }
627 }
627 }
@@ -768,7 +768,7 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
768
768
769 varRequestIdQueue.pop_front();
769 varRequestIdQueue.pop_front();
770 if (varRequestIdQueue.empty()) {
770 if (varRequestIdQueue.empty()) {
771 qCDebug(LOG_VariableController())
771 qCCritical(LOG_VariableController())
772 << tr("TORM Erase REQUEST because it has been accepted") << varId;
772 << tr("TORM Erase REQUEST because it has been accepted") << varId;
773 m_VarIdToVarRequestIdQueueMap.erase(varId);
773 m_VarIdToVarRequestIdQueueMap.erase(varId);
774 }
774 }
@@ -850,6 +850,10 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid
850 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
850 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
851 varRequestIdQueue.end());
851 varRequestIdQueue.end());
852 if (varRequestIdQueue.empty()) {
852 if (varRequestIdQueue.empty()) {
853
854 qCCritical(LOG_VariableController())
855 << tr("VariableControllerPrivate::cancelVariableRequest")
856 << varIdToVarRequestIdQueueMapIt->first;
853 varIdToVarRequestIdQueueMapIt
857 varIdToVarRequestIdQueueMapIt
854 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
858 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
855 }
859 }
@@ -180,7 +180,7 void CosinusProvider::requestDataLoading(QUuid acqIdentifier,
180 for (const auto &dateTime : qAsConst(times)) {
180 for (const auto &dateTime : qAsConst(times)) {
181 if (m_VariableToEnableProvider[acqIdentifier]) {
181 if (m_VariableToEnableProvider[acqIdentifier]) {
182 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime, parameters.m_Data);
182 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime, parameters.m_Data);
183 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
183 qCCritical(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
184 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
184 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
185 }
185 }
186 }
186 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now