##// END OF EJS Templates
Fix lock bug
perrinel -
r819:baf81ae414e4
parent child
Show More
@@ -1,409 +1,422
1 #include "Variable/VariableAcquisitionWorker.h"
1 #include "Variable/VariableAcquisitionWorker.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 #include <Data/AcquisitionRequest.h>
5 #include <Data/AcquisitionRequest.h>
6 #include <Data/SqpRange.h>
6 #include <Data/SqpRange.h>
7
7
8 #include <unordered_map>
8 #include <unordered_map>
9 #include <utility>
9 #include <utility>
10
10
11 #include <QMutex>
11 #include <QMutex>
12 #include <QReadWriteLock>
12 #include <QReadWriteLock>
13 #include <QThread>
13 #include <QThread>
14
14
15 #include <cmath>
15 #include <cmath>
16
16
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18
18
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20
20
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 {
23 {
24 }
24 }
25
25
26 void lockRead() { m_Lock.lockForRead(); }
26 void lockRead() { m_Lock.lockForRead(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
28 void unlock() { m_Lock.unlock(); }
28 void unlock() { m_Lock.unlock(); }
29
29
30 void removeVariableRequest(QUuid vIdentifier);
30 void removeVariableRequest(QUuid vIdentifier);
31
31
32 /// Remove the current request and execute the next one if exist
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
33 void updateToNextRequest(QUuid vIdentifier);
34
34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
36 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
38
38
39 QMutex m_WorkingMutex;
39 QMutex m_WorkingMutex;
40 QReadWriteLock m_Lock;
40 QReadWriteLock m_Lock;
41
41
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 VariableAcquisitionWorker *q;
45 VariableAcquisitionWorker *q;
46 };
46 };
47
47
48
48
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 {
51 {
52 }
52 }
53
53
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 {
55 {
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 << QThread::currentThread();
57 << QThread::currentThread();
58 this->waitForFinish();
58 this->waitForFinish();
59 }
59 }
60
60
61
61
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 SqpRange rangeRequested,
63 SqpRange rangeRequested,
64 SqpRange cacheRangeRequested,
64 SqpRange cacheRangeRequested,
65 DataProviderParameters parameters,
65 DataProviderParameters parameters,
66 std::shared_ptr<IDataProvider> provider)
66 std::shared_ptr<IDataProvider> provider)
67 {
67 {
68 qCDebug(LOG_VariableAcquisitionWorker())
68 qCDebug(LOG_VariableAcquisitionWorker())
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 auto varRequestIdCanceled = QUuid();
70 auto varRequestIdCanceled = QUuid();
71
71
72 // Request creation
72 // Request creation
73 auto acqRequest = AcquisitionRequest{};
73 auto acqRequest = AcquisitionRequest{};
74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
74 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier
75 << varRequestId;
75 << varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
77 acqRequest.m_vIdentifier = vIdentifier;
77 acqRequest.m_vIdentifier = vIdentifier;
78 acqRequest.m_DataProviderParameters = parameters;
78 acqRequest.m_DataProviderParameters = parameters;
79 acqRequest.m_RangeRequested = rangeRequested;
79 acqRequest.m_RangeRequested = rangeRequested;
80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 acqRequest.m_Size = parameters.m_Times.size();
81 acqRequest.m_Size = parameters.m_Times.size();
82 acqRequest.m_Provider = provider;
82 acqRequest.m_Provider = provider;
83
83
84
84
85 // Register request
85 // Register request
86 impl->lockWrite();
86 impl->lockWrite();
87 impl->m_AcqIdentifierToAcqRequestMap.insert(
87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89
89
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 // A current request already exists, we can replace the next one
92 // A current request already exists, we can replace the next one
93 auto oldAcqId = it->second.second;
93 auto oldAcqId = it->second.second;
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 }
98 }
99 // remove old acqIdentifier from the worker
100 impl->cancelVarRequest(oldAcqId);
101 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
102
99
103 it->second.second = acqRequest.m_AcqIdentifier;
100 it->second.second = acqRequest.m_AcqIdentifier;
104
105
106 impl->unlock();
101 impl->unlock();
102
103 // remove old acqIdentifier from the worker
104 impl->cancelVarRequest(oldAcqId);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
107 }
106 }
108 else {
107 else {
109 // First request for the variable, it must be stored and executed
108 // First request for the variable, it must be stored and executed
110 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
111 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
112 impl->unlock();
111 impl->unlock();
113
112
114 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
115 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
116 }
115 }
117
116
118 return varRequestIdCanceled;
117 return varRequestIdCanceled;
119 }
118 }
120
119
121 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
122 {
121 {
123 impl->lockRead();
122 impl->lockRead();
124
123
125 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
126 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
127 auto currentAcqId = it->second.first;
126 auto currentAcqId = it->second.first;
128
127
129 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
130 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
131 auto request = it->second;
130 auto request = it->second;
132 impl->unlock();
131 impl->unlock();
133
132
134 // Remove the current request from the worker
133 // Remove the current request from the worker
135 impl->updateToNextRequest(vIdentifier);
134 impl->updateToNextRequest(vIdentifier);
136
135
137 // notify the request aborting to the provider
136 // notify the request aborting to the provider
138 request.m_Provider->requestDataAborting(currentAcqId);
137 request.m_Provider->requestDataAborting(currentAcqId);
139 }
138 }
140 else {
139 else {
141 impl->unlock();
140 impl->unlock();
142 qCWarning(LOG_VariableAcquisitionWorker())
141 qCWarning(LOG_VariableAcquisitionWorker())
143 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
144 }
143 }
145 }
144 }
146 else {
145 else {
147 impl->unlock();
146 impl->unlock();
148 }
147 }
149 }
148 }
150
149
151 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
152 double progress)
151 double progress)
153 {
152 {
154 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
155 << acqIdentifier << progress;
154 << acqIdentifier << progress;
156 impl->lockRead();
155 impl->lockRead();
157 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
158 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
159 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
160
159
161 auto currentPartProgress
160 auto currentPartProgress
162 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
163 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
164
163
165 auto finalProgression = currentAlreadyProgress + currentPartProgress;
164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
166 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
167 qCDebug(LOG_VariableAcquisitionWorker())
166 qCDebug(LOG_VariableAcquisitionWorker())
168 << tr("TORM: onVariableRetrieveDataInProgress ")
167 << tr("TORM: onVariableRetrieveDataInProgress ")
169 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
170 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
171 if (finalProgression == 100.0) {
170 if (finalProgression == 100.0) {
172 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
173 }
172 }
174 }
173 }
175 impl->unlock();
174 impl->unlock();
176 }
175 }
177
176
178 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
179 {
178 {
180 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
181 << QThread::currentThread();
180 << QThread::currentThread();
182 impl->lockRead();
181 impl->lockRead();
183 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
184 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
185 auto request = it->second;
184 auto request = it->second;
186 impl->unlock();
185 impl->unlock();
187 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
188 << acqIdentifier << request.m_vIdentifier
187 << acqIdentifier << request.m_vIdentifier
189 << QThread::currentThread();
188 << QThread::currentThread();
190 emit variableCanceledRequested(request.m_vIdentifier);
189 emit variableCanceledRequested(request.m_vIdentifier);
191 }
190 }
192 else {
191 else {
193 impl->unlock();
192 impl->unlock();
194 // TODO log no acqIdentifier recognized
193 // TODO log no acqIdentifier recognized
195 }
194 }
196 }
195 }
197
196
198 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
199 std::shared_ptr<IDataSeries> dataSeries,
198 std::shared_ptr<IDataSeries> dataSeries,
200 SqpRange dataRangeAcquired)
199 SqpRange dataRangeAcquired)
201 {
200 {
202 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
203 << acqIdentifier << dataRangeAcquired;
202 << acqIdentifier << dataRangeAcquired;
204 impl->lockWrite();
203 impl->lockWrite();
205 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
206 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
207 // Store the result
206 // Store the result
208 auto dataPacket = AcquisitionDataPacket{};
207 auto dataPacket = AcquisitionDataPacket{};
209 dataPacket.m_Range = dataRangeAcquired;
208 dataPacket.m_Range = dataRangeAcquired;
210 dataPacket.m_DateSeries = dataSeries;
209 dataPacket.m_DateSeries = dataSeries;
211
210
212 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
213 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
214 // A current request result already exists, we can update it
213 // A current request result already exists, we can update it
215 aIdToADPVit->second.push_back(dataPacket);
214 aIdToADPVit->second.push_back(dataPacket);
216 }
215 }
217 else {
216 else {
218 // First request result for the variable, it must be stored
217 // First request result for the variable, it must be stored
219 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
220 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
221 }
220 }
222
221
223
222
224 // Decrement the counter of the request
223 // Decrement the counter of the request
225 auto &acqRequest = aIdToARit->second;
224 auto &acqRequest = aIdToARit->second;
226 acqRequest.m_Progression = acqRequest.m_Progression + 1;
225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
227
226
228 // if the counter is 0, we can return data then run the next request if it exists and
227 // if the counter is 0, we can return data then run the next request if it exists and
229 // removed the finished request
228 // removed the finished request
230 if (acqRequest.m_Size == acqRequest.m_Progression) {
229 if (acqRequest.m_Size == acqRequest.m_Progression) {
231 auto varId = acqRequest.m_vIdentifier;
230 auto varId = acqRequest.m_vIdentifier;
232 auto rangeRequested = acqRequest.m_RangeRequested;
231 auto rangeRequested = acqRequest.m_RangeRequested;
233 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
234 // Return the data
233 // Return the data
235 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
236 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
237 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
238 }
237 }
239 impl->unlock();
238 impl->unlock();
240
239
241 // Update to the next request
240 // Update to the next request
242 impl->updateToNextRequest(acqRequest.m_vIdentifier);
241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
243 }
242 }
244 else {
243 else {
245 impl->unlock();
244 impl->unlock();
246 }
245 }
247 }
246 }
248 else {
247 else {
249 impl->unlock();
248 impl->unlock();
250 qCWarning(LOG_VariableAcquisitionWorker())
249 qCWarning(LOG_VariableAcquisitionWorker())
251 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
252 }
251 }
253 }
252 }
254
253
255 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
256 {
255 {
257 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
258 impl->lockRead();
257 impl->lockRead();
259 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
260 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
261 auto request = it->second;
260 auto request = it->second;
262 impl->unlock();
261 impl->unlock();
263 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
264 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
265 }
264 }
266 else {
265 else {
267 impl->unlock();
266 impl->unlock();
268 // TODO log no acqIdentifier recognized
267 // TODO log no acqIdentifier recognized
269 }
268 }
270 }
269 }
271
270
272 void VariableAcquisitionWorker::initialize()
271 void VariableAcquisitionWorker::initialize()
273 {
272 {
274 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
275 << QThread::currentThread();
274 << QThread::currentThread();
276 impl->m_WorkingMutex.lock();
275 impl->m_WorkingMutex.lock();
277 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
278 }
277 }
279
278
280 void VariableAcquisitionWorker::finalize()
279 void VariableAcquisitionWorker::finalize()
281 {
280 {
282 impl->m_WorkingMutex.unlock();
281 impl->m_WorkingMutex.unlock();
283 }
282 }
284
283
285 void VariableAcquisitionWorker::waitForFinish()
284 void VariableAcquisitionWorker::waitForFinish()
286 {
285 {
287 QMutexLocker locker{&impl->m_WorkingMutex};
286 QMutexLocker locker{&impl->m_WorkingMutex};
288 }
287 }
289
288
290 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
291 QUuid vIdentifier)
290 QUuid vIdentifier)
292 {
291 {
293 lockWrite();
292 lockWrite();
294 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
295
294
296 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
297 // A current request already exists, we can replace the next one
296 // A current request already exists, we can replace the next one
298
297
299 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
300 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
301
300
302 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
303 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
304 }
303 }
305 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
306 unlock();
305 unlock();
307 }
306 }
308
307
309 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
310 QUuid vIdentifier)
309 QUuid vIdentifier)
311 {
310 {
312 lockRead();
311 lockRead();
313 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
314 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
315 if (it->second.second.isNull()) {
314 if (it->second.second.isNull()) {
316 unlock();
315 unlock();
317 // There is no next request, we can remove the variable request
316 // There is no next request, we can remove the variable request
318 removeVariableRequest(vIdentifier);
317 removeVariableRequest(vIdentifier);
319 }
318 }
320 else {
319 else {
321 auto acqIdentifierToRemove = it->second.first;
320 auto acqIdentifierToRemove = it->second.first;
322 // Move the next request to the current request
321 // Move the next request to the current request
323 auto nextRequestId = it->second.second;
322 auto nextRequestId = it->second.second;
324 it->second.first = nextRequestId;
323 it->second.first = nextRequestId;
325 it->second.second = QUuid();
324 it->second.second = QUuid();
326 unlock();
325 unlock();
327 // Remove AcquisitionRequest and results;
326 // Remove AcquisitionRequest and results;
328 lockWrite();
327 lockWrite();
329 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
330 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
331 unlock();
330 unlock();
332 // Execute the current request
331 // Execute the current request
333 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
334 Q_ARG(QUuid, nextRequestId));
333 Q_ARG(QUuid, nextRequestId));
335 }
334 }
336 }
335 }
337 else {
336 else {
338 unlock();
337 unlock();
339 qCCritical(LOG_VariableAcquisitionWorker())
338 qCCritical(LOG_VariableAcquisitionWorker())
340 << tr("Impossible to execute the acquisition on an unfound variable ");
339 << tr("Impossible to execute the acquisition on an unfound variable ");
341 }
340 }
342 }
341 }
343
342
344 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
345 QUuid varRequestId)
344 QUuid varRequestId)
346 {
345 {
346 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
347 lockRead();
347 lockRead();
348 // get all AcqIdentifier in link with varRequestId
348 // get all AcqIdentifier in link with varRequestId
349 QVector<QUuid> acqIdsToRm;
349 QVector<QUuid> acqIdsToRm;
350 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
350 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
351 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 if (it->second.m_VarRequestId == varRequestId) {
352 if (it->second.m_VarRequestId == varRequestId) {
353 acqIdsToRm << it->first;
353 acqIdsToRm << it->first;
354 }
354 }
355 }
355 }
356 unlock();
356 unlock();
357 // run aborting or removing of acqIdsToRm
357 // run aborting or removing of acqIdsToRm
358
358
359 for (auto acqId : acqIdsToRm) {
359 for (auto acqId : acqIdsToRm) {
360 removeAcqRequest(acqId);
360 removeAcqRequest(acqId);
361 }
361 }
362 qInfo() << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
362 }
363 }
363
364
364 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
365 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
365 QUuid acqRequestId)
366 QUuid acqRequestId)
366 {
367 {
368 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
367 QUuid vIdentifier;
369 QUuid vIdentifier;
368 std::shared_ptr<IDataProvider> provider;
370 std::shared_ptr<IDataProvider> provider;
369 lockRead();
371 lockRead();
370 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
372 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
373 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
371 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
374 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
372 vIdentifier = acqIt->second.m_vIdentifier;
375 vIdentifier = acqIt->second.m_vIdentifier;
373 provider = acqIt->second.m_Provider;
376 provider = acqIt->second.m_Provider;
374
377
378 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
375 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
379 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
376 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
380 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
381 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
377 if (it->second.first == acqRequestId) {
382 if (it->second.first == acqRequestId) {
378 // acqRequest is currently running -> let's aborting it
383 // acqRequest is currently running -> let's aborting it
384 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
379 unlock();
385 unlock();
380
386
381 // Remove the current request from the worker
387 // Remove the current request from the worker
382 updateToNextRequest(vIdentifier);
388 updateToNextRequest(vIdentifier);
383
389
384 // notify the request aborting to the provider
390 // notify the request aborting to the provider
385 provider->requestDataAborting(acqRequestId);
391 provider->requestDataAborting(acqRequestId);
392 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
386 }
393 }
387 else if (it->second.second == acqRequestId) {
394 else if (it->second.second == acqRequestId) {
388 it->second.second = QUuid();
395 it->second.second = QUuid();
396 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
389 unlock();
397 unlock();
390 }
398 }
391 else {
399 else {
400 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
392 unlock();
401 unlock();
393 }
402 }
394 }
403 }
395 else {
404 else {
405 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
396 unlock();
406 unlock();
397 }
407 }
398 }
408 }
399 else {
409 else {
410 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
400 unlock();
411 unlock();
401 }
412 }
402
413
414 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
403 lockWrite();
415 lockWrite();
404
416
405 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
417 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
406 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
418 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
407
419
408 unlock();
420 unlock();
421 qInfo() << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
409 }
422 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now