##// END OF EJS Templates
Fix asynchrone bug with reset of the download progress state
perrinel -
r1384:cfb5a07b3479
parent child
Show More
@@ -1,416 +1,435
1 #include "Variable/VariableAcquisitionWorker.h"
1 #include "Variable/VariableAcquisitionWorker.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 #include <Data/AcquisitionRequest.h>
5 #include <Data/AcquisitionRequest.h>
6 #include <Data/SqpRange.h>
6 #include <Data/SqpRange.h>
7
7
8 #include <unordered_map>
8 #include <unordered_map>
9 #include <utility>
9 #include <utility>
10
10
11 #include <QMutex>
11 #include <QMutex>
12 #include <QReadWriteLock>
12 #include <QReadWriteLock>
13 #include <QThread>
13 #include <QThread>
14
14
15 #include <cmath>
15 #include <cmath>
16
16
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18
18
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20
20
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 {
23 {
24 }
24 }
25
25
26 void lockRead() { m_Lock.lockForRead(); }
26 void lockRead() { m_Lock.lockForRead(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
28 void unlock() { m_Lock.unlock(); }
28 void unlock() { m_Lock.unlock(); }
29
29
30 void removeVariableRequest(QUuid vIdentifier);
30 void removeVariableRequest(QUuid vIdentifier);
31
31
32 /// Remove the current request and execute the next one if exist
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
33 void updateToNextRequest(QUuid vIdentifier);
34
34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
36 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
38
38
39 QMutex m_WorkingMutex;
39 QMutex m_WorkingMutex;
40 QReadWriteLock m_Lock;
40 QReadWriteLock m_Lock;
41
41
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 VariableAcquisitionWorker *q;
45 VariableAcquisitionWorker *q;
46 };
46 };
47
47
48
48
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 {
51 {
52 }
52 }
53
53
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 {
55 {
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 << QThread::currentThread();
57 << QThread::currentThread();
58 this->waitForFinish();
58 this->waitForFinish();
59 }
59 }
60
60
61
61
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 SqpRange rangeRequested,
63 SqpRange rangeRequested,
64 SqpRange cacheRangeRequested,
64 SqpRange cacheRangeRequested,
65 DataProviderParameters parameters,
65 DataProviderParameters parameters,
66 std::shared_ptr<IDataProvider> provider)
66 std::shared_ptr<IDataProvider> provider)
67 {
67 {
68 qCDebug(LOG_VariableAcquisitionWorker())
68 qCDebug(LOG_VariableAcquisitionWorker())
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 auto varRequestIdCanceled = QUuid();
70 auto varRequestIdCanceled = QUuid();
71
71
72 // Request creation
72 // Request creation
73 auto acqRequest = AcquisitionRequest{};
73 auto acqRequest = AcquisitionRequest{};
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier;
75 << varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
75 acqRequest.m_VarRequestId = varRequestId;
77 acqRequest.m_vIdentifier = vIdentifier;
76 acqRequest.m_vIdentifier = vIdentifier;
78 acqRequest.m_DataProviderParameters = parameters;
77 acqRequest.m_DataProviderParameters = parameters;
79 acqRequest.m_RangeRequested = rangeRequested;
78 acqRequest.m_RangeRequested = rangeRequested;
80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
79 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 acqRequest.m_Size = parameters.m_Times.size();
80 acqRequest.m_Size = parameters.m_Times.size();
82 acqRequest.m_Provider = provider;
81 acqRequest.m_Provider = provider;
83
82
84
83
85 // Register request
84 // Register request
86 impl->lockWrite();
85 impl->lockWrite();
87 impl->m_AcqIdentifierToAcqRequestMap.insert(
86 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
87 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89
88
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
89 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
90 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 // A current request already exists, we can replace the next one
91 // A current request already exists, we can replace the next one
93 auto oldAcqId = it->second.second;
92 auto oldAcqId = it->second.second;
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
93 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
94 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
95 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
96 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 }
97 }
99
98
100 it->second.second = acqRequest.m_AcqIdentifier;
99 it->second.second = acqRequest.m_AcqIdentifier;
101 impl->unlock();
100 impl->unlock();
102
101
103 // remove old acqIdentifier from the worker
102 // remove old acqIdentifier from the worker
104 impl->cancelVarRequest(varRequestIdCanceled);
103 impl->cancelVarRequest(varRequestIdCanceled);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
104 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 }
105 }
107 else {
106 else {
108 // First request for the variable, it must be stored and executed
107 // First request for the variable, it must be stored and executed
109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
108 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
109 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 impl->unlock();
110 impl->unlock();
112
111
113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
112 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
113 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 }
114 }
116
115
117 return varRequestIdCanceled;
116 return varRequestIdCanceled;
118 }
117 }
119
118
120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
119 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 {
120 {
122 impl->lockRead();
121 impl->lockRead();
123
122
124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
123 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
124 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 auto currentAcqId = it->second.first;
125 auto currentAcqId = it->second.first;
127
126
128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
127 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
128 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 auto request = it->second;
129 auto request = it->second;
131 impl->unlock();
130 impl->unlock();
132
131
133 // Remove the current request from the worker
132 // Remove the current request from the worker
134 impl->updateToNextRequest(vIdentifier);
133 impl->updateToNextRequest(vIdentifier);
135
134
136 // notify the request aborting to the provider
135 // notify the request aborting to the provider
137 request.m_Provider->requestDataAborting(currentAcqId);
136 request.m_Provider->requestDataAborting(currentAcqId);
138 }
137 }
139 else {
138 else {
140 impl->unlock();
139 impl->unlock();
141 qCWarning(LOG_VariableAcquisitionWorker())
140 qCWarning(LOG_VariableAcquisitionWorker())
142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
141 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 }
142 }
144 }
143 }
145 else {
144 else {
146 impl->unlock();
145 impl->unlock();
147 }
146 }
148 }
147 }
149
148
150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
149 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 double progress)
150 double progress)
152 {
151 {
153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
152 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
153 << QThread::currentThread()->objectName()
154 << acqIdentifier << progress;
154 << acqIdentifier << progress;
155 impl->lockRead();
155 impl->lockRead();
156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159
159
160 auto currentPartProgress
160 auto currentPartProgress
161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163
163
164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 qCDebug(LOG_VariableAcquisitionWorker())
166 qCDebug(LOG_VariableAcquisitionWorker())
167 << tr("TORM: onVariableRetrieveDataInProgress ")
167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 if (finalProgression == 100.0) {
170 if (finalProgression == 100.0) {
171 qCDebug(LOG_VariableAcquisitionWorker())
172 << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
173 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 }
174 }
173 }
175 }
174 impl->unlock();
176 impl->unlock();
175 }
177 }
176
178
177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
179 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 {
180 {
179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
181 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 << QThread::currentThread();
182 << QThread::currentThread();
181 impl->lockRead();
183 impl->lockRead();
182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
184 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
185 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 auto request = it->second;
186 auto request = it->second;
185 impl->unlock();
187 impl->unlock();
186 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
188 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 << acqIdentifier << request.m_vIdentifier
189 << acqIdentifier << request.m_vIdentifier
188 << QThread::currentThread();
190 << QThread::currentThread();
189 emit variableCanceledRequested(request.m_vIdentifier);
191 emit variableCanceledRequested(request.m_vIdentifier);
190 }
192 }
191 else {
193 else {
192 impl->unlock();
194 impl->unlock();
193 // TODO log no acqIdentifier recognized
195 // TODO log no acqIdentifier recognized
194 }
196 }
195 }
197 }
196
198
197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
199 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 std::shared_ptr<IDataSeries> dataSeries,
200 std::shared_ptr<IDataSeries> dataSeries,
199 SqpRange dataRangeAcquired)
201 SqpRange dataRangeAcquired)
200 {
202 {
201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
203 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 << acqIdentifier << dataRangeAcquired;
204 << acqIdentifier << dataRangeAcquired;
203 impl->lockWrite();
205 impl->lockWrite();
204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
206 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
207 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 // Store the result
208 // Store the result
207 auto dataPacket = AcquisitionDataPacket{};
209 auto dataPacket = AcquisitionDataPacket{};
208 dataPacket.m_Range = dataRangeAcquired;
210 dataPacket.m_Range = dataRangeAcquired;
209 dataPacket.m_DateSeries = dataSeries;
211 dataPacket.m_DateSeries = dataSeries;
210
212
211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
213 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
214 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 // A current request result already exists, we can update it
215 // A current request result already exists, we can update it
214 aIdToADPVit->second.push_back(dataPacket);
216 aIdToADPVit->second.push_back(dataPacket);
215 }
217 }
216 else {
218 else {
217 // First request result for the variable, it must be stored
219 // First request result for the variable, it must be stored
218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
220 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
221 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 }
222 }
221
223
222
224
223 // Decrement the counter of the request
225 // Decrement the counter of the request
224 auto &acqRequest = aIdToARit->second;
226 auto &acqRequest = aIdToARit->second;
225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
227 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226
228
227 // if the counter is 0, we can return data then run the next request if it exists and
229 // if the counter is 0, we can return data then run the next request if it exists and
228 // removed the finished request
230 // removed the finished request
229 if (acqRequest.m_Size == acqRequest.m_Progression) {
231 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 auto varId = acqRequest.m_vIdentifier;
232 auto varId = acqRequest.m_vIdentifier;
231 auto rangeRequested = acqRequest.m_RangeRequested;
233 auto rangeRequested = acqRequest.m_RangeRequested;
232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
234 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 // Return the data
235 // Return the data
234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
236 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
237 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
238 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 }
239 }
238 impl->unlock();
240 impl->unlock();
239
241
240 // Update to the next request
242 // Update to the next request
241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
243 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 }
244 }
243 else {
245 else {
244 impl->unlock();
246 impl->unlock();
245 }
247 }
246 }
248 }
247 else {
249 else {
248 impl->unlock();
250 impl->unlock();
249 qCWarning(LOG_VariableAcquisitionWorker())
251 qCWarning(LOG_VariableAcquisitionWorker())
250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
252 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 }
253 }
252 }
254 }
253
255
254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
256 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 {
257 {
256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
258 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 impl->lockRead();
259 impl->lockRead();
258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
260 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
261 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 auto request = it->second;
262 auto request = it->second;
261 impl->unlock();
263 impl->unlock();
262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
264 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
265 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
266 << QThread::currentThread();
263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
267 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 }
268 }
265 else {
269 else {
266 impl->unlock();
270 impl->unlock();
267 // TODO log no acqIdentifier recognized
271 // TODO log no acqIdentifier recognized
268 }
272 }
269 }
273 }
270
274
271 void VariableAcquisitionWorker::initialize()
275 void VariableAcquisitionWorker::initialize()
272 {
276 {
273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
277 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 << QThread::currentThread();
278 << QThread::currentThread();
275 impl->m_WorkingMutex.lock();
279 impl->m_WorkingMutex.lock();
276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
280 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 }
281 }
278
282
279 void VariableAcquisitionWorker::finalize()
283 void VariableAcquisitionWorker::finalize()
280 {
284 {
281 impl->m_WorkingMutex.unlock();
285 impl->m_WorkingMutex.unlock();
282 }
286 }
283
287
284 void VariableAcquisitionWorker::waitForFinish()
288 void VariableAcquisitionWorker::waitForFinish()
285 {
289 {
286 QMutexLocker locker{&impl->m_WorkingMutex};
290 QMutexLocker locker{&impl->m_WorkingMutex};
287 }
291 }
288
292
289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
293 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 QUuid vIdentifier)
294 QUuid vIdentifier)
291 {
295 {
292 lockWrite();
296 lockWrite();
293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
297 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294
298
295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
299 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 // A current request already exists, we can replace the next one
300 // A current request already exists, we can replace the next one
297
301
302 qCDebug(LOG_VariableAcquisitionWorker())
303 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
304 << QThread::currentThread()->objectName() << it->second.first;
298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
305 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
306 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300
307
308 qCDebug(LOG_VariableAcquisitionWorker())
309 << "VariableAcquisitionWorkerPrivate::removeVariableRequest " << it->second.second;
301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
310 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
311 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 }
312 }
313
314 // stop any progression
315 emit q->variableRequestInProgress(vIdentifier, 0.0);
316
304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
317 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 unlock();
318 unlock();
306 }
319 }
307
320
308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
321 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 QUuid vIdentifier)
322 QUuid vIdentifier)
310 {
323 {
311 lockRead();
324 lockRead();
312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
325 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
326 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 if (it->second.second.isNull()) {
327 if (it->second.second.isNull()) {
315 unlock();
328 unlock();
316 // There is no next request, we can remove the variable request
329 // There is no next request, we can remove the variable request
317 removeVariableRequest(vIdentifier);
330 removeVariableRequest(vIdentifier);
318 }
331 }
319 else {
332 else {
320 auto acqIdentifierToRemove = it->second.first;
333 auto acqIdentifierToRemove = it->second.first;
321 // Move the next request to the current request
334 // Move the next request to the current request
322 auto nextRequestId = it->second.second;
335 auto nextRequestId = it->second.second;
323 it->second.first = nextRequestId;
336 it->second.first = nextRequestId;
324 it->second.second = QUuid();
337 it->second.second = QUuid();
325 unlock();
338 unlock();
326 // Remove AcquisitionRequest and results;
339 // Remove AcquisitionRequest and results;
327 lockWrite();
340 lockWrite();
341 qCDebug(LOG_VariableAcquisitionWorker())
342 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removed: "
343 << acqIdentifierToRemove;
328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
344 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
345 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 unlock();
346 unlock();
331 // Execute the current request
347 // Execute the current request
332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
348 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 Q_ARG(QUuid, nextRequestId));
349 Q_ARG(QUuid, nextRequestId));
334 }
350 }
335 }
351 }
336 else {
352 else {
337 unlock();
353 unlock();
338 qCCritical(LOG_VariableAcquisitionWorker())
354 qCCritical(LOG_VariableAcquisitionWorker())
339 << tr("Impossible to execute the acquisition on an unfound variable ");
355 << tr("Impossible to execute the acquisition on an unfound variable ");
340 }
356 }
341 }
357 }
342
358
343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
359 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 QUuid varRequestId)
360 QUuid varRequestId)
345 {
361 {
346 qCDebug(LOG_VariableAcquisitionWorker())
362 qCDebug(LOG_VariableAcquisitionWorker())
347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
363 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
348 lockRead();
364 lockRead();
349 // get all AcqIdentifier in link with varRequestId
365 // get all AcqIdentifier in link with varRequestId
350 QVector<QUuid> acqIdsToRm;
366 QVector<QUuid> acqIdsToRm;
351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
367 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
368 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
353 if (it->second.m_VarRequestId == varRequestId) {
369 if (it->second.m_VarRequestId == varRequestId) {
354 acqIdsToRm << it->first;
370 acqIdsToRm << it->first;
355 }
371 }
356 }
372 }
357 unlock();
373 unlock();
358 // run aborting or removing of acqIdsToRm
374 // run aborting or removing of acqIdsToRm
359
375
360 for (auto acqId : acqIdsToRm) {
376 for (auto acqId : acqIdsToRm) {
361 removeAcqRequest(acqId);
377 removeAcqRequest(acqId);
362 }
378 }
363 qCDebug(LOG_VariableAcquisitionWorker())
379 qCDebug(LOG_VariableAcquisitionWorker())
364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
380 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
365 }
381 }
366
382
367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
383 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
368 QUuid acqRequestId)
384 QUuid acqRequestId)
369 {
385 {
370 qCDebug(LOG_VariableAcquisitionWorker())
386 qCDebug(LOG_VariableAcquisitionWorker())
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
387 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
372 QUuid vIdentifier;
388 QUuid vIdentifier;
373 std::shared_ptr<IDataProvider> provider;
389 std::shared_ptr<IDataProvider> provider;
374 lockRead();
390 lockRead();
375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
391 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
376 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
392 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
377 vIdentifier = acqIt->second.m_vIdentifier;
393 vIdentifier = acqIt->second.m_vIdentifier;
378 provider = acqIt->second.m_Provider;
394 provider = acqIt->second.m_Provider;
379
395
380 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
396 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
381 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
397 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
382 if (it->second.first == acqRequestId) {
398 if (it->second.first == acqRequestId) {
383 // acqRequest is currently running -> let's aborting it
399 // acqRequest is currently running -> let's aborting it
384 unlock();
400 unlock();
385
401
386 // Remove the current request from the worker
402 // Remove the current request from the worker
387 updateToNextRequest(vIdentifier);
403 updateToNextRequest(vIdentifier);
388
404
389 // notify the request aborting to the provider
405 // notify the request aborting to the provider
390 provider->requestDataAborting(acqRequestId);
406 provider->requestDataAborting(acqRequestId);
391 }
407 }
392 else if (it->second.second == acqRequestId) {
408 else if (it->second.second == acqRequestId) {
393 it->second.second = QUuid();
409 it->second.second = QUuid();
394 unlock();
410 unlock();
395 }
411 }
396 else {
412 else {
397 unlock();
413 unlock();
398 }
414 }
399 }
415 }
400 else {
416 else {
401 unlock();
417 unlock();
402 }
418 }
403 }
419 }
404 else {
420 else {
405 unlock();
421 unlock();
406 }
422 }
407
423
408 lockWrite();
424 lockWrite();
409
425
426 qCDebug(LOG_VariableAcquisitionWorker())
427 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
428 << acqRequestId;
410 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
429 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
411 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
430 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
412
431
413 unlock();
432 unlock();
414 qCDebug(LOG_VariableAcquisitionWorker())
433 qCDebug(LOG_VariableAcquisitionWorker())
415 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
434 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
416 }
435 }
General Comments 0
You need to be logged in to leave comments. Login now