##// END OF EJS Templates
Remove unused pending request of worker since it's already in the VC....
perrinel -
r1387:3f0567bfecb5 HEAD
parent child
Show More
@@ -1,430 +1,386
1 #include "Variable/VariableAcquisitionWorker.h"
1 #include "Variable/VariableAcquisitionWorker.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 #include <Data/AcquisitionRequest.h>
5 #include <Data/AcquisitionRequest.h>
6 #include <Data/SqpRange.h>
6 #include <Data/SqpRange.h>
7
7
8 #include <unordered_map>
8 #include <unordered_map>
9 #include <utility>
9 #include <utility>
10
10
11 #include <QMutex>
11 #include <QMutex>
12 #include <QReadWriteLock>
12 #include <QReadWriteLock>
13 #include <QThread>
13 #include <QThread>
14
14
15 #include <cmath>
15 #include <cmath>
16
16
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18
18
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20
20
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 {
23 {
24 }
24 }
25
25
26 void lockRead() { m_Lock.lockForRead(); }
26 void lockRead() { m_Lock.lockForRead(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
28 void unlock() { m_Lock.unlock(); }
28 void unlock() { m_Lock.unlock(); }
29
29
30 void removeVariableRequest(QUuid vIdentifier);
30 void removeVariableRequest(QUuid vIdentifier);
31
31
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
32 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
33 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
34 void removeAcqRequest(QUuid acqRequestId);
38
35
39 QMutex m_WorkingMutex;
36 QMutex m_WorkingMutex;
40 QReadWriteLock m_Lock;
37 QReadWriteLock m_Lock;
41
38
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
39 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
40 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
41 std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
45 VariableAcquisitionWorker *q;
42 VariableAcquisitionWorker *q;
46 };
43 };
47
44
48
45
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
46 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
47 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 {
48 {
52 }
49 }
53
50
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
51 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 {
52 {
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
53 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 << QThread::currentThread();
54 << QThread::currentThread();
58 this->waitForFinish();
55 this->waitForFinish();
59 }
56 }
60
57
61
58
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
59 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 DataProviderParameters parameters,
60 DataProviderParameters parameters,
64 std::shared_ptr<IDataProvider> provider)
61 std::shared_ptr<IDataProvider> provider)
65 {
62 {
66 qCDebug(LOG_VariableAcquisitionWorker())
63 qCDebug(LOG_VariableAcquisitionWorker())
67 << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
64 << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
68 << "vId: " << vIdentifier;
65 << "vId: " << vIdentifier;
69 auto varRequestIdCanceled = QUuid();
66 auto varRequestIdCanceled = QUuid();
70
67
71 // Request creation
68 // Request creation
72 auto acqRequest = AcquisitionRequest{};
69 auto acqRequest = AcquisitionRequest{};
73 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier;
74 acqRequest.m_VarRequestId = varRequestId;
70 acqRequest.m_VarRequestId = varRequestId;
75 acqRequest.m_vIdentifier = vIdentifier;
71 acqRequest.m_vIdentifier = vIdentifier;
76 acqRequest.m_DataProviderParameters = parameters;
72 acqRequest.m_DataProviderParameters = parameters;
77 acqRequest.m_Size = parameters.m_Times.size();
73 acqRequest.m_Size = parameters.m_Times.size();
78 acqRequest.m_Provider = provider;
74 acqRequest.m_Provider = provider;
75 qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
76 << acqRequest.m_Size;
79
77
80
78
81 // Register request
79 // Register request
82 impl->lockWrite();
80 impl->lockWrite();
83 impl->m_AcqIdentifierToAcqRequestMap.insert(
81 impl->m_AcqIdentifierToAcqRequestMap.insert(
84 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
82 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
85
83
86 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
84 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
87 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
85 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
88 // A current request already exists, we can replace the next one
86 // A current request already exists, we can cancel it
89 auto oldAcqId = it->second.second;
87 // remove old acqIdentifier from the worker
88 auto oldAcqId = it->second;
90 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
91 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
90 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
92 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
91 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
93 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
92 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
94 }
93 }
95
96 it->second.second = acqRequest.m_AcqIdentifier;
97 impl->unlock();
94 impl->unlock();
98
99 // remove old acqIdentifier from the worker
100 impl->cancelVarRequest(varRequestIdCanceled);
95 impl->cancelVarRequest(varRequestIdCanceled);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
102 }
96 }
103 else {
97 else {
104 // First request for the variable, it must be stored and executed
98 impl->unlock();
105 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
99 }
106 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
100
101 // Request for the variable, it must be stored and executed
102 impl->lockWrite();
103 impl->m_VIdentifierToCurrrentAcqIdMap.insert(
104 std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
107 impl->unlock();
105 impl->unlock();
108
106
109 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
107 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
110 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
108 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
111 }
112
109
113 return varRequestIdCanceled;
110 return varRequestIdCanceled;
114 }
111 }
115
112
116 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
113 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
117 {
114 {
118 impl->lockRead();
115 impl->lockRead();
119
116
120 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
117 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
121 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
118 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
122 auto currentAcqId = it->second.first;
119 auto currentAcqId = it->second;
123
120
124 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
121 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
125 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
122 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
126 auto request = it->second;
123 auto request = it->second;
127 impl->unlock();
124 impl->unlock();
128
125
129 // Remove the current request from the worker
130 impl->updateToNextRequest(vIdentifier);
131
132 // notify the request aborting to the provider
126 // notify the request aborting to the provider
133 request.m_Provider->requestDataAborting(currentAcqId);
127 request.m_Provider->requestDataAborting(currentAcqId);
134 }
128 }
135 else {
129 else {
136 impl->unlock();
130 impl->unlock();
137 qCWarning(LOG_VariableAcquisitionWorker())
131 qCWarning(LOG_VariableAcquisitionWorker())
138 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
132 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
139 }
133 }
140 }
134 }
141 else {
135 else {
142 impl->unlock();
136 impl->unlock();
143 }
137 }
144 }
138 }
145
139
146 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
140 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
147 double progress)
141 double progress)
148 {
142 {
149 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
143 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
150 << QThread::currentThread()->objectName()
144 << QThread::currentThread()->objectName()
151 << acqIdentifier << progress;
145 << acqIdentifier << progress;
152 impl->lockRead();
146 impl->lockRead();
153 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
147 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
154 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
148 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
155 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
149 auto progressPartSize
150 = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
156
151
157 auto currentPartProgress
152 auto currentPartProgress
158 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
153 = std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;
159 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
154
155 // We can only give an approximation of the currentProgression since its upgrade is async.
156 qCInfo(LOG_VariableAcquisitionWorker())
157 << tr("Progression: ") << aIdToARit->second.m_Progression << aIdToARit->second.m_Size;
158 auto currentProgression = aIdToARit->second.m_Progression;
159 if (currentProgression == aIdToARit->second.m_Size) {
160 currentProgression = aIdToARit->second.m_Size - 1;
161 }
162
163 auto currentAlreadyProgress = progressPartSize * currentProgression;
164
160
165
161 auto finalProgression = currentAlreadyProgress + currentPartProgress;
166 auto finalProgression = currentAlreadyProgress + currentPartProgress;
162 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
167 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
163 qCDebug(LOG_VariableAcquisitionWorker())
168 qCInfo(LOG_VariableAcquisitionWorker())
164 << tr("TORM: onVariableRetrieveDataInProgress ")
169 << tr("TORM: onVariableRetrieveDataInProgress ")
165 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
170 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
166 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
171 << progressPartSize << currentAlreadyProgress << currentPartProgress
172 << finalProgression;
173
167 if (finalProgression == 100.0) {
174 if (finalProgression == 100.0) {
168 qCDebug(LOG_VariableAcquisitionWorker())
175 qCInfo(LOG_VariableAcquisitionWorker())
169 << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
176 << tr("TORM: onVariableRetrieveDataInProgress : finished : 0.0 ");
170 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
177 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
171 }
178 }
172 }
179 }
173 impl->unlock();
180 impl->unlock();
174 }
181 }
175
182
176 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
183 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
177 {
184 {
178 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
185 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
179 << QThread::currentThread();
186 << QThread::currentThread();
180 impl->lockRead();
187 impl->lockRead();
181 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
188 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
182 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
189 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
183 auto request = it->second;
190 auto request = it->second;
184 impl->unlock();
191 impl->unlock();
185 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
192 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
186 << acqIdentifier << request.m_vIdentifier
193 << acqIdentifier << request.m_vIdentifier
187 << QThread::currentThread();
194 << QThread::currentThread();
188 emit variableCanceledRequested(request.m_vIdentifier);
195 emit variableCanceledRequested(request.m_vIdentifier);
189 }
196 }
190 else {
197 else {
191 impl->unlock();
198 impl->unlock();
192 // TODO log no acqIdentifier recognized
199 // TODO log no acqIdentifier recognized
193 }
200 }
194 }
201 }
195
202
196 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
203 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
197 std::shared_ptr<IDataSeries> dataSeries,
204 std::shared_ptr<IDataSeries> dataSeries,
198 SqpRange dataRangeAcquired)
205 SqpRange dataRangeAcquired)
199 {
206 {
200 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
207 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
201 << acqIdentifier << dataRangeAcquired;
208 << acqIdentifier << dataRangeAcquired;
202 impl->lockWrite();
209 impl->lockWrite();
203 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
210 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
204 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
211 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
205 // Store the result
212 // Store the result
206 auto dataPacket = AcquisitionDataPacket{};
213 auto dataPacket = AcquisitionDataPacket{};
207 dataPacket.m_Range = dataRangeAcquired;
214 dataPacket.m_Range = dataRangeAcquired;
208 dataPacket.m_DateSeries = dataSeries;
215 dataPacket.m_DateSeries = dataSeries;
209
216
210 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
217 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
211 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
218 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
212 // A current request result already exists, we can update it
219 // A current request result already exists, we can update it
213 aIdToADPVit->second.push_back(dataPacket);
220 aIdToADPVit->second.push_back(dataPacket);
214 }
221 }
215 else {
222 else {
216 // First request result for the variable, it must be stored
223 // First request result for the variable, it must be stored
217 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
224 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
218 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
225 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
219 }
226 }
220
227
221
228
222 // Decrement the counter of the request
229 // Decrement the counter of the request
223 auto &acqRequest = aIdToARit->second;
230 auto &acqRequest = aIdToARit->second;
231 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: +1 update progresson ")
232 << acqIdentifier;
224 acqRequest.m_Progression = acqRequest.m_Progression + 1;
233 acqRequest.m_Progression = acqRequest.m_Progression + 1;
225
234
226 // if the counter is 0, we can return data then run the next request if it exists and
235 // if the counter is 0, we can return data then run the next request if it exists and
227 // removed the finished request
236 // removed the finished request
228 if (acqRequest.m_Size == acqRequest.m_Progression) {
237 if (acqRequest.m_Size == acqRequest.m_Progression) {
229 auto varId = acqRequest.m_vIdentifier;
238 auto varId = acqRequest.m_vIdentifier;
230 // Return the data
239 // Return the data
231 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
240 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
232 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
241 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
233 emit dataProvided(varId, aIdToADPVit->second);
242 emit dataProvided(varId, aIdToADPVit->second);
234 }
243 }
235 impl->unlock();
244 impl->unlock();
236
237 // Update to the next request
238 impl->updateToNextRequest(acqRequest.m_vIdentifier);
239 }
245 }
240 else {
246 else {
241 impl->unlock();
247 impl->unlock();
242 }
248 }
243 }
249 }
244 else {
250 else {
245 impl->unlock();
251 impl->unlock();
246 qCWarning(LOG_VariableAcquisitionWorker())
252 qCWarning(LOG_VariableAcquisitionWorker())
247 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
253 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
248 }
254 }
249 }
255 }
250
256
251 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
257 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
252 {
258 {
253 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
259 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
254 impl->lockRead();
260 impl->lockRead();
255 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
261 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
256 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
262 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
257 auto request = it->second;
263 auto request = it->second;
258 impl->unlock();
264 impl->unlock();
259 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
265 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
260 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
266 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
261 << QThread::currentThread();
267 << QThread::currentThread();
262 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
268 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
263 }
269 }
264 else {
270 else {
265 impl->unlock();
271 impl->unlock();
266 // TODO log no acqIdentifier recognized
272 // TODO log no acqIdentifier recognized
267 }
273 }
268 }
274 }
269
275
270 void VariableAcquisitionWorker::initialize()
276 void VariableAcquisitionWorker::initialize()
271 {
277 {
272 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
278 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
273 << QThread::currentThread();
279 << QThread::currentThread();
274 impl->m_WorkingMutex.lock();
280 impl->m_WorkingMutex.lock();
275 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
281 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
276 }
282 }
277
283
278 void VariableAcquisitionWorker::finalize()
284 void VariableAcquisitionWorker::finalize()
279 {
285 {
280 impl->m_WorkingMutex.unlock();
286 impl->m_WorkingMutex.unlock();
281 }
287 }
282
288
283 void VariableAcquisitionWorker::waitForFinish()
289 void VariableAcquisitionWorker::waitForFinish()
284 {
290 {
285 QMutexLocker locker{&impl->m_WorkingMutex};
291 QMutexLocker locker{&impl->m_WorkingMutex};
286 }
292 }
287
293
288 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
294 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
289 QUuid vIdentifier)
295 QUuid vIdentifier)
290 {
296 {
291 lockWrite();
297 lockWrite();
292 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
298 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
293
299
294 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
300 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
295 // A current request already exists, we can replace the next one
301 // A current request already exists, we can replace the next one
296
302
297 qCDebug(LOG_VariableAcquisitionWorker())
303 qCDebug(LOG_VariableAcquisitionWorker())
298 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
304 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
299 << QThread::currentThread()->objectName() << it->second.first;
305 << QThread::currentThread()->objectName() << it->second;
300 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
306 m_AcqIdentifierToAcqRequestMap.erase(it->second);
301 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
307 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second);
302
303 qCDebug(LOG_VariableAcquisitionWorker())
304 << "VariableAcquisitionWorkerPrivate::removeVariableRequest " << it->second.second;
305 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
306 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
307 }
308 }
308
309
309 // stop any progression
310 // stop any progression
310 emit q->variableRequestInProgress(vIdentifier, 0.0);
311 emit q->variableRequestInProgress(vIdentifier, 0.0);
311
312
312 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
313 m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
313 unlock();
314 unlock();
314 }
315 }
315
316
316 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
317 QUuid vIdentifier)
318 {
319 lockRead();
320 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
321 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
322 if (it->second.second.isNull()) {
323 unlock();
324 // There is no next request, we can remove the variable request
325 removeVariableRequest(vIdentifier);
326 }
327 else {
328 auto acqIdentifierToRemove = it->second.first;
329 // Move the next request to the current request
330 auto nextRequestId = it->second.second;
331 it->second.first = nextRequestId;
332 it->second.second = QUuid();
333 unlock();
334 // Remove AcquisitionRequest and results;
335 lockWrite();
336 qCDebug(LOG_VariableAcquisitionWorker())
337 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removed: "
338 << acqIdentifierToRemove;
339 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
340 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
341 unlock();
342 // Execute the current request
343 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
344 Q_ARG(QUuid, nextRequestId));
345 }
346 }
347 else {
348 unlock();
349 qCCritical(LOG_VariableAcquisitionWorker())
350 << tr("Impossible to execute the acquisition on an unfound variable ");
351 }
352 }
353
354 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
317 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
355 QUuid varRequestId)
318 QUuid varRequestId)
356 {
319 {
357 qCDebug(LOG_VariableAcquisitionWorker())
320 qCDebug(LOG_VariableAcquisitionWorker())
358 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
321 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
359 lockRead();
322 lockRead();
360 // get all AcqIdentifier in link with varRequestId
323 // get all AcqIdentifier in link with varRequestId
361 QVector<QUuid> acqIdsToRm;
324 QVector<QUuid> acqIdsToRm;
362 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
325 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
363 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
326 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
364 if (it->second.m_VarRequestId == varRequestId) {
327 if (it->second.m_VarRequestId == varRequestId) {
365 acqIdsToRm << it->first;
328 acqIdsToRm << it->first;
366 }
329 }
367 }
330 }
368 unlock();
331 unlock();
369 // run aborting or removing of acqIdsToRm
332 // run aborting or removing of acqIdsToRm
370
333
371 for (auto acqId : acqIdsToRm) {
334 for (auto acqId : acqIdsToRm) {
372 removeAcqRequest(acqId);
335 removeAcqRequest(acqId);
373 }
336 }
374 qCDebug(LOG_VariableAcquisitionWorker())
337 qCDebug(LOG_VariableAcquisitionWorker())
375 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
338 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
376 }
339 }
377
340
378 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
341 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
379 QUuid acqRequestId)
342 QUuid acqRequestId)
380 {
343 {
381 qCDebug(LOG_VariableAcquisitionWorker())
344 qCDebug(LOG_VariableAcquisitionWorker())
382 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
345 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
383 QUuid vIdentifier;
346 QUuid vIdentifier;
384 std::shared_ptr<IDataProvider> provider;
347 std::shared_ptr<IDataProvider> provider;
385 lockRead();
348 lockRead();
386 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
349 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
387 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
350 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
388 vIdentifier = acqIt->second.m_vIdentifier;
351 vIdentifier = acqIt->second.m_vIdentifier;
389 provider = acqIt->second.m_Provider;
352 provider = acqIt->second.m_Provider;
390
353
391 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
354 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
392 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
355 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
393 if (it->second.first == acqRequestId) {
356 if (it->second == acqRequestId) {
394 // acqRequest is currently running -> let's aborting it
357 // acqRequest is currently running -> let's aborting it
395 unlock();
358 unlock();
396
359
397 // Remove the current request from the worker
398 updateToNextRequest(vIdentifier);
399
400 // notify the request aborting to the provider
360 // notify the request aborting to the provider
401 provider->requestDataAborting(acqRequestId);
361 provider->requestDataAborting(acqRequestId);
402 }
362 }
403 else if (it->second.second == acqRequestId) {
404 it->second.second = QUuid();
405 unlock();
406 }
407 else {
363 else {
408 unlock();
364 unlock();
409 }
365 }
410 }
366 }
411 else {
367 else {
412 unlock();
368 unlock();
413 }
369 }
414 }
370 }
415 else {
371 else {
416 unlock();
372 unlock();
417 }
373 }
418
374
419 lockWrite();
375 lockWrite();
420
376
421 qCDebug(LOG_VariableAcquisitionWorker())
377 qCDebug(LOG_VariableAcquisitionWorker())
422 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
378 << "VariableAcquisitionWorkerPrivate::updateToNextRequest removeAcqRequest: "
423 << acqRequestId;
379 << acqRequestId;
424 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
380 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
425 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
381 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
426
382
427 unlock();
383 unlock();
428 qCDebug(LOG_VariableAcquisitionWorker())
384 qCDebug(LOG_VariableAcquisitionWorker())
429 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
385 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
430 }
386 }
General Comments 0
You need to be logged in to leave comments. Login now