##// END OF EJS Templates
Fix bug when canceling acquisition
perrinel -
r1399:5f9a7b80e8f3
parent child
Show More
@@ -1,368 +1,369
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove and/or abort all AcqRequest in link with varRequestId
33 33 void cancelVarRequest(QUuid varRequestId);
34 34 void removeAcqRequest(QUuid acqRequestId);
35 35
36 36 QMutex m_WorkingMutex;
37 37 QReadWriteLock m_Lock;
38 38
39 39 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
40 40 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
41 41 std::map<QUuid, QUuid> m_VIdentifierToCurrrentAcqIdMap;
42 42 VariableAcquisitionWorker *q;
43 43 };
44 44
45 45
46 46 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
47 47 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
48 48 {
49 49 }
50 50
51 51 VariableAcquisitionWorker::~VariableAcquisitionWorker()
52 52 {
53 53 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
54 54 << QThread::currentThread();
55 55 this->waitForFinish();
56 56 }
57 57
58 58
59 59 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
60 60 DataProviderParameters parameters,
61 61 std::shared_ptr<IDataProvider> provider)
62 62 {
63 63 qCDebug(LOG_VariableAcquisitionWorker())
64 64 << tr("TORM VariableAcquisitionWorker::pushVariableRequest varRequestId: ") << varRequestId
65 65 << "vId: " << vIdentifier;
66 66 auto varRequestIdCanceled = QUuid();
67 67
68 68 // Request creation
69 69 auto acqRequest = AcquisitionRequest{};
70 70 acqRequest.m_VarRequestId = varRequestId;
71 71 acqRequest.m_vIdentifier = vIdentifier;
72 72 acqRequest.m_DataProviderParameters = parameters;
73 73 acqRequest.m_Size = parameters.m_Times.size();
74 74 acqRequest.m_Provider = provider;
75 75 qCInfo(LOG_VariableAcquisitionWorker()) << tr("Add acqRequest ") << acqRequest.m_AcqIdentifier
76 76 << acqRequest.m_Size;
77 77
78 78
79 79 // Register request
80 80 impl->lockWrite();
81 81 impl->m_AcqIdentifierToAcqRequestMap.insert(
82 82 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
83 83
84 84 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
85 85 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
86 86 // A current request already exists, we can cancel it
87 87 // remove old acqIdentifier from the worker
88 88 auto oldAcqId = it->second;
89 89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
90 90 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
91 91 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
92 92 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
93 93 }
94 94 impl->unlock();
95 95 impl->cancelVarRequest(varRequestIdCanceled);
96 96 }
97 97 else {
98 98 impl->unlock();
99 99 }
100 100
101 101 // Request for the variable, it must be stored and executed
102 102 impl->lockWrite();
103 103 impl->m_VIdentifierToCurrrentAcqIdMap.insert(
104 104 std::make_pair(vIdentifier, acqRequest.m_AcqIdentifier));
105 105 impl->unlock();
106 106
107 107 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
108 108 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
109 109
110 110 return varRequestIdCanceled;
111 111 }
112 112
113 113 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
114 114 {
115 115 impl->lockRead();
116 116
117 117 auto it = impl->m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
118 118 if (it != impl->m_VIdentifierToCurrrentAcqIdMap.cend()) {
119 119 auto currentAcqId = it->second;
120 120
121 121 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
122 122 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
123 123 auto request = it->second;
124 124 impl->unlock();
125 125
126 126 // notify the request aborting to the provider
127 127 request.m_Provider->requestDataAborting(currentAcqId);
128 128 }
129 129 else {
130 130 impl->unlock();
131 131 qCWarning(LOG_VariableAcquisitionWorker())
132 132 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
133 133 }
134 134 }
135 135 else {
136 136 impl->unlock();
137 137 }
138 138 }
139 139
140 140 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
141 141 double progress)
142 142 {
143 143 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
144 144 << QThread::currentThread()->objectName()
145 145 << acqIdentifier << progress;
146 146 impl->lockRead();
147 147 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
148 148 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
149 149 auto progressPartSize
150 150 = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
151 151
152 152 auto currentPartProgress
153 153 = std::isnan(progress) ? 0.0 : (progress * progressPartSize) / 100.0;
154 154
155 155 // We can only give an approximation of the currentProgression since its upgrade is async.
156 156 auto currentProgression = aIdToARit->second.m_Progression;
157 157 if (currentProgression == aIdToARit->second.m_Size) {
158 158 currentProgression = aIdToARit->second.m_Size - 1;
159 159 }
160 160
161 161 auto currentAlreadyProgress = progressPartSize * currentProgression;
162 162
163 163
164 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 166
167 167 if (finalProgression == 100.0) {
168 168 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
169 169 }
170 170 }
171 171 impl->unlock();
172 172 }
173 173
174 174 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
175 175 {
176 176 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
177 177 << QThread::currentThread();
178 178 impl->lockRead();
179 179 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
180 180 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
181 181 auto request = it->second;
182 182 impl->unlock();
183 183 emit variableCanceledRequested(request.m_vIdentifier);
184 184 }
185 185 else {
186 186 impl->unlock();
187 187 }
188 188 }
189 189
190 190 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
191 191 std::shared_ptr<IDataSeries> dataSeries,
192 192 SqpRange dataRangeAcquired)
193 193 {
194 194 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
195 195 << acqIdentifier << dataRangeAcquired;
196 196 impl->lockWrite();
197 197 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
198 198 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
199 199 // Store the result
200 200 auto dataPacket = AcquisitionDataPacket{};
201 201 dataPacket.m_Range = dataRangeAcquired;
202 202 dataPacket.m_DateSeries = dataSeries;
203 203
204 204 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
205 205 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
206 206 // A current request result already exists, we can update it
207 207 aIdToADPVit->second.push_back(dataPacket);
208 208 }
209 209 else {
210 210 // First request result for the variable, it must be stored
211 211 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
212 212 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
213 213 }
214 214
215 215
216 216 // Decrement the counter of the request
217 217 auto &acqRequest = aIdToARit->second;
218 218 acqRequest.m_Progression = acqRequest.m_Progression + 1;
219 219
220 220 // if the counter is 0, we can return data then run the next request if it exists and
221 221 // removed the finished request
222 222 if (acqRequest.m_Size == acqRequest.m_Progression) {
223 223 auto varId = acqRequest.m_vIdentifier;
224 224 // Return the data
225 225 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
226 226 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
227 227 emit dataProvided(varId, aIdToADPVit->second);
228 228 }
229 229 impl->unlock();
230 230 }
231 231 else {
232 232 impl->unlock();
233 233 }
234 234 }
235 235 else {
236 236 impl->unlock();
237 237 qCWarning(LOG_VariableAcquisitionWorker())
238 238 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
239 239 }
240 240 }
241 241
242 242 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
243 243 {
244 244 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
245 245 impl->lockRead();
246 246 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
247 247 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
248 248 auto request = it->second;
249 249 impl->unlock();
250 250 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
251 251 qCDebug(LOG_VariableAcquisitionWorker()) << tr("Start request 10%") << acqIdentifier
252 252 << QThread::currentThread();
253 253 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
254 254 }
255 255 else {
256 256 impl->unlock();
257 257 // TODO log no acqIdentifier recognized
258 258 }
259 259 }
260 260
261 261 void VariableAcquisitionWorker::initialize()
262 262 {
263 263 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
264 264 << QThread::currentThread();
265 265 impl->m_WorkingMutex.lock();
266 266 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
267 267 }
268 268
269 269 void VariableAcquisitionWorker::finalize()
270 270 {
271 271 impl->m_WorkingMutex.unlock();
272 272 }
273 273
274 274 void VariableAcquisitionWorker::waitForFinish()
275 275 {
276 276 QMutexLocker locker{&impl->m_WorkingMutex};
277 277 }
278 278
279 279 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
280 280 QUuid vIdentifier)
281 281 {
282 282 lockWrite();
283 283 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
284 284
285 285 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
286 286 // A current request already exists, we can replace the next one
287 287
288 288 qCDebug(LOG_VariableAcquisitionWorker())
289 289 << "VariableAcquisitionWorkerPrivate::removeVariableRequest "
290 290 << QThread::currentThread()->objectName() << it->second;
291 291 m_AcqIdentifierToAcqRequestMap.erase(it->second);
292 292 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second);
293 293 }
294 294
295 295 // stop any progression
296 296 emit q->variableRequestInProgress(vIdentifier, 0.0);
297 297
298 298 m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
299 299 unlock();
300 300 }
301 301
302 302 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
303 303 QUuid varRequestId)
304 304 {
305 305 qCDebug(LOG_VariableAcquisitionWorker())
306 306 << "VariableAcquisitionWorkerPrivate::cancelVarRequest start";
307 307 lockRead();
308 308 // get all AcqIdentifier in link with varRequestId
309 309 QVector<QUuid> acqIdsToRm;
310 310 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
311 311 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
312 312 if (it->second.m_VarRequestId == varRequestId) {
313 313 acqIdsToRm << it->first;
314 314 }
315 315 }
316 316 unlock();
317 317 // run aborting or removing of acqIdsToRm
318 318
319 319 for (auto acqId : acqIdsToRm) {
320 320 removeAcqRequest(acqId);
321 321 }
322 322 qCDebug(LOG_VariableAcquisitionWorker())
323 323 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
324 324 }
325 325
326 326 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
327 327 QUuid acqRequestId)
328 328 {
329 329 qCDebug(LOG_VariableAcquisitionWorker())
330 330 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
331 331 QUuid vIdentifier;
332 332 std::shared_ptr<IDataProvider> provider;
333 333 lockRead();
334 334 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
335 335 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
336 336 vIdentifier = acqIt->second.m_vIdentifier;
337 337 provider = acqIt->second.m_Provider;
338 338
339 339 auto it = m_VIdentifierToCurrrentAcqIdMap.find(vIdentifier);
340 340 if (it != m_VIdentifierToCurrrentAcqIdMap.cend()) {
341 341 if (it->second == acqRequestId) {
342 342 // acqRequest is currently running -> let's aborting it
343 343 unlock();
344 344
345 345 // notify the request aborting to the provider
346 346 provider->requestDataAborting(acqRequestId);
347 347 }
348 348 else {
349 349 unlock();
350 350 }
351 351 }
352 352 else {
353 353 unlock();
354 354 }
355 355 }
356 356 else {
357 357 unlock();
358 358 }
359 359
360 360 lockWrite();
361 361
362 362 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
363 363 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
364 m_VIdentifierToCurrrentAcqIdMap.erase(vIdentifier);
364 365
365 366 unlock();
366 367 qCDebug(LOG_VariableAcquisitionWorker())
367 368 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
368 369 }
@@ -1,1092 +1,1101
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableCacheStrategyFactory.h>
5 5 #include <Variable/VariableController.h>
6 6 #include <Variable/VariableModel.h>
7 7 #include <Variable/VariableSynchronizationGroup.h>
8 8
9 9 #include <Data/DataProviderParameters.h>
10 10 #include <Data/IDataProvider.h>
11 11 #include <Data/IDataSeries.h>
12 12 #include <Data/VariableRequest.h>
13 13 #include <Time/TimeController.h>
14 14
15 15 #include <QDataStream>
16 16 #include <QMutex>
17 17 #include <QThread>
18 18 #include <QUuid>
19 19 #include <QtCore/QItemSelectionModel>
20 20
21 21 #include <deque>
22 22 #include <set>
23 23 #include <unordered_map>
24 24
25 25 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
26 26
27 27 namespace {
28 28
29 29 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
30 30 const SqpRange &oldGraphRange)
31 31 {
32 32 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
33 33
34 34 auto varRangeRequested = varRange;
35 35 switch (zoomType) {
36 36 case AcquisitionZoomType::ZoomIn: {
37 37 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
38 38 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
39 39 varRangeRequested.m_TStart += deltaLeft;
40 40 varRangeRequested.m_TEnd -= deltaRight;
41 41 break;
42 42 }
43 43
44 44 case AcquisitionZoomType::ZoomOut: {
45 45 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
46 46 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
47 47 varRangeRequested.m_TStart -= deltaLeft;
48 48 varRangeRequested.m_TEnd += deltaRight;
49 49 break;
50 50 }
51 51 case AcquisitionZoomType::PanRight: {
52 52 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
53 53 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
54 54 varRangeRequested.m_TStart += deltaLeft;
55 55 varRangeRequested.m_TEnd += deltaRight;
56 56 break;
57 57 }
58 58 case AcquisitionZoomType::PanLeft: {
59 59 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
60 60 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
61 61 varRangeRequested.m_TStart -= deltaLeft;
62 62 varRangeRequested.m_TEnd -= deltaRight;
63 63 break;
64 64 }
65 65 case AcquisitionZoomType::Unknown: {
66 66 qCCritical(LOG_VariableController())
67 67 << VariableController::tr("Impossible to synchronize: zoom type unknown");
68 68 break;
69 69 }
70 70 default:
71 71 qCCritical(LOG_VariableController()) << VariableController::tr(
72 72 "Impossible to synchronize: zoom type not take into account");
73 73 // No action
74 74 break;
75 75 }
76 76
77 77 return varRangeRequested;
78 78 }
79 79 }
80 80
81 81 enum class VariableRequestHandlerState { OFF, RUNNING, PENDING };
82 82
83 83 struct VariableRequestHandler {
84 84
85 85 VariableRequestHandler()
86 86 {
87 87 m_CanUpdate = false;
88 88 m_State = VariableRequestHandlerState::OFF;
89 89 }
90 90
91 91 QUuid m_VarId;
92 92 VariableRequest m_RunningVarRequest;
93 93 VariableRequest m_PendingVarRequest;
94 94 VariableRequestHandlerState m_State;
95 95 bool m_CanUpdate;
96 96 };
97 97
98 98 struct VariableController::VariableControllerPrivate {
99 99 explicit VariableControllerPrivate(VariableController *parent)
100 100 : m_WorkingMutex{},
101 101 m_VariableModel{new VariableModel{parent}},
102 102 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
103 103 // m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
104 104 m_VariableCacheStrategy{VariableCacheStrategyFactory::createCacheStrategy(
105 105 CacheStrategy::SingleThreshold)},
106 106 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
107 107 q{parent}
108 108 {
109 109
110 110 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
111 111 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
112 112 }
113 113
114 114
115 115 virtual ~VariableControllerPrivate()
116 116 {
117 117 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
118 118 m_VariableAcquisitionWorkerThread.quit();
119 119 m_VariableAcquisitionWorkerThread.wait();
120 120 }
121 121
122 122
123 123 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
124 124 QUuid varRequestId);
125 125
126 126 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
127 127 std::shared_ptr<IDataSeries>
128 128 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
129 129
130 130 void registerProvider(std::shared_ptr<IDataProvider> provider);
131 131
132 132 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
133 133 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
134 134 void updateVariables(QUuid varRequestId);
135 135 void updateVariableRequest(QUuid varRequestId);
136 136 void cancelVariableRequest(QUuid varRequestId);
137 137 void executeVarRequest(std::shared_ptr<Variable> var, VariableRequest &varRequest);
138 138
139 139 template <typename VariableIterator>
140 140 void desynchronize(VariableIterator variableIt, const QUuid &syncGroupId);
141 141
142 142 QMutex m_WorkingMutex;
143 143 /// Variable model. The VariableController has the ownership
144 144 VariableModel *m_VariableModel;
145 145 QItemSelectionModel *m_VariableSelectionModel;
146 146
147 147
148 148 TimeController *m_TimeController{nullptr};
149 149 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
150 150 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
151 151 QThread m_VariableAcquisitionWorkerThread;
152 152
153 153 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
154 154 m_VariableToProviderMap;
155 155 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
156 156 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
157 157 m_GroupIdToVariableSynchronizationGroupMap;
158 158 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
159 159 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
160 160
161 161 std::map<QUuid, std::list<QUuid> > m_VarGroupIdToVarIds;
162 162 std::map<QUuid, std::unique_ptr<VariableRequestHandler> > m_VarIdToVarRequestHandler;
163 163
164 164 VariableController *q;
165 165 };
166 166
167 167
168 168 VariableController::VariableController(QObject *parent)
169 169 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
170 170 {
171 171 qCDebug(LOG_VariableController()) << tr("VariableController construction")
172 172 << QThread::currentThread();
173 173
174 174 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
175 175 &VariableController::onAbortProgressRequested);
176 176
177 177 connect(impl->m_VariableAcquisitionWorker.get(),
178 178 &VariableAcquisitionWorker::variableCanceledRequested, this,
179 179 &VariableController::onAbortAcquisitionRequested);
180 180
181 181 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
182 182 &VariableController::onDataProvided);
183 183 connect(impl->m_VariableAcquisitionWorker.get(),
184 184 &VariableAcquisitionWorker::variableRequestInProgress, this,
185 185 &VariableController::onVariableRetrieveDataInProgress);
186 186
187 187
188 188 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
189 189 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
190 190 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
191 191 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
192 192
193 193 connect(impl->m_VariableModel, &VariableModel::requestVariableRangeUpdate, this,
194 194 &VariableController::onUpdateDateTime);
195 195
196 196 impl->m_VariableAcquisitionWorkerThread.start();
197 197 }
198 198
199 199 VariableController::~VariableController()
200 200 {
201 201 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
202 202 << QThread::currentThread();
203 203 this->waitForFinish();
204 204 }
205 205
206 206 VariableModel *VariableController::variableModel() noexcept
207 207 {
208 208 return impl->m_VariableModel;
209 209 }
210 210
211 211 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
212 212 {
213 213 return impl->m_VariableSelectionModel;
214 214 }
215 215
216 216 void VariableController::setTimeController(TimeController *timeController) noexcept
217 217 {
218 218 impl->m_TimeController = timeController;
219 219 }
220 220
221 221 std::shared_ptr<Variable>
222 222 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
223 223 {
224 224 if (impl->m_VariableModel->containsVariable(variable)) {
225 225 // Clones variable
226 226 auto duplicate = variable->clone();
227 227
228 228 // Adds clone to model
229 229 impl->m_VariableModel->addVariable(duplicate);
230 230
231 231 // Generates clone identifier
232 232 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
233 233
234 234 // Registers provider
235 235 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
236 236 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
237 237
238 238 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
239 239 if (duplicateProvider) {
240 240 impl->registerProvider(duplicateProvider);
241 241 }
242 242
243 243 return duplicate;
244 244 }
245 245 else {
246 246 qCCritical(LOG_VariableController())
247 247 << tr("Can't create duplicate of variable %1: variable not registered in the model")
248 248 .arg(variable->name());
249 249 return nullptr;
250 250 }
251 251 }
252 252
253 253 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
254 254 {
255 255 if (!variable) {
256 256 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
257 257 return;
258 258 }
259 259
260 260 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
261 261 // make some treatments before the deletion
262 262 emit variableAboutToBeDeleted(variable);
263 263
264 264 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
265 265 Q_ASSERT(variableIt != impl->m_VariableToIdentifierMap.cend());
266 266
267 267 auto variableId = variableIt->second;
268 268
269 269 // Removes variable's handler
270 270 impl->m_VarIdToVarRequestHandler.erase(variableId);
271 271
272 272 // Desynchronizes variable (if the variable is in a sync group)
273 273 auto syncGroupIt = impl->m_VariableIdGroupIdMap.find(variableId);
274 274 if (syncGroupIt != impl->m_VariableIdGroupIdMap.cend()) {
275 275 impl->desynchronize(variableIt, syncGroupIt->second);
276 276 }
277 277
278 278 // Deletes identifier
279 279 impl->m_VariableToIdentifierMap.erase(variableIt);
280 280
281 281 // Deletes provider
282 282 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
283 283 qCDebug(LOG_VariableController())
284 284 << tr("Number of providers deleted for variable %1: %2")
285 285 .arg(variable->name(), QString::number(nbProvidersDeleted));
286 286
287 287
288 288 // Deletes from model
289 289 impl->m_VariableModel->deleteVariable(variable);
290 290 }
291 291
292 292 void VariableController::deleteVariables(
293 293 const QVector<std::shared_ptr<Variable> > &variables) noexcept
294 294 {
295 295 for (auto variable : qAsConst(variables)) {
296 296 deleteVariable(variable);
297 297 }
298 298 }
299 299
300 300 QByteArray
301 301 VariableController::mimeDataForVariables(const QList<std::shared_ptr<Variable> > &variables) const
302 302 {
303 303 auto encodedData = QByteArray{};
304 304
305 305 QVariantList ids;
306 306 for (auto &var : variables) {
307 307 auto itVar = impl->m_VariableToIdentifierMap.find(var);
308 308 if (itVar == impl->m_VariableToIdentifierMap.cend()) {
309 309 qCCritical(LOG_VariableController())
310 310 << tr("Impossible to find the data for an unknown variable.");
311 311 }
312 312
313 313 ids << itVar->second.toByteArray();
314 314 }
315 315
316 316 QDataStream stream{&encodedData, QIODevice::WriteOnly};
317 317 stream << ids;
318 318
319 319 return encodedData;
320 320 }
321 321
322 322 QList<std::shared_ptr<Variable> >
323 323 VariableController::variablesForMimeData(const QByteArray &mimeData) const
324 324 {
325 325 auto variables = QList<std::shared_ptr<Variable> >{};
326 326 QDataStream stream{mimeData};
327 327
328 328 QVariantList ids;
329 329 stream >> ids;
330 330
331 331 for (auto id : ids) {
332 332 auto uuid = QUuid{id.toByteArray()};
333 333 auto var = impl->findVariable(uuid);
334 334 variables << var;
335 335 }
336 336
337 337 return variables;
338 338 }
339 339
340 340 std::shared_ptr<Variable>
341 341 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
342 342 std::shared_ptr<IDataProvider> provider) noexcept
343 343 {
344 344 if (!impl->m_TimeController) {
345 345 qCCritical(LOG_VariableController())
346 346 << tr("Impossible to create variable: The time controller is null");
347 347 return nullptr;
348 348 }
349 349
350 350 auto range = impl->m_TimeController->dateTime();
351 351
352 352 if (auto newVariable = impl->m_VariableModel->createVariable(name, metadata)) {
353 353 auto varId = QUuid::createUuid();
354 354
355 355 // Create the handler
356 356 auto varRequestHandler = std::make_unique<VariableRequestHandler>();
357 357 varRequestHandler->m_VarId = varId;
358 358
359 359 impl->m_VarIdToVarRequestHandler.insert(
360 360 std::make_pair(varId, std::move(varRequestHandler)));
361 361
362 362 // store the provider
363 363 impl->registerProvider(provider);
364 364
365 365 // Associate the provider
366 366 impl->m_VariableToProviderMap[newVariable] = provider;
367 367 impl->m_VariableToIdentifierMap[newVariable] = varId;
368 368
369 369 this->onRequestDataLoading(QVector<std::shared_ptr<Variable> >{newVariable}, range, false);
370 370
371 371 emit variableAdded(newVariable);
372 372
373 373 return newVariable;
374 374 }
375 375
376 376 qCCritical(LOG_VariableController()) << tr("Impossible to create variable");
377 377 return nullptr;
378 378 }
379 379
380 380 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
381 381 {
382 382 // NOTE: Even if acquisition request is aborting, the graphe range will be changed
383 383 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
384 384 << QThread::currentThread()->objectName();
385 385 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
386 386
387 387 // NOTE we only permit the time modification for one variable
388 388 if (selectedRows.size() == 1) {
389 389
390 390 if (auto selectedVariable
391 391 = impl->m_VariableModel->variable(qAsConst(selectedRows).first().row())) {
392 392
393 393 onUpdateDateTime(selectedVariable, dateTime);
394 394 }
395 395 }
396 396 else if (selectedRows.size() > 1) {
397 397 qCCritical(LOG_VariableController())
398 398 << tr("Impossible to set time for more than 1 variable in the same time");
399 399 }
400 400 else {
401 401 qCWarning(LOG_VariableController())
402 402 << tr("There is no variable selected to set the time one");
403 403 }
404 404 }
405 405
406 406 void VariableController::onUpdateDateTime(std::shared_ptr<Variable> variable,
407 407 const SqpRange &dateTime)
408 408 {
409 409 auto itVar = impl->m_VariableToIdentifierMap.find(variable);
410 410 if (itVar == impl->m_VariableToIdentifierMap.cend()) {
411 411 qCCritical(LOG_VariableController())
412 412 << tr("Impossible to onDateTimeOnSelection request for unknown variable");
413 413 return;
414 414 }
415 415
416 416 // notify that rescale operation has to be done
417 417 emit rangeChanged(variable, dateTime);
418 418
419 419 auto synchro
420 420 = impl->m_VariableIdGroupIdMap.find(itVar->second) != impl->m_VariableIdGroupIdMap.cend();
421 421
422 422 this->onRequestDataLoading(QVector<std::shared_ptr<Variable> >{variable}, dateTime, synchro);
423 423 }
424 424
425 425 void VariableController::onDataProvided(QUuid vIdentifier,
426 426 QVector<AcquisitionDataPacket> dataAcquired)
427 427 {
428 428 qCDebug(LOG_VariableController()) << tr("onDataProvided") << QThread::currentThread();
429 429 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
430 430 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
431 431 if (!varRequestId.isNull()) {
432 432 impl->updateVariables(varRequestId);
433 433 }
434 434 }
435 435
436 436 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
437 437 {
438 438 qCDebug(LOG_VariableController())
439 439 << "TORM: variableController::onVariableRetrieveDataInProgress"
440 440 << QThread::currentThread()->objectName() << progress;
441 441 if (auto var = impl->findVariable(identifier)) {
442 442 qCDebug(LOG_VariableController())
443 443 << "TORM: variableController::onVariableRetrieveDataInProgress FOUND";
444 444 impl->m_VariableModel->setDataProgress(var, progress);
445 445 }
446 446 else {
447 447 qCCritical(LOG_VariableController())
448 448 << tr("Impossible to notify progression of a null variable");
449 449 }
450 450 }
451 451
452 452 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
453 453 {
454 454 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortProgressRequested"
455 455 << QThread::currentThread()->objectName() << variable->name();
456 456
457 457 auto itVar = impl->m_VariableToIdentifierMap.find(variable);
458 458 if (itVar == impl->m_VariableToIdentifierMap.cend()) {
459 459 qCCritical(LOG_VariableController())
460 460 << tr("Impossible to onAbortProgressRequested request for unknown variable");
461 461 return;
462 462 }
463 463
464 464 auto varId = itVar->second;
465 465
466 466 auto itVarHandler = impl->m_VarIdToVarRequestHandler.find(varId);
467 467 if (itVarHandler == impl->m_VarIdToVarRequestHandler.cend()) {
468 468 qCCritical(LOG_VariableController())
469 469 << tr("Impossible to onAbortProgressRequested for variable with unknown handler");
470 470 return;
471 471 }
472 472
473 473 auto varHandler = itVarHandler->second.get();
474 474
475 475 // case where a variable has a running request
476 476 if (varHandler->m_State != VariableRequestHandlerState::OFF) {
477 477 impl->cancelVariableRequest(varHandler->m_RunningVarRequest.m_VariableGroupId);
478 478 }
479 479 }
480 480
481 481 void VariableController::onAbortAcquisitionRequested(QUuid vIdentifier)
482 482 {
483 483 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortAcquisitionRequested"
484 484 << QThread::currentThread()->objectName() << vIdentifier;
485 485
486 486 if (auto var = impl->findVariable(vIdentifier)) {
487 487 this->onAbortProgressRequested(var);
488 488 }
489 489 else {
490 490 qCCritical(LOG_VariableController())
491 491 << tr("Impossible to abort Acquisition Requestof a null variable");
492 492 }
493 493 }
494 494
495 495 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
496 496 {
497 497 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
498 498 << QThread::currentThread()->objectName()
499 499 << synchronizationGroupId;
500 500 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
501 501 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
502 502 std::make_pair(synchronizationGroupId, vSynchroGroup));
503 503 }
504 504
505 505 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
506 506 {
507 507 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
508 508 }
509 509
510 510 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
511 511 QUuid synchronizationGroupId)
512 512
513 513 {
514 514 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
515 515 << synchronizationGroupId;
516 516 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
517 517 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
518 518 auto groupIdToVSGIt
519 519 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
520 520 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
521 521 impl->m_VariableIdGroupIdMap.insert(
522 522 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
523 523 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
524 524 }
525 525 else {
526 526 qCCritical(LOG_VariableController())
527 527 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
528 528 << variable->name();
529 529 }
530 530 }
531 531 else {
532 532 qCCritical(LOG_VariableController())
533 533 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
534 534 }
535 535 }
536 536
537 537 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
538 538 QUuid synchronizationGroupId)
539 539 {
540 540 // Gets variable id
541 541 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
542 542 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
543 543 qCCritical(LOG_VariableController())
544 544 << tr("Can't desynchronize variable %1: variable identifier not found")
545 545 .arg(variable->name());
546 546 return;
547 547 }
548 548
549 549 impl->desynchronize(variableIt, synchronizationGroupId);
550 550 }
551 551
552 552 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
553 553 const SqpRange &range, bool synchronise)
554 554 {
555 555 // variables is assumed synchronized
556 556 // TODO: Asser variables synchronization
557 557 // we want to load data of the variable for the dateTime.
558 558 if (variables.isEmpty()) {
559 559 return;
560 560 }
561 561
562 562 auto varRequestId = QUuid::createUuid();
563 563 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
564 564 << QThread::currentThread()->objectName() << varRequestId
565 565 << range << synchronise;
566 566
567 567 if (!synchronise) {
568 568 auto varIds = std::list<QUuid>{};
569 569 for (const auto &var : variables) {
570 570 auto vId = impl->m_VariableToIdentifierMap.at(var);
571 571 varIds.push_back(vId);
572 572 }
573 573 impl->m_VarGroupIdToVarIds.insert(std::make_pair(varRequestId, varIds));
574 574 for (const auto &var : variables) {
575 575 qCDebug(LOG_VariableController()) << "onRequestDataLoading: for" << varRequestId
576 576 << varIds.size();
577 577 impl->processRequest(var, range, varRequestId);
578 578 }
579 579 }
580 580 else {
581 581 auto vId = impl->m_VariableToIdentifierMap.at(variables.first());
582 582 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
583 583 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
584 584 auto groupId = varIdToGroupIdIt->second;
585 585
586 586 auto vSynchronizationGroup
587 587 = impl->m_GroupIdToVariableSynchronizationGroupMap.at(groupId);
588 588 auto vSyncIds = vSynchronizationGroup->getIds();
589 589
590 590 auto varIds = std::list<QUuid>{};
591 591 for (auto vId : vSyncIds) {
592 592 varIds.push_back(vId);
593 593 }
594 594 qCDebug(LOG_VariableController()) << "onRequestDataLoading sync: for" << varRequestId
595 595 << varIds.size();
596 596 impl->m_VarGroupIdToVarIds.insert(std::make_pair(varRequestId, varIds));
597 597
598 598 for (auto vId : vSyncIds) {
599 599 auto var = impl->findVariable(vId);
600 600
601 601 // Don't process already processed var
602 602 if (var != nullptr) {
603 603 qCDebug(LOG_VariableController()) << "processRequest synchro for" << var->name()
604 604 << varRequestId;
605 605 auto vSyncRangeRequested
606 606 = variables.contains(var)
607 607 ? range
608 608 : computeSynchroRangeRequested(var->range(), range,
609 609 variables.first()->range());
610 610 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
611 611 impl->processRequest(var, vSyncRangeRequested, varRequestId);
612 612 }
613 613 else {
614 614 qCCritical(LOG_VariableController())
615 615
616 616 << tr("Impossible to synchronize a null variable");
617 617 }
618 618 }
619 619 }
620 620 }
621 621
622 622 impl->updateVariables(varRequestId);
623 623 }
624 624
625 625
626 626 void VariableController::initialize()
627 627 {
628 628 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
629 629 impl->m_WorkingMutex.lock();
630 630 qCDebug(LOG_VariableController()) << tr("VariableController init END");
631 631 }
632 632
633 633 void VariableController::finalize()
634 634 {
635 635 impl->m_WorkingMutex.unlock();
636 636 }
637 637
638 638 void VariableController::waitForFinish()
639 639 {
640 640 QMutexLocker locker{&impl->m_WorkingMutex};
641 641 }
642 642
643 643 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
644 644 {
645 645 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
646 646 auto zoomType = AcquisitionZoomType::Unknown;
647 647 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
648 648 qCDebug(LOG_VariableController()) << "zoomtype: ZoomOut";
649 649 zoomType = AcquisitionZoomType::ZoomOut;
650 650 }
651 651 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
652 652 qCDebug(LOG_VariableController()) << "zoomtype: PanRight";
653 653 zoomType = AcquisitionZoomType::PanRight;
654 654 }
655 655 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
656 656 qCDebug(LOG_VariableController()) << "zoomtype: PanLeft";
657 657 zoomType = AcquisitionZoomType::PanLeft;
658 658 }
659 659 else if (range.m_TStart >= oldRange.m_TStart && oldRange.m_TEnd >= range.m_TEnd) {
660 660 qCDebug(LOG_VariableController()) << "zoomtype: ZoomIn";
661 661 zoomType = AcquisitionZoomType::ZoomIn;
662 662 }
663 663 else {
664 664 qCDebug(LOG_VariableController()) << "getZoomType: Unknown type detected";
665 665 }
666 666 return zoomType;
667 667 }
668 668
669 669 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
670 670 const SqpRange &rangeRequested,
671 671 QUuid varRequestId)
672 672 {
673 673 auto itVar = m_VariableToIdentifierMap.find(var);
674 674 if (itVar == m_VariableToIdentifierMap.cend()) {
675 675 qCCritical(LOG_VariableController())
676 676 << tr("Impossible to process request for unknown variable");
677 677 return;
678 678 }
679 679
680 680 auto varId = itVar->second;
681 681
682 682 auto itVarHandler = m_VarIdToVarRequestHandler.find(varId);
683 683 if (itVarHandler == m_VarIdToVarRequestHandler.cend()) {
684 684 qCCritical(LOG_VariableController())
685 685 << tr("Impossible to process request for variable with unknown handler");
686 686 return;
687 687 }
688 688
689 689 auto oldRange = var->range();
690 690
691 691 auto varHandler = itVarHandler->second.get();
692 692
693 693 if (varHandler->m_State != VariableRequestHandlerState::OFF) {
694 694 oldRange = varHandler->m_RunningVarRequest.m_RangeRequested;
695 695 }
696 696
697 697 auto varRequest = VariableRequest{};
698 698 varRequest.m_VariableGroupId = varRequestId;
699 699 auto varStrategyRangesRequested
700 700 = m_VariableCacheStrategy->computeRange(oldRange, rangeRequested);
701 701 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
702 702 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
703 703
704 704 switch (varHandler->m_State) {
705 705 case VariableRequestHandlerState::OFF: {
706 706 qCDebug(LOG_VariableController()) << tr("Process Request OFF")
707 707 << varRequest.m_RangeRequested
708 708 << varRequest.m_CacheRangeRequested;
709 709 varHandler->m_RunningVarRequest = varRequest;
710 710 varHandler->m_State = VariableRequestHandlerState::RUNNING;
711 711 executeVarRequest(var, varRequest);
712 712 break;
713 713 }
714 714 case VariableRequestHandlerState::RUNNING: {
715 715 qCDebug(LOG_VariableController()) << tr("Process Request RUNNING")
716 716 << varRequest.m_RangeRequested
717 717 << varRequest.m_CacheRangeRequested;
718 718 varHandler->m_State = VariableRequestHandlerState::PENDING;
719 719 varHandler->m_PendingVarRequest = varRequest;
720 720 break;
721 721 }
722 722 case VariableRequestHandlerState::PENDING: {
723 723 qCDebug(LOG_VariableController()) << tr("Process Request PENDING")
724 724 << varRequest.m_RangeRequested
725 725 << varRequest.m_CacheRangeRequested;
726 726 auto variableGroupIdToCancel = varHandler->m_PendingVarRequest.m_VariableGroupId;
727 727 cancelVariableRequest(variableGroupIdToCancel);
728 728 // Cancel variable can make state downgrade
729 729 varHandler->m_State = VariableRequestHandlerState::PENDING;
730 730 varHandler->m_PendingVarRequest = varRequest;
731 731
732 732 break;
733 733 }
734 734 default:
735 735 qCCritical(LOG_VariableController())
736 736 << QObject::tr("Unknown VariableRequestHandlerState");
737 737 }
738 738 }
739 739
740 740 std::shared_ptr<Variable>
741 741 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
742 742 {
743 743 std::shared_ptr<Variable> var;
744 744 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
745 745
746 746 auto end = m_VariableToIdentifierMap.cend();
747 747 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
748 748 if (it != end) {
749 749 var = it->first;
750 750 }
751 751 else {
752 752 qCCritical(LOG_VariableController())
753 753 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
754 754 }
755 755
756 756 return var;
757 757 }
758 758
759 759 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
760 760 const QVector<AcquisitionDataPacket> acqDataPacketVector)
761 761 {
762 762 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
763 763 << acqDataPacketVector.size();
764 764 std::shared_ptr<IDataSeries> dataSeries;
765 765 if (!acqDataPacketVector.isEmpty()) {
766 766 dataSeries = acqDataPacketVector[0].m_DateSeries;
767 767 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
768 768 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
769 769 }
770 770 }
771 771 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
772 772 << acqDataPacketVector.size();
773 773 return dataSeries;
774 774 }
775 775
776 776 void VariableController::VariableControllerPrivate::registerProvider(
777 777 std::shared_ptr<IDataProvider> provider)
778 778 {
779 779 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
780 780 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
781 781 << provider->objectName();
782 782 m_ProviderSet.insert(provider);
783 783 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
784 784 &VariableAcquisitionWorker::onVariableDataAcquired);
785 785 connect(provider.get(), &IDataProvider::dataProvidedProgress,
786 786 m_VariableAcquisitionWorker.get(),
787 787 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
788 788 connect(provider.get(), &IDataProvider::dataProvidedFailed,
789 789 m_VariableAcquisitionWorker.get(),
790 790 &VariableAcquisitionWorker::onVariableAcquisitionFailed);
791 791 }
792 792 else {
793 793 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
794 794 }
795 795 }
796 796
797 797 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
798 798 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
799 799 {
800 800 auto itVarHandler = m_VarIdToVarRequestHandler.find(varId);
801 801 if (itVarHandler == m_VarIdToVarRequestHandler.cend()) {
802 802 return QUuid();
803 803 }
804 804
805 805 auto varHandler = itVarHandler->second.get();
806 806 if (varHandler->m_State == VariableRequestHandlerState::OFF) {
807 807 qCCritical(LOG_VariableController())
808 808 << tr("acceptVariableRequest impossible on a variable with OFF state");
809 809 }
810 810
811 811 varHandler->m_RunningVarRequest.m_DataSeries = dataSeries;
812 812 varHandler->m_CanUpdate = true;
813 813
814 814 // Element traitΓ©, on a dΓ©jΓ  toutes les donnΓ©es necessaires
815 815 auto varGroupId = varHandler->m_RunningVarRequest.m_VariableGroupId;
816 816 qCDebug(LOG_VariableController()) << "Variable::acceptVariableRequest" << varGroupId
817 817 << m_VarGroupIdToVarIds.size();
818 818
819 819 return varHandler->m_RunningVarRequest.m_VariableGroupId;
820 820 }
821 821
822 822 void VariableController::VariableControllerPrivate::updateVariables(QUuid varRequestId)
823 823 {
824 824 qCDebug(LOG_VariableController()) << "VariableControllerPrivate::updateVariables"
825 825 << QThread::currentThread()->objectName() << varRequestId;
826 826
827 827 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
828 828 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
829 829 qCWarning(LOG_VariableController())
830 830 << tr("Impossible to updateVariables of unknown variables") << varRequestId;
831 831 return;
832 832 }
833 833
834 834 auto &varIds = varGroupIdToVarIdsIt->second;
835 835 auto varIdsEnd = varIds.end();
836 836 bool processVariableUpdate = true;
837 837 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd) && processVariableUpdate;
838 838 ++varIdsIt) {
839 839 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
840 840 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
841 841 processVariableUpdate &= itVarHandler->second->m_CanUpdate;
842 842 }
843 843 }
844 844
845 845 if (processVariableUpdate) {
846 846 for (auto varIdsIt = varIds.begin(); varIdsIt != varIdsEnd; ++varIdsIt) {
847 847 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
848 848 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
849 849 if (auto var = findVariable(*varIdsIt)) {
850 850 auto &varRequest = itVarHandler->second->m_RunningVarRequest;
851 851 var->setRange(varRequest.m_RangeRequested);
852 852 var->setCacheRange(varRequest.m_CacheRangeRequested);
853 853 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
854 854 << varRequest.m_RangeRequested
855 855 << varRequest.m_CacheRangeRequested;
856 856 qCDebug(LOG_VariableController()) << tr("2: onDataProvided var points before")
857 857 << var->nbPoints()
858 858 << varRequest.m_DataSeries->nbPoints();
859 859 var->mergeDataSeries(varRequest.m_DataSeries);
860 860 qCDebug(LOG_VariableController()) << tr("3: onDataProvided var points after")
861 861 << var->nbPoints();
862 862
863 863 emit var->updated();
864 864 }
865 865 else {
866 866 qCCritical(LOG_VariableController())
867 867 << tr("Impossible to update data to a null variable");
868 868 }
869 869 }
870 870 }
871 871 updateVariableRequest(varRequestId);
872 872
873 873 // cleaning varRequestId
874 874 qCDebug(LOG_VariableController()) << tr("m_VarGroupIdToVarIds erase") << varRequestId;
875 875 m_VarGroupIdToVarIds.erase(varRequestId);
876 876 if (m_VarGroupIdToVarIds.empty()) {
877 877 emit q->acquisitionFinished();
878 878 }
879 879 }
880 880 }
881 881
882 882
883 883 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
884 884 {
885 885 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
886 886 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
887 887 qCCritical(LOG_VariableController()) << QObject::tr(
888 888 "Impossible to updateVariableRequest since varGroupdId isn't here anymore");
889 889
890 890 return;
891 891 }
892 892
893 893 auto &varIds = varGroupIdToVarIdsIt->second;
894 894 auto varIdsEnd = varIds.end();
895 895
896 896 // First pass all canUpdate of handler to false
897 897 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
898 898 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
899 899 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
900 900
901 901 auto varHandler = itVarHandler->second.get();
902 902 varHandler->m_CanUpdate = false;
903 903 }
904 904 }
905 905 // Second update requests of handler
906 906 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
907 907 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
908 908 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
909 909
910 910 auto varHandler = itVarHandler->second.get();
911 911
912 912 switch (varHandler->m_State) {
913 913 case VariableRequestHandlerState::OFF: {
914 914 qCCritical(LOG_VariableController())
915 915 << QObject::tr("Impossible to update a variable with handler in OFF state");
916 916 } break;
917 917 case VariableRequestHandlerState::RUNNING: {
918 918 varHandler->m_State = VariableRequestHandlerState::OFF;
919 919 varHandler->m_RunningVarRequest = VariableRequest{};
920 920 break;
921 921 }
922 922 case VariableRequestHandlerState::PENDING: {
923 923 varHandler->m_State = VariableRequestHandlerState::RUNNING;
924 924 varHandler->m_RunningVarRequest = varHandler->m_PendingVarRequest;
925 925 varHandler->m_PendingVarRequest = VariableRequest{};
926 926 auto var = findVariable(itVarHandler->first);
927 927 executeVarRequest(var, varHandler->m_RunningVarRequest);
928 928 updateVariables(varHandler->m_RunningVarRequest.m_VariableGroupId);
929 929 break;
930 930 }
931 931 default:
932 932 qCCritical(LOG_VariableController())
933 933 << QObject::tr("Unknown VariableRequestHandlerState");
934 934 }
935 935 }
936 936 }
937 937 }
938 938
939 939
940 940 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
941 941 {
942 942 qCDebug(LOG_VariableController()) << tr("cancelVariableRequest") << varRequestId;
943 943
944 944 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
945 945 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
946 946 qCCritical(LOG_VariableController())
947 947 << tr("Impossible to cancelVariableRequest for unknown varGroupdId") << varRequestId;
948 948 return;
949 949 }
950 950
951 951 auto &varIds = varGroupIdToVarIdsIt->second;
952 952 auto varIdsEnd = varIds.end();
953
954 // First pass all canUpdate of handler to false
955 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
956 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
957 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
958
959 auto varHandler = itVarHandler->second.get();
960 varHandler->m_CanUpdate = false;
961 }
962 }
963
953 964 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
954 965 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
955 966 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
956 967
957 968 auto varHandler = itVarHandler->second.get();
958 969 varHandler->m_VarId = QUuid{};
959 970 switch (varHandler->m_State) {
960 971 case VariableRequestHandlerState::OFF: {
961 972 qCWarning(LOG_VariableController())
962 973 << QObject::tr("Impossible to cancel a variable with no running request");
963 974 break;
964 975 }
965 976 case VariableRequestHandlerState::RUNNING: {
966 977
967 978 if (varHandler->m_RunningVarRequest.m_VariableGroupId == varRequestId) {
968 979 auto var = findVariable(itVarHandler->first);
969 980 auto varProvider = m_VariableToProviderMap.at(var);
970 981 if (varProvider != nullptr) {
971 982 m_VariableAcquisitionWorker->abortProgressRequested(
972 983 itVarHandler->first);
973 984 }
974 985 m_VariableModel->setDataProgress(var, 0.0);
975 varHandler->m_CanUpdate = false;
976 986 varHandler->m_State = VariableRequestHandlerState::OFF;
977 987 varHandler->m_RunningVarRequest = VariableRequest{};
978 988 }
979 989 else {
980 990 // TODO: log Impossible to cancel the running variable request beacause its
981 991 // varRequestId isn't not the canceled one
982 992 }
983 993 break;
984 994 }
985 995 case VariableRequestHandlerState::PENDING: {
986 996 if (varHandler->m_RunningVarRequest.m_VariableGroupId == varRequestId) {
987 997 auto var = findVariable(itVarHandler->first);
988 998 auto varProvider = m_VariableToProviderMap.at(var);
989 999 if (varProvider != nullptr) {
990 1000 m_VariableAcquisitionWorker->abortProgressRequested(
991 1001 itVarHandler->first);
992 1002 }
993 1003 m_VariableModel->setDataProgress(var, 0.0);
994 varHandler->m_CanUpdate = false;
995 1004 varHandler->m_State = VariableRequestHandlerState::RUNNING;
996 1005 varHandler->m_RunningVarRequest = varHandler->m_PendingVarRequest;
997 1006 varHandler->m_PendingVarRequest = VariableRequest{};
998 1007 executeVarRequest(var, varHandler->m_RunningVarRequest);
999 1008 }
1000 1009 else if (varHandler->m_PendingVarRequest.m_VariableGroupId == varRequestId) {
1001 1010 varHandler->m_State = VariableRequestHandlerState::RUNNING;
1002 1011 varHandler->m_PendingVarRequest = VariableRequest{};
1003 1012 }
1004 1013 else {
1005 1014 // TODO: log Impossible to cancel the variable request beacause its
1006 1015 // varRequestId isn't not the canceled one
1007 1016 }
1008 1017 break;
1009 1018 }
1010 1019 default:
1011 1020 qCCritical(LOG_VariableController())
1012 1021 << QObject::tr("Unknown VariableRequestHandlerState");
1013 1022 }
1014 1023 }
1015 1024 }
1016 1025 qCDebug(LOG_VariableController()) << tr("cancelVariableRequest: erase") << varRequestId;
1017 1026 m_VarGroupIdToVarIds.erase(varRequestId);
1018 1027 if (m_VarGroupIdToVarIds.empty()) {
1019 1028 emit q->acquisitionFinished();
1020 1029 }
1021 1030 }
1022 1031
1023 1032 void VariableController::VariableControllerPrivate::executeVarRequest(std::shared_ptr<Variable> var,
1024 1033 VariableRequest &varRequest)
1025 1034 {
1026 1035 auto varIdIt = m_VariableToIdentifierMap.find(var);
1027 1036 if (varIdIt == m_VariableToIdentifierMap.cend()) {
1028 1037 qCWarning(LOG_VariableController()) << tr(
1029 1038 "Can't execute request of a variable that is not registered (may has been deleted)");
1030 1039 return;
1031 1040 }
1032 1041
1033 1042 auto varId = varIdIt->second;
1034 1043
1035 1044 auto varCacheRange = var->cacheRange();
1036 1045 auto varCacheRangeRequested = varRequest.m_CacheRangeRequested;
1037 1046 auto notInCacheRangeList
1038 1047 = Variable::provideNotInCacheRangeList(varCacheRange, varCacheRangeRequested);
1039 1048 auto inCacheRangeList
1040 1049 = Variable::provideInCacheRangeList(varCacheRange, varCacheRangeRequested);
1041 1050
1042 1051 if (!notInCacheRangeList.empty()) {
1043 1052
1044 1053 auto varProvider = m_VariableToProviderMap.at(var);
1045 1054 if (varProvider != nullptr) {
1046 1055 qCDebug(LOG_VariableController()) << "executeVarRequest " << varRequest.m_RangeRequested
1047 1056 << varRequest.m_CacheRangeRequested;
1048 1057 m_VariableAcquisitionWorker->pushVariableRequest(
1049 1058 varRequest.m_VariableGroupId, varId,
1050 1059 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
1051 1060 varProvider);
1052 1061 }
1053 1062 else {
1054 1063 qCCritical(LOG_VariableController())
1055 1064 << "Impossible to provide data with a null provider";
1056 1065 }
1057 1066
1058 1067 if (!inCacheRangeList.empty()) {
1059 1068 emit q->updateVarDisplaying(var, inCacheRangeList.first());
1060 1069 }
1061 1070 }
1062 1071 else {
1063 1072 qCDebug(LOG_VariableController()) << "All already in the cache "
1064 1073 << varRequest.m_RangeRequested;
1065 1074 acceptVariableRequest(varId,
1066 1075 var->dataSeries()->subDataSeries(varRequest.m_CacheRangeRequested));
1067 1076 }
1068 1077 }
1069 1078
1070 1079 template <typename VariableIterator>
1071 1080 void VariableController::VariableControllerPrivate::desynchronize(VariableIterator variableIt,
1072 1081 const QUuid &syncGroupId)
1073 1082 {
1074 1083 const auto &variable = variableIt->first;
1075 1084 const auto &variableId = variableIt->second;
1076 1085
1077 1086 // Gets synchronization group
1078 1087 auto groupIt = m_GroupIdToVariableSynchronizationGroupMap.find(syncGroupId);
1079 1088 if (groupIt == m_GroupIdToVariableSynchronizationGroupMap.cend()) {
1080 1089 qCCritical(LOG_VariableController())
1081 1090 << tr("Can't desynchronize variable %1: unknown synchronization group")
1082 1091 .arg(variable->name());
1083 1092 return;
1084 1093 }
1085 1094
1086 1095 // Removes variable from synchronization group
1087 1096 auto synchronizationGroup = groupIt->second;
1088 1097 synchronizationGroup->removeVariableId(variableId);
1089 1098
1090 1099 // Removes link between variable and synchronization group
1091 1100 m_VariableIdGroupIdMap.erase(variableId);
1092 1101 }
General Comments 0
You need to be logged in to leave comments. Login now