##// END OF EJS Templates
Remove old log
perrinel -
r822:b0ea6b620c8b
parent child
Show More
@@ -1,436 +1,416
1 #include "Variable/VariableAcquisitionWorker.h"
1 #include "Variable/VariableAcquisitionWorker.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 #include <Data/AcquisitionRequest.h>
5 #include <Data/AcquisitionRequest.h>
6 #include <Data/SqpRange.h>
6 #include <Data/SqpRange.h>
7
7
8 #include <unordered_map>
8 #include <unordered_map>
9 #include <utility>
9 #include <utility>
10
10
11 #include <QMutex>
11 #include <QMutex>
12 #include <QReadWriteLock>
12 #include <QReadWriteLock>
13 #include <QThread>
13 #include <QThread>
14
14
15 #include <cmath>
15 #include <cmath>
16
16
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18
18
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20
20
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 {
23 {
24 }
24 }
25
25
26 void lockRead() { m_Lock.lockForRead(); }
26 void lockRead() { m_Lock.lockForRead(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
27 void lockWrite() { m_Lock.lockForWrite(); }
28 void unlock() { m_Lock.unlock(); }
28 void unlock() { m_Lock.unlock(); }
29
29
30 void removeVariableRequest(QUuid vIdentifier);
30 void removeVariableRequest(QUuid vIdentifier);
31
31
32 /// Remove the current request and execute the next one if exist
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
33 void updateToNextRequest(QUuid vIdentifier);
34
34
35 /// Remove and/or abort all AcqRequest in link with varRequestId
35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 void cancelVarRequest(QUuid varRequestId);
36 void cancelVarRequest(QUuid varRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
37 void removeAcqRequest(QUuid acqRequestId);
38
38
39 QMutex m_WorkingMutex;
39 QMutex m_WorkingMutex;
40 QReadWriteLock m_Lock;
40 QReadWriteLock m_Lock;
41
41
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 VariableAcquisitionWorker *q;
45 VariableAcquisitionWorker *q;
46 };
46 };
47
47
48
48
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 {
51 {
52 }
52 }
53
53
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 {
55 {
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 << QThread::currentThread();
57 << QThread::currentThread();
58 this->waitForFinish();
58 this->waitForFinish();
59 }
59 }
60
60
61
61
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 SqpRange rangeRequested,
63 SqpRange rangeRequested,
64 SqpRange cacheRangeRequested,
64 SqpRange cacheRangeRequested,
65 DataProviderParameters parameters,
65 DataProviderParameters parameters,
66 std::shared_ptr<IDataProvider> provider)
66 std::shared_ptr<IDataProvider> provider)
67 {
67 {
68 qCDebug(LOG_VariableAcquisitionWorker())
68 qCDebug(LOG_VariableAcquisitionWorker())
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 auto varRequestIdCanceled = QUuid();
70 auto varRequestIdCanceled = QUuid();
71
71
72 // Request creation
72 // Request creation
73 auto acqRequest = AcquisitionRequest{};
73 auto acqRequest = AcquisitionRequest{};
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 << varRequestId;
75 << varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
76 acqRequest.m_VarRequestId = varRequestId;
77 acqRequest.m_vIdentifier = vIdentifier;
77 acqRequest.m_vIdentifier = vIdentifier;
78 acqRequest.m_DataProviderParameters = parameters;
78 acqRequest.m_DataProviderParameters = parameters;
79 acqRequest.m_RangeRequested = rangeRequested;
79 acqRequest.m_RangeRequested = rangeRequested;
80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 acqRequest.m_Size = parameters.m_Times.size();
81 acqRequest.m_Size = parameters.m_Times.size();
82 acqRequest.m_Provider = provider;
82 acqRequest.m_Provider = provider;
83
83
84
84
85 // Register request
85 // Register request
86 impl->lockWrite();
86 impl->lockWrite();
87 impl->m_AcqIdentifierToAcqRequestMap.insert(
87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89
89
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 // A current request already exists, we can replace the next one
92 // A current request already exists, we can replace the next one
93 auto oldAcqId = it->second.second;
93 auto oldAcqId = it->second.second;
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 }
98 }
99
99
100 it->second.second = acqRequest.m_AcqIdentifier;
100 it->second.second = acqRequest.m_AcqIdentifier;
101 impl->unlock();
101 impl->unlock();
102
102
103 // remove old acqIdentifier from the worker
103 // remove old acqIdentifier from the worker
104 impl->cancelVarRequest(varRequestIdCanceled);
104 impl->cancelVarRequest(varRequestIdCanceled);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 }
106 }
107 else {
107 else {
108 // First request for the variable, it must be stored and executed
108 // First request for the variable, it must be stored and executed
109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 impl->unlock();
111 impl->unlock();
112
112
113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 }
115 }
116
116
117 return varRequestIdCanceled;
117 return varRequestIdCanceled;
118 }
118 }
119
119
120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 {
121 {
122 impl->lockRead();
122 impl->lockRead();
123
123
124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 auto currentAcqId = it->second.first;
126 auto currentAcqId = it->second.first;
127
127
128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 auto request = it->second;
130 auto request = it->second;
131 impl->unlock();
131 impl->unlock();
132
132
133 // Remove the current request from the worker
133 // Remove the current request from the worker
134 impl->updateToNextRequest(vIdentifier);
134 impl->updateToNextRequest(vIdentifier);
135
135
136 // notify the request aborting to the provider
136 // notify the request aborting to the provider
137 request.m_Provider->requestDataAborting(currentAcqId);
137 request.m_Provider->requestDataAborting(currentAcqId);
138 }
138 }
139 else {
139 else {
140 impl->unlock();
140 impl->unlock();
141 qCWarning(LOG_VariableAcquisitionWorker())
141 qCWarning(LOG_VariableAcquisitionWorker())
142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 }
143 }
144 }
144 }
145 else {
145 else {
146 impl->unlock();
146 impl->unlock();
147 }
147 }
148 }
148 }
149
149
150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 double progress)
151 double progress)
152 {
152 {
153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
154 << acqIdentifier << progress;
154 << acqIdentifier << progress;
155 impl->lockRead();
155 impl->lockRead();
156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159
159
160 auto currentPartProgress
160 auto currentPartProgress
161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163
163
164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 qCDebug(LOG_VariableAcquisitionWorker())
166 qCDebug(LOG_VariableAcquisitionWorker())
167 << tr("TORM: onVariableRetrieveDataInProgress ")
167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 if (finalProgression == 100.0) {
170 if (finalProgression == 100.0) {
171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 }
172 }
173 }
173 }
174 impl->unlock();
174 impl->unlock();
175 }
175 }
176
176
177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 {
178 {
179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 << QThread::currentThread();
180 << QThread::currentThread();
181 impl->lockRead();
181 impl->lockRead();
182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 auto request = it->second;
184 auto request = it->second;
185 impl->unlock();
185 impl->unlock();
186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 << acqIdentifier << request.m_vIdentifier
187 << acqIdentifier << request.m_vIdentifier
188 << QThread::currentThread();
188 << QThread::currentThread();
189 emit variableCanceledRequested(request.m_vIdentifier);
189 emit variableCanceledRequested(request.m_vIdentifier);
190 }
190 }
191 else {
191 else {
192 impl->unlock();
192 impl->unlock();
193 // TODO log no acqIdentifier recognized
193 // TODO log no acqIdentifier recognized
194 }
194 }
195 }
195 }
196
196
197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 std::shared_ptr<IDataSeries> dataSeries,
198 std::shared_ptr<IDataSeries> dataSeries,
199 SqpRange dataRangeAcquired)
199 SqpRange dataRangeAcquired)
200 {
200 {
201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 << acqIdentifier << dataRangeAcquired;
202 << acqIdentifier << dataRangeAcquired;
203 impl->lockWrite();
203 impl->lockWrite();
204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 // Store the result
206 // Store the result
207 auto dataPacket = AcquisitionDataPacket{};
207 auto dataPacket = AcquisitionDataPacket{};
208 dataPacket.m_Range = dataRangeAcquired;
208 dataPacket.m_Range = dataRangeAcquired;
209 dataPacket.m_DateSeries = dataSeries;
209 dataPacket.m_DateSeries = dataSeries;
210
210
211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 // A current request result already exists, we can update it
213 // A current request result already exists, we can update it
214 aIdToADPVit->second.push_back(dataPacket);
214 aIdToADPVit->second.push_back(dataPacket);
215 }
215 }
216 else {
216 else {
217 // First request result for the variable, it must be stored
217 // First request result for the variable, it must be stored
218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 }
220 }
221
221
222
222
223 // Decrement the counter of the request
223 // Decrement the counter of the request
224 auto &acqRequest = aIdToARit->second;
224 auto &acqRequest = aIdToARit->second;
225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226
226
227 // if the counter is 0, we can return data then run the next request if it exists and
227 // if the counter is 0, we can return data then run the next request if it exists and
228 // removed the finished request
228 // removed the finished request
229 if (acqRequest.m_Size == acqRequest.m_Progression) {
229 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 auto varId = acqRequest.m_vIdentifier;
230 auto varId = acqRequest.m_vIdentifier;
231 auto rangeRequested = acqRequest.m_RangeRequested;
231 auto rangeRequested = acqRequest.m_RangeRequested;
232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 // Return the data
233 // Return the data
234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 }
237 }
238 impl->unlock();
238 impl->unlock();
239
239
240 // Update to the next request
240 // Update to the next request
241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 }
242 }
243 else {
243 else {
244 impl->unlock();
244 impl->unlock();
245 }
245 }
246 }
246 }
247 else {
247 else {
248 impl->unlock();
248 impl->unlock();
249 qCWarning(LOG_VariableAcquisitionWorker())
249 qCWarning(LOG_VariableAcquisitionWorker())
250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 }
251 }
252 }
252 }
253
253
254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 {
255 {
256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 impl->lockRead();
257 impl->lockRead();
258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 auto request = it->second;
260 auto request = it->second;
261 impl->unlock();
261 impl->unlock();
262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 }
264 }
265 else {
265 else {
266 impl->unlock();
266 impl->unlock();
267 // TODO log no acqIdentifier recognized
267 // TODO log no acqIdentifier recognized
268 }
268 }
269 }
269 }
270
270
271 void VariableAcquisitionWorker::initialize()
271 void VariableAcquisitionWorker::initialize()
272 {
272 {
273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 << QThread::currentThread();
274 << QThread::currentThread();
275 impl->m_WorkingMutex.lock();
275 impl->m_WorkingMutex.lock();
276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 }
277 }
278
278
279 void VariableAcquisitionWorker::finalize()
279 void VariableAcquisitionWorker::finalize()
280 {
280 {
281 impl->m_WorkingMutex.unlock();
281 impl->m_WorkingMutex.unlock();
282 }
282 }
283
283
284 void VariableAcquisitionWorker::waitForFinish()
284 void VariableAcquisitionWorker::waitForFinish()
285 {
285 {
286 QMutexLocker locker{&impl->m_WorkingMutex};
286 QMutexLocker locker{&impl->m_WorkingMutex};
287 }
287 }
288
288
289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 QUuid vIdentifier)
290 QUuid vIdentifier)
291 {
291 {
292 lockWrite();
292 lockWrite();
293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294
294
295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 // A current request already exists, we can replace the next one
296 // A current request already exists, we can replace the next one
297
297
298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300
300
301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 }
303 }
304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 unlock();
305 unlock();
306 }
306 }
307
307
308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 QUuid vIdentifier)
309 QUuid vIdentifier)
310 {
310 {
311 lockRead();
311 lockRead();
312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 if (it->second.second.isNull()) {
314 if (it->second.second.isNull()) {
315 unlock();
315 unlock();
316 // There is no next request, we can remove the variable request
316 // There is no next request, we can remove the variable request
317 removeVariableRequest(vIdentifier);
317 removeVariableRequest(vIdentifier);
318 }
318 }
319 else {
319 else {
320 auto acqIdentifierToRemove = it->second.first;
320 auto acqIdentifierToRemove = it->second.first;
321 // Move the next request to the current request
321 // Move the next request to the current request
322 auto nextRequestId = it->second.second;
322 auto nextRequestId = it->second.second;
323 it->second.first = nextRequestId;
323 it->second.first = nextRequestId;
324 it->second.second = QUuid();
324 it->second.second = QUuid();
325 unlock();
325 unlock();
326 // Remove AcquisitionRequest and results;
326 // Remove AcquisitionRequest and results;
327 lockWrite();
327 lockWrite();
328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 unlock();
330 unlock();
331 // Execute the current request
331 // Execute the current request
332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 Q_ARG(QUuid, nextRequestId));
333 Q_ARG(QUuid, nextRequestId));
334 }
334 }
335 }
335 }
336 else {
336 else {
337 unlock();
337 unlock();
338 qCCritical(LOG_VariableAcquisitionWorker())
338 qCCritical(LOG_VariableAcquisitionWorker())
339 << tr("Impossible to execute the acquisition on an unfound variable ");
339 << tr("Impossible to execute the acquisition on an unfound variable ");
340 }
340 }
341 }
341 }
342
342
343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 QUuid varRequestId)
344 QUuid varRequestId)
345 {
345 {
346 qCCritical(LOG_VariableAcquisitionWorker())
346 qCDebug(LOG_VariableAcquisitionWorker())
347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
348 lockRead();
348 lockRead();
349 // get all AcqIdentifier in link with varRequestId
349 // get all AcqIdentifier in link with varRequestId
350 QVector<QUuid> acqIdsToRm;
350 QVector<QUuid> acqIdsToRm;
351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
353 if (it->second.m_VarRequestId == varRequestId) {
353 if (it->second.m_VarRequestId == varRequestId) {
354 acqIdsToRm << it->first;
354 acqIdsToRm << it->first;
355 }
355 }
356 }
356 }
357 unlock();
357 unlock();
358 // run aborting or removing of acqIdsToRm
358 // run aborting or removing of acqIdsToRm
359
359
360 for (auto acqId : acqIdsToRm) {
360 for (auto acqId : acqIdsToRm) {
361 removeAcqRequest(acqId);
361 removeAcqRequest(acqId);
362 }
362 }
363 qCCritical(LOG_VariableAcquisitionWorker())
363 qCDebug(LOG_VariableAcquisitionWorker())
364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
365 }
365 }
366
366
367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
368 QUuid acqRequestId)
368 QUuid acqRequestId)
369 {
369 {
370 qCDebug(LOG_VariableAcquisitionWorker())
370 qCDebug(LOG_VariableAcquisitionWorker())
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 0";
371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
372 QUuid vIdentifier;
372 QUuid vIdentifier;
373 std::shared_ptr<IDataProvider> provider;
373 std::shared_ptr<IDataProvider> provider;
374 lockRead();
374 lockRead();
375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
376 qCDebug(LOG_VariableAcquisitionWorker())
377 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 1";
378 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
376 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
379 vIdentifier = acqIt->second.m_vIdentifier;
377 vIdentifier = acqIt->second.m_vIdentifier;
380 provider = acqIt->second.m_Provider;
378 provider = acqIt->second.m_Provider;
381
379
382 qCDebug(LOG_VariableAcquisitionWorker())
383 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 2";
384 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
380 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
385 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
381 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
386 qCDebug(LOG_VariableAcquisitionWorker())
387 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 3";
388 if (it->second.first == acqRequestId) {
382 if (it->second.first == acqRequestId) {
389 // acqRequest is currently running -> let's aborting it
383 // acqRequest is currently running -> let's aborting it
390 qCDebug(LOG_VariableAcquisitionWorker())
391 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 4";
392 unlock();
384 unlock();
393
385
394 // Remove the current request from the worker
386 // Remove the current request from the worker
395 updateToNextRequest(vIdentifier);
387 updateToNextRequest(vIdentifier);
396
388
397 // notify the request aborting to the provider
389 // notify the request aborting to the provider
398 provider->requestDataAborting(acqRequestId);
390 provider->requestDataAborting(acqRequestId);
399 qCDebug(LOG_VariableAcquisitionWorker())
400 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 5";
401 }
391 }
402 else if (it->second.second == acqRequestId) {
392 else if (it->second.second == acqRequestId) {
403 it->second.second = QUuid();
393 it->second.second = QUuid();
404 qCDebug(LOG_VariableAcquisitionWorker())
405 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 6";
406 unlock();
394 unlock();
407 }
395 }
408 else {
396 else {
409 qCDebug(LOG_VariableAcquisitionWorker())
410 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 7";
411 unlock();
397 unlock();
412 }
398 }
413 }
399 }
414 else {
400 else {
415 qCDebug(LOG_VariableAcquisitionWorker())
416 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 8";
417 unlock();
401 unlock();
418 }
402 }
419 }
403 }
420 else {
404 else {
421 qCDebug(LOG_VariableAcquisitionWorker())
422 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 9";
423 unlock();
405 unlock();
424 }
406 }
425
407
426 qCDebug(LOG_VariableAcquisitionWorker())
427 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 10";
428 lockWrite();
408 lockWrite();
429
409
430 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
410 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
431 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
411 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
432
412
433 unlock();
413 unlock();
434 qCDebug(LOG_VariableAcquisitionWorker())
414 qCDebug(LOG_VariableAcquisitionWorker())
435 << "VariableAcquisitionWorkerPrivate::removeAcqRequest 11";
415 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
436 }
416 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Status change > Approved

You need to be logged in to leave comments. Login now