##// END OF EJS Templates
Run request canceling when unit isn"t found in the file. Clean log.
perrinel -
r832:4ab17865bd43
parent child
Show More
@@ -1,416 +1,416
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 /// Remove and/or abort all AcqRequest in link with varRequestId
36 36 void cancelVarRequest(QUuid varRequestId);
37 37 void removeAcqRequest(QUuid acqRequestId);
38 38
39 39 QMutex m_WorkingMutex;
40 40 QReadWriteLock m_Lock;
41 41
42 42 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
43 43 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
44 44 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
45 45 VariableAcquisitionWorker *q;
46 46 };
47 47
48 48
49 49 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
50 50 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
51 51 {
52 52 }
53 53
54 54 VariableAcquisitionWorker::~VariableAcquisitionWorker()
55 55 {
56 56 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
57 57 << QThread::currentThread();
58 58 this->waitForFinish();
59 59 }
60 60
61 61
62 62 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
63 63 SqpRange rangeRequested,
64 64 SqpRange cacheRangeRequested,
65 65 DataProviderParameters parameters,
66 66 std::shared_ptr<IDataProvider> provider)
67 67 {
68 68 qCDebug(LOG_VariableAcquisitionWorker())
69 69 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
70 70 auto varRequestIdCanceled = QUuid();
71 71
72 72 // Request creation
73 73 auto acqRequest = AcquisitionRequest{};
74 74 qCDebug(LOG_VariableAcquisitionWorker()) << tr("PushVariableRequest ") << vIdentifier
75 75 << varRequestId;
76 76 acqRequest.m_VarRequestId = varRequestId;
77 77 acqRequest.m_vIdentifier = vIdentifier;
78 78 acqRequest.m_DataProviderParameters = parameters;
79 79 acqRequest.m_RangeRequested = rangeRequested;
80 80 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
81 81 acqRequest.m_Size = parameters.m_Times.size();
82 82 acqRequest.m_Provider = provider;
83 83
84 84
85 85 // Register request
86 86 impl->lockWrite();
87 87 impl->m_AcqIdentifierToAcqRequestMap.insert(
88 88 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
89 89
90 90 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
91 91 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
92 92 // A current request already exists, we can replace the next one
93 93 auto oldAcqId = it->second.second;
94 94 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(oldAcqId);
95 95 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
96 96 auto oldAcqRequest = acqIdentifierToAcqRequestMapIt->second;
97 97 varRequestIdCanceled = oldAcqRequest.m_VarRequestId;
98 98 }
99 99
100 100 it->second.second = acqRequest.m_AcqIdentifier;
101 101 impl->unlock();
102 102
103 103 // remove old acqIdentifier from the worker
104 104 impl->cancelVarRequest(varRequestIdCanceled);
105 105 // impl->m_AcqIdentifierToAcqRequestMap.erase(oldAcqId);
106 106 }
107 107 else {
108 108 // First request for the variable, it must be stored and executed
109 109 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
110 110 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
111 111 impl->unlock();
112 112
113 113 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
114 114 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
115 115 }
116 116
117 117 return varRequestIdCanceled;
118 118 }
119 119
120 120 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
121 121 {
122 122 impl->lockRead();
123 123
124 124 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
125 125 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
126 126 auto currentAcqId = it->second.first;
127 127
128 128 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
129 129 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
130 130 auto request = it->second;
131 131 impl->unlock();
132 132
133 133 // Remove the current request from the worker
134 134 impl->updateToNextRequest(vIdentifier);
135 135
136 136 // notify the request aborting to the provider
137 137 request.m_Provider->requestDataAborting(currentAcqId);
138 138 }
139 139 else {
140 140 impl->unlock();
141 141 qCWarning(LOG_VariableAcquisitionWorker())
142 142 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
143 143 }
144 144 }
145 145 else {
146 146 impl->unlock();
147 147 }
148 148 }
149 149
150 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
151 151 double progress)
152 152 {
153 153 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
154 154 << acqIdentifier << progress;
155 155 impl->lockRead();
156 156 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
157 157 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
158 158 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
159 159
160 160 auto currentPartProgress
161 161 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
162 162 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
163 163
164 164 auto finalProgression = currentAlreadyProgress + currentPartProgress;
165 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
166 166 qCDebug(LOG_VariableAcquisitionWorker())
167 167 << tr("TORM: onVariableRetrieveDataInProgress ")
168 168 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
169 169 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
170 170 if (finalProgression == 100.0) {
171 171 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
172 172 }
173 173 }
174 174 impl->unlock();
175 175 }
176 176
177 177 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
178 178 {
179 179 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
180 180 << QThread::currentThread();
181 181 impl->lockRead();
182 182 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
183 183 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
184 184 auto request = it->second;
185 185 impl->unlock();
186 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 << acqIdentifier << request.m_vIdentifier
188 << QThread::currentThread();
186 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
187 << acqIdentifier << request.m_vIdentifier
188 << QThread::currentThread();
189 189 emit variableCanceledRequested(request.m_vIdentifier);
190 190 }
191 191 else {
192 192 impl->unlock();
193 193 // TODO log no acqIdentifier recognized
194 194 }
195 195 }
196 196
197 197 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
198 198 std::shared_ptr<IDataSeries> dataSeries,
199 199 SqpRange dataRangeAcquired)
200 200 {
201 201 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
202 202 << acqIdentifier << dataRangeAcquired;
203 203 impl->lockWrite();
204 204 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
205 205 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
206 206 // Store the result
207 207 auto dataPacket = AcquisitionDataPacket{};
208 208 dataPacket.m_Range = dataRangeAcquired;
209 209 dataPacket.m_DateSeries = dataSeries;
210 210
211 211 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
212 212 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
213 213 // A current request result already exists, we can update it
214 214 aIdToADPVit->second.push_back(dataPacket);
215 215 }
216 216 else {
217 217 // First request result for the variable, it must be stored
218 218 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
219 219 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
220 220 }
221 221
222 222
223 223 // Decrement the counter of the request
224 224 auto &acqRequest = aIdToARit->second;
225 225 acqRequest.m_Progression = acqRequest.m_Progression + 1;
226 226
227 227 // if the counter is 0, we can return data then run the next request if it exists and
228 228 // removed the finished request
229 229 if (acqRequest.m_Size == acqRequest.m_Progression) {
230 230 auto varId = acqRequest.m_vIdentifier;
231 231 auto rangeRequested = acqRequest.m_RangeRequested;
232 232 auto cacheRangeRequested = acqRequest.m_CacheRangeRequested;
233 233 // Return the data
234 234 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
235 235 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
236 236 emit dataProvided(varId, rangeRequested, cacheRangeRequested, aIdToADPVit->second);
237 237 }
238 238 impl->unlock();
239 239
240 240 // Update to the next request
241 241 impl->updateToNextRequest(acqRequest.m_vIdentifier);
242 242 }
243 243 else {
244 244 impl->unlock();
245 245 }
246 246 }
247 247 else {
248 248 impl->unlock();
249 249 qCWarning(LOG_VariableAcquisitionWorker())
250 250 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
251 251 }
252 252 }
253 253
254 254 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
255 255 {
256 256 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
257 257 impl->lockRead();
258 258 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
259 259 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
260 260 auto request = it->second;
261 261 impl->unlock();
262 262 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
263 263 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
264 264 }
265 265 else {
266 266 impl->unlock();
267 267 // TODO log no acqIdentifier recognized
268 268 }
269 269 }
270 270
271 271 void VariableAcquisitionWorker::initialize()
272 272 {
273 273 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
274 274 << QThread::currentThread();
275 275 impl->m_WorkingMutex.lock();
276 276 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
277 277 }
278 278
279 279 void VariableAcquisitionWorker::finalize()
280 280 {
281 281 impl->m_WorkingMutex.unlock();
282 282 }
283 283
284 284 void VariableAcquisitionWorker::waitForFinish()
285 285 {
286 286 QMutexLocker locker{&impl->m_WorkingMutex};
287 287 }
288 288
289 289 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
290 290 QUuid vIdentifier)
291 291 {
292 292 lockWrite();
293 293 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
294 294
295 295 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
296 296 // A current request already exists, we can replace the next one
297 297
298 298 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
299 299 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
300 300
301 301 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
302 302 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
303 303 }
304 304 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
305 305 unlock();
306 306 }
307 307
308 308 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
309 309 QUuid vIdentifier)
310 310 {
311 311 lockRead();
312 312 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
313 313 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
314 314 if (it->second.second.isNull()) {
315 315 unlock();
316 316 // There is no next request, we can remove the variable request
317 317 removeVariableRequest(vIdentifier);
318 318 }
319 319 else {
320 320 auto acqIdentifierToRemove = it->second.first;
321 321 // Move the next request to the current request
322 322 auto nextRequestId = it->second.second;
323 323 it->second.first = nextRequestId;
324 324 it->second.second = QUuid();
325 325 unlock();
326 326 // Remove AcquisitionRequest and results;
327 327 lockWrite();
328 328 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
329 329 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
330 330 unlock();
331 331 // Execute the current request
332 332 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
333 333 Q_ARG(QUuid, nextRequestId));
334 334 }
335 335 }
336 336 else {
337 337 unlock();
338 338 qCCritical(LOG_VariableAcquisitionWorker())
339 339 << tr("Impossible to execute the acquisition on an unfound variable ");
340 340 }
341 341 }
342 342
343 343 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::cancelVarRequest(
344 344 QUuid varRequestId)
345 345 {
346 346 qCDebug(LOG_VariableAcquisitionWorker())
347 347 << "VariableAcquisitionWorkerPrivate::cancelVarRequest 0";
348 348 lockRead();
349 349 // get all AcqIdentifier in link with varRequestId
350 350 QVector<QUuid> acqIdsToRm;
351 351 auto cend = m_AcqIdentifierToAcqRequestMap.cend();
352 352 for (auto it = m_AcqIdentifierToAcqRequestMap.cbegin(); it != cend; ++it) {
353 353 if (it->second.m_VarRequestId == varRequestId) {
354 354 acqIdsToRm << it->first;
355 355 }
356 356 }
357 357 unlock();
358 358 // run aborting or removing of acqIdsToRm
359 359
360 360 for (auto acqId : acqIdsToRm) {
361 361 removeAcqRequest(acqId);
362 362 }
363 363 qCDebug(LOG_VariableAcquisitionWorker())
364 364 << "VariableAcquisitionWorkerPrivate::cancelVarRequest end";
365 365 }
366 366
367 367 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeAcqRequest(
368 368 QUuid acqRequestId)
369 369 {
370 370 qCDebug(LOG_VariableAcquisitionWorker())
371 371 << "VariableAcquisitionWorkerPrivate::removeAcqRequest";
372 372 QUuid vIdentifier;
373 373 std::shared_ptr<IDataProvider> provider;
374 374 lockRead();
375 375 auto acqIt = m_AcqIdentifierToAcqRequestMap.find(acqRequestId);
376 376 if (acqIt != m_AcqIdentifierToAcqRequestMap.cend()) {
377 377 vIdentifier = acqIt->second.m_vIdentifier;
378 378 provider = acqIt->second.m_Provider;
379 379
380 380 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
381 381 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
382 382 if (it->second.first == acqRequestId) {
383 383 // acqRequest is currently running -> let's aborting it
384 384 unlock();
385 385
386 386 // Remove the current request from the worker
387 387 updateToNextRequest(vIdentifier);
388 388
389 389 // notify the request aborting to the provider
390 390 provider->requestDataAborting(acqRequestId);
391 391 }
392 392 else if (it->second.second == acqRequestId) {
393 393 it->second.second = QUuid();
394 394 unlock();
395 395 }
396 396 else {
397 397 unlock();
398 398 }
399 399 }
400 400 else {
401 401 unlock();
402 402 }
403 403 }
404 404 else {
405 405 unlock();
406 406 }
407 407
408 408 lockWrite();
409 409
410 410 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqRequestId);
411 411 m_AcqIdentifierToAcqRequestMap.erase(acqRequestId);
412 412
413 413 unlock();
414 414 qCDebug(LOG_VariableAcquisitionWorker())
415 415 << "VariableAcquisitionWorkerPrivate::removeAcqRequest END";
416 416 }
@@ -1,973 +1,971
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableCacheStrategyFactory.h>
5 5 #include <Variable/VariableController.h>
6 6 #include <Variable/VariableModel.h>
7 7 #include <Variable/VariableSynchronizationGroup.h>
8 8
9 9 #include <Data/DataProviderParameters.h>
10 10 #include <Data/IDataProvider.h>
11 11 #include <Data/IDataSeries.h>
12 12 #include <Data/VariableRequest.h>
13 13 #include <Time/TimeController.h>
14 14
15 15 #include <QMutex>
16 16 #include <QThread>
17 17 #include <QUuid>
18 18 #include <QtCore/QItemSelectionModel>
19 19
20 20 #include <deque>
21 21 #include <set>
22 22 #include <unordered_map>
23 23
24 24 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
25 25
26 26 namespace {
27 27
28 28 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
29 29 const SqpRange &oldGraphRange)
30 30 {
31 31 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
32 32
33 33 auto varRangeRequested = varRange;
34 34 switch (zoomType) {
35 35 case AcquisitionZoomType::ZoomIn: {
36 36 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
37 37 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
38 38 varRangeRequested.m_TStart += deltaLeft;
39 39 varRangeRequested.m_TEnd -= deltaRight;
40 40 break;
41 41 }
42 42
43 43 case AcquisitionZoomType::ZoomOut: {
44 44 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
45 45 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
46 46 varRangeRequested.m_TStart -= deltaLeft;
47 47 varRangeRequested.m_TEnd += deltaRight;
48 48 break;
49 49 }
50 50 case AcquisitionZoomType::PanRight: {
51 51 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
52 52 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
53 53 varRangeRequested.m_TStart += deltaLeft;
54 54 varRangeRequested.m_TEnd += deltaRight;
55 55 break;
56 56 }
57 57 case AcquisitionZoomType::PanLeft: {
58 58 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
59 59 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
60 60 varRangeRequested.m_TStart -= deltaLeft;
61 61 varRangeRequested.m_TEnd -= deltaRight;
62 62 break;
63 63 }
64 64 case AcquisitionZoomType::Unknown: {
65 65 qCCritical(LOG_VariableController())
66 66 << VariableController::tr("Impossible to synchronize: zoom type unknown");
67 67 break;
68 68 }
69 69 default:
70 70 qCCritical(LOG_VariableController()) << VariableController::tr(
71 71 "Impossible to synchronize: zoom type not take into account");
72 72 // No action
73 73 break;
74 74 }
75 75
76 76 return varRangeRequested;
77 77 }
78 78 }
79 79
80 80 enum class VariableRequestHandlerState { OFF, RUNNING, PENDING };
81 81
82 82 struct VariableRequestHandler {
83 83
84 84 VariableRequestHandler()
85 85 {
86 86 m_CanUpdate = false;
87 87 m_State = VariableRequestHandlerState::OFF;
88 88 }
89 89
90 90 QUuid m_VarId;
91 91 VariableRequest m_RunningVarRequest;
92 92 VariableRequest m_PendingVarRequest;
93 93 VariableRequestHandlerState m_State;
94 94 bool m_CanUpdate;
95 95 };
96 96
97 97 struct VariableController::VariableControllerPrivate {
98 98 explicit VariableControllerPrivate(VariableController *parent)
99 99 : m_WorkingMutex{},
100 100 m_VariableModel{new VariableModel{parent}},
101 101 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
102 102 // m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
103 103 m_VariableCacheStrategy{VariableCacheStrategyFactory::createCacheStrategy(
104 104 CacheStrategy::SingleThreshold)},
105 105 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
106 106 q{parent}
107 107 {
108 108
109 109 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
110 110 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
111 111 }
112 112
113 113
114 114 virtual ~VariableControllerPrivate()
115 115 {
116 116 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
117 117 m_VariableAcquisitionWorkerThread.quit();
118 118 m_VariableAcquisitionWorkerThread.wait();
119 119 }
120 120
121 121
122 122 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
123 123 QUuid varRequestId);
124 124
125 125 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
126 126 std::shared_ptr<IDataSeries>
127 127 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
128 128
129 129 void registerProvider(std::shared_ptr<IDataProvider> provider);
130 130
131 131 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
132 132 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
133 133 void updateVariables(QUuid varRequestId);
134 134 void updateVariableRequest(QUuid varRequestId);
135 135 void cancelVariableRequest(QUuid varRequestId);
136 136 void executeVarRequest(std::shared_ptr<Variable> var, VariableRequest &varRequest);
137 137
138 138 QMutex m_WorkingMutex;
139 139 /// Variable model. The VariableController has the ownership
140 140 VariableModel *m_VariableModel;
141 141 QItemSelectionModel *m_VariableSelectionModel;
142 142
143 143
144 144 TimeController *m_TimeController{nullptr};
145 145 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
146 146 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
147 147 QThread m_VariableAcquisitionWorkerThread;
148 148
149 149 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
150 150 m_VariableToProviderMap;
151 151 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
152 152 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
153 153 m_GroupIdToVariableSynchronizationGroupMap;
154 154 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
155 155 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
156 156
157 157 std::map<QUuid, std::list<QUuid> > m_VarGroupIdToVarIds;
158 158 std::map<QUuid, std::unique_ptr<VariableRequestHandler> > m_VarIdToVarRequestHandler;
159 159
160 160 VariableController *q;
161 161 };
162 162
163 163
164 164 VariableController::VariableController(QObject *parent)
165 165 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
166 166 {
167 167 qCDebug(LOG_VariableController()) << tr("VariableController construction")
168 168 << QThread::currentThread();
169 169
170 170 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
171 171 &VariableController::onAbortProgressRequested);
172 172
173 173 connect(impl->m_VariableAcquisitionWorker.get(),
174 174 &VariableAcquisitionWorker::variableCanceledRequested, this,
175 175 &VariableController::onAbortAcquisitionRequested);
176 176
177 177 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
178 178 &VariableController::onDataProvided);
179 179 connect(impl->m_VariableAcquisitionWorker.get(),
180 180 &VariableAcquisitionWorker::variableRequestInProgress, this,
181 181 &VariableController::onVariableRetrieveDataInProgress);
182 182
183 183
184 184 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
185 185 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
186 186 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
187 187 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
188 188
189 189
190 190 impl->m_VariableAcquisitionWorkerThread.start();
191 191 }
192 192
193 193 VariableController::~VariableController()
194 194 {
195 195 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
196 196 << QThread::currentThread();
197 197 this->waitForFinish();
198 198 }
199 199
200 200 VariableModel *VariableController::variableModel() noexcept
201 201 {
202 202 return impl->m_VariableModel;
203 203 }
204 204
205 205 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
206 206 {
207 207 return impl->m_VariableSelectionModel;
208 208 }
209 209
210 210 void VariableController::setTimeController(TimeController *timeController) noexcept
211 211 {
212 212 impl->m_TimeController = timeController;
213 213 }
214 214
215 215 std::shared_ptr<Variable>
216 216 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
217 217 {
218 218 if (impl->m_VariableModel->containsVariable(variable)) {
219 219 // Clones variable
220 220 auto duplicate = variable->clone();
221 221
222 222 // Adds clone to model
223 223 impl->m_VariableModel->addVariable(duplicate);
224 224
225 225 // Generates clone identifier
226 226 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
227 227
228 228 // Registers provider
229 229 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
230 230 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
231 231
232 232 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
233 233 if (duplicateProvider) {
234 234 impl->registerProvider(duplicateProvider);
235 235 }
236 236
237 237 return duplicate;
238 238 }
239 239 else {
240 240 qCCritical(LOG_VariableController())
241 241 << tr("Can't create duplicate of variable %1: variable not registered in the model")
242 242 .arg(variable->name());
243 243 return nullptr;
244 244 }
245 245 }
246 246
247 247 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
248 248 {
249 249 if (!variable) {
250 250 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
251 251 return;
252 252 }
253 253
254 254 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
255 255 // make some treatments before the deletion
256 256 emit variableAboutToBeDeleted(variable);
257 257
258 258 // Deletes identifier
259 259 impl->m_VariableToIdentifierMap.erase(variable);
260 260
261 261 // Deletes provider
262 262 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
263 263 qCDebug(LOG_VariableController())
264 264 << tr("Number of providers deleted for variable %1: %2")
265 265 .arg(variable->name(), QString::number(nbProvidersDeleted));
266 266
267 267
268 268 // Deletes from model
269 269 impl->m_VariableModel->deleteVariable(variable);
270 270 }
271 271
272 272 void VariableController::deleteVariables(
273 273 const QVector<std::shared_ptr<Variable> > &variables) noexcept
274 274 {
275 275 for (auto variable : qAsConst(variables)) {
276 276 deleteVariable(variable);
277 277 }
278 278 }
279 279
280 280 std::shared_ptr<Variable>
281 281 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
282 282 std::shared_ptr<IDataProvider> provider) noexcept
283 283 {
284 284 if (!impl->m_TimeController) {
285 285 qCCritical(LOG_VariableController())
286 286 << tr("Impossible to create variable: The time controller is null");
287 287 return nullptr;
288 288 }
289 289
290 290 auto range = impl->m_TimeController->dateTime();
291 291
292 292 if (auto newVariable = impl->m_VariableModel->createVariable(name, metadata)) {
293 293 auto varId = QUuid::createUuid();
294 294
295 295 // Create the handler
296 296 auto varRequestHandler = std::make_unique<VariableRequestHandler>();
297 297 varRequestHandler->m_VarId = varId;
298 298
299 299 impl->m_VarIdToVarRequestHandler.insert(
300 300 std::make_pair(varId, std::move(varRequestHandler)));
301 301
302 302 // store the provider
303 303 impl->registerProvider(provider);
304 304
305 305 // Associate the provider
306 306 impl->m_VariableToProviderMap[newVariable] = provider;
307 307 impl->m_VariableToIdentifierMap[newVariable] = varId;
308 308
309 309 this->onRequestDataLoading(QVector<std::shared_ptr<Variable> >{newVariable}, range, false);
310 310
311 311 // auto varRequestId = QUuid::createUuid();
312 312 // qCInfo(LOG_VariableController()) << "createVariable: " << varId << varRequestId;
313 313 // impl->processRequest(newVariable, range, varRequestId);
314 314 // impl->updateVariableRequest(varRequestId);
315 315
316 316 return newVariable;
317 317 }
318 318 }
319 319
320 320 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
321 321 {
322 322 // NOTE: Even if acquisition request is aborting, the graphe range will be changed
323 323 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
324 324 << QThread::currentThread()->objectName();
325 325 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
326 326 auto variables = QVector<std::shared_ptr<Variable> >{};
327 327
328 328 for (const auto &selectedRow : qAsConst(selectedRows)) {
329 329 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
330 330 variables << selectedVariable;
331 331
332 332 // notify that rescale operation has to be done
333 333 emit rangeChanged(selectedVariable, dateTime);
334 334 }
335 335 }
336 336
337 337 if (!variables.isEmpty()) {
338 338 this->onRequestDataLoading(variables, dateTime, true);
339 339 }
340 340 }
341 341
342 342 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
343 343 const SqpRange &cacheRangeRequested,
344 344 QVector<AcquisitionDataPacket> dataAcquired)
345 345 {
346 qCInfo(LOG_VariableController()) << tr("onDataProvided") << QThread::currentThread();
346 qCDebug(LOG_VariableController()) << tr("onDataProvided") << QThread::currentThread();
347 347 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
348 348 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
349 349 if (!varRequestId.isNull()) {
350 350 impl->updateVariables(varRequestId);
351 351 }
352 352 }
353 353
354 354 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
355 355 {
356 356 qCDebug(LOG_VariableController())
357 357 << "TORM: variableController::onVariableRetrieveDataInProgress"
358 358 << QThread::currentThread()->objectName() << progress;
359 359 if (auto var = impl->findVariable(identifier)) {
360 360 impl->m_VariableModel->setDataProgress(var, progress);
361 361 }
362 362 else {
363 363 qCCritical(LOG_VariableController())
364 364 << tr("Impossible to notify progression of a null variable");
365 365 }
366 366 }
367 367
368 368 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
369 369 {
370 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortProgressRequested"
371 << QThread::currentThread()->objectName() << variable->name();
370 372
371 373 auto itVar = impl->m_VariableToIdentifierMap.find(variable);
372 374 if (itVar == impl->m_VariableToIdentifierMap.cend()) {
373 375 qCCritical(LOG_VariableController())
374 376 << tr("Impossible to onAbortProgressRequested request for unknown variable");
375 377 return;
376 378 }
377 379
378 380 auto varId = itVar->second;
379 381
380 382 auto itVarHandler = impl->m_VarIdToVarRequestHandler.find(varId);
381 383 if (itVarHandler == impl->m_VarIdToVarRequestHandler.cend()) {
382 384 qCCritical(LOG_VariableController())
383 385 << tr("Impossible to onAbortProgressRequested for variable with unknown handler");
384 386 return;
385 387 }
386 388
387 389 auto varHandler = itVarHandler->second.get();
388 390
389 391 // case where a variable has a running request
390 392 if (varHandler->m_State != VariableRequestHandlerState::OFF) {
391 393 impl->cancelVariableRequest(varHandler->m_RunningVarRequest.m_VariableGroupId);
392 394 }
393 395 }
394 396
395 397 void VariableController::onAbortAcquisitionRequested(QUuid vIdentifier)
396 398 {
397 399 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortAcquisitionRequested"
398 400 << QThread::currentThread()->objectName() << vIdentifier;
399 401
400 402 if (auto var = impl->findVariable(vIdentifier)) {
401 403 this->onAbortProgressRequested(var);
402 404 }
403 405 else {
404 406 qCCritical(LOG_VariableController())
405 407 << tr("Impossible to abort Acquisition Requestof a null variable");
406 408 }
407 409 }
408 410
409 411 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
410 412 {
411 413 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
412 414 << QThread::currentThread()->objectName()
413 415 << synchronizationGroupId;
414 416 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
415 417 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
416 418 std::make_pair(synchronizationGroupId, vSynchroGroup));
417 419 }
418 420
419 421 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
420 422 {
421 423 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
422 424 }
423 425
424 426 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
425 427 QUuid synchronizationGroupId)
426 428
427 429 {
428 430 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
429 431 << synchronizationGroupId;
430 432 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
431 433 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
432 434 auto groupIdToVSGIt
433 435 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
434 436 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
435 437 impl->m_VariableIdGroupIdMap.insert(
436 438 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
437 439 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
438 440 }
439 441 else {
440 442 qCCritical(LOG_VariableController())
441 443 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
442 444 << variable->name();
443 445 }
444 446 }
445 447 else {
446 448 qCCritical(LOG_VariableController())
447 449 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
448 450 }
449 451 }
450 452
451 453 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
452 454 QUuid synchronizationGroupId)
453 455 {
454 456 // Gets variable id
455 457 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
456 458 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
457 459 qCCritical(LOG_VariableController())
458 460 << tr("Can't desynchronize variable %1: variable identifier not found")
459 461 .arg(variable->name());
460 462 return;
461 463 }
462 464
463 465 // Gets synchronization group
464 466 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
465 467 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
466 468 qCCritical(LOG_VariableController())
467 469 << tr("Can't desynchronize variable %1: unknown synchronization group")
468 470 .arg(variable->name());
469 471 return;
470 472 }
471 473
472 474 auto variableId = variableIt->second;
473 475
474 476 // Removes variable from synchronization group
475 477 auto synchronizationGroup = groupIt->second;
476 478 synchronizationGroup->removeVariableId(variableId);
477 479
478 480 // Removes link between variable and synchronization group
479 481 impl->m_VariableIdGroupIdMap.erase(variableId);
480 482 }
481 483
482 484 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
483 485 const SqpRange &range, bool synchronise)
484 486 {
485 487 // variables is assumed synchronized
486 488 // TODO: Asser variables synchronization
487 489 // we want to load data of the variable for the dateTime.
488 490 if (variables.isEmpty()) {
489 491 return;
490 492 }
491 493
492 494 auto varRequestId = QUuid::createUuid();
493 qCCritical(LOG_VariableController()) << "VariableController::onRequestDataLoading"
494 << QThread::currentThread()->objectName() << varRequestId
495 << range;
495 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
496 << QThread::currentThread()->objectName() << varRequestId
497 << range;
496 498
497 499 if (!synchronise) {
498 500 auto varIds = std::list<QUuid>{};
499 501 for (const auto &var : variables) {
500 502 auto vId = impl->m_VariableToIdentifierMap.at(var);
501 503 varIds.push_back(vId);
502 504 }
503 505 impl->m_VarGroupIdToVarIds.insert(std::make_pair(varRequestId, varIds));
504 506 for (const auto &var : variables) {
505 507 qCInfo(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId
506 508 << varIds.size();
507 509 impl->processRequest(var, range, varRequestId);
508 510 }
509 511 }
510 512 else {
511 513 auto vId = impl->m_VariableToIdentifierMap.at(variables.first());
512 qCInfo(LOG_VariableController()) << "Var in synchro group 0" << vId;
513 514 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
514 515 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
515 516 auto groupId = varIdToGroupIdIt->second;
516 517
517 518 auto vSynchronizationGroup
518 519 = impl->m_GroupIdToVariableSynchronizationGroupMap.at(groupId);
519 520 auto vSyncIds = vSynchronizationGroup->getIds();
520 521
521 qCInfo(LOG_VariableController()) << "Var in synchro group 1" << groupId;
522
523 522 auto varIds = std::list<QUuid>{};
524 523 for (auto vId : vSyncIds) {
525 524 varIds.push_back(vId);
526 525 }
527 526 impl->m_VarGroupIdToVarIds.insert(std::make_pair(varRequestId, varIds));
528 527
529 528 for (auto vId : vSyncIds) {
530 529 auto var = impl->findVariable(vId);
531 530
532 531 // Don't process already processed var
533 532 if (var != nullptr) {
534 qCInfo(LOG_VariableController()) << "processRequest synchro for" << var->name()
535 << varRequestId;
533 qCDebug(LOG_VariableController()) << "processRequest synchro for" << var->name()
534 << varRequestId;
536 535 auto vSyncRangeRequested
537 536 = variables.contains(var)
538 537 ? range
539 538 : computeSynchroRangeRequested(var->range(), range,
540 539 variables.first()->range());
541 540 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
542 541 impl->processRequest(var, vSyncRangeRequested, varRequestId);
543 542 }
544 543 else {
545 544 qCCritical(LOG_VariableController())
546 545
547 546 << tr("Impossible to synchronize a null variable");
548 547 }
549 548 }
550 549 }
551 550 }
552 551
553 552 impl->updateVariables(varRequestId);
554 553 }
555 554
556 555
557 556 void VariableController::initialize()
558 557 {
559 558 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
560 559 impl->m_WorkingMutex.lock();
561 560 qCDebug(LOG_VariableController()) << tr("VariableController init END");
562 561 }
563 562
564 563 void VariableController::finalize()
565 564 {
566 565 impl->m_WorkingMutex.unlock();
567 566 }
568 567
569 568 void VariableController::waitForFinish()
570 569 {
571 570 QMutexLocker locker{&impl->m_WorkingMutex};
572 571 }
573 572
574 573 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
575 574 {
576 575 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
577 576 auto zoomType = AcquisitionZoomType::Unknown;
578 577 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
579 578 qCDebug(LOG_VariableController()) << "zoomtype: ZoomOut";
580 579 zoomType = AcquisitionZoomType::ZoomOut;
581 580 }
582 581 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
583 582 qCDebug(LOG_VariableController()) << "zoomtype: PanRight";
584 583 zoomType = AcquisitionZoomType::PanRight;
585 584 }
586 585 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
587 586 qCDebug(LOG_VariableController()) << "zoomtype: PanLeft";
588 587 zoomType = AcquisitionZoomType::PanLeft;
589 588 }
590 589 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
591 590 qCDebug(LOG_VariableController()) << "zoomtype: ZoomIn";
592 591 zoomType = AcquisitionZoomType::ZoomIn;
593 592 }
594 593 else {
595 594 qCDebug(LOG_VariableController()) << "getZoomType: Unknown type detected";
596 595 }
597 596 return zoomType;
598 597 }
599 598
600 599 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
601 600 const SqpRange &rangeRequested,
602 601 QUuid varRequestId)
603 602 {
604 603 auto itVar = m_VariableToIdentifierMap.find(var);
605 604 if (itVar == m_VariableToIdentifierMap.cend()) {
606 605 qCCritical(LOG_VariableController())
607 606 << tr("Impossible to process request for unknown variable");
608 607 return;
609 608 }
610 609
611 610 auto varId = itVar->second;
612 611
613 612 auto itVarHandler = m_VarIdToVarRequestHandler.find(varId);
614 613 if (itVarHandler == m_VarIdToVarRequestHandler.cend()) {
615 614 qCCritical(LOG_VariableController())
616 615 << tr("Impossible to process request for variable with unknown handler");
617 616 return;
618 617 }
619 618
620 619 auto oldRange = var->range();
621 620
622 621 auto varHandler = itVarHandler->second.get();
623 622
624 623 if (varHandler->m_State != VariableRequestHandlerState::OFF) {
625 624 oldRange = varHandler->m_RunningVarRequest.m_RangeRequested;
626 625 }
627 626
628 627 auto varRequest = VariableRequest{};
629 628 varRequest.m_VariableGroupId = varRequestId;
630 629 auto varStrategyRangesRequested
631 630 = m_VariableCacheStrategy->computeRange(oldRange, rangeRequested);
632 631 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
633 632 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
634 633
635 634 switch (varHandler->m_State) {
636 635 case VariableRequestHandlerState::OFF: {
637 qCCritical(LOG_VariableController()) << tr("Process Request OFF")
638 << varRequest.m_RangeRequested
639 << varRequest.m_CacheRangeRequested;
636 qCDebug(LOG_VariableController()) << tr("Process Request OFF")
637 << varRequest.m_RangeRequested
638 << varRequest.m_CacheRangeRequested;
640 639 varHandler->m_RunningVarRequest = varRequest;
641 640 varHandler->m_State = VariableRequestHandlerState::RUNNING;
642 641 executeVarRequest(var, varRequest);
643 642 break;
644 643 }
645 644 case VariableRequestHandlerState::RUNNING: {
646 qCCritical(LOG_VariableController()) << tr("Process Request RUNNING")
647 << varRequest.m_RangeRequested
648 << varRequest.m_CacheRangeRequested;
645 qCDebug(LOG_VariableController()) << tr("Process Request RUNNING")
646 << varRequest.m_RangeRequested
647 << varRequest.m_CacheRangeRequested;
649 648 varHandler->m_State = VariableRequestHandlerState::PENDING;
650 649 varHandler->m_PendingVarRequest = varRequest;
651 650 break;
652 651 }
653 652 case VariableRequestHandlerState::PENDING: {
654 qCCritical(LOG_VariableController()) << tr("Process Request PENDING")
655 << varRequest.m_RangeRequested
656 << varRequest.m_CacheRangeRequested;
653 qCDebug(LOG_VariableController()) << tr("Process Request PENDING")
654 << varRequest.m_RangeRequested
655 << varRequest.m_CacheRangeRequested;
657 656 auto variableGroupIdToCancel = varHandler->m_PendingVarRequest.m_VariableGroupId;
658 657 varHandler->m_PendingVarRequest = varRequest;
659 658 cancelVariableRequest(variableGroupIdToCancel);
660 659
661 660 break;
662 661 }
663 662 default:
664 663 qCCritical(LOG_VariableController())
665 664 << QObject::tr("Unknown VariableRequestHandlerState");
666 665 }
667 666 }
668 667
669 668 std::shared_ptr<Variable>
670 669 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
671 670 {
672 671 std::shared_ptr<Variable> var;
673 672 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
674 673
675 674 auto end = m_VariableToIdentifierMap.cend();
676 675 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
677 676 if (it != end) {
678 677 var = it->first;
679 678 }
680 679 else {
681 680 qCCritical(LOG_VariableController())
682 681 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
683 682 }
684 683
685 684 return var;
686 685 }
687 686
688 687 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
689 688 const QVector<AcquisitionDataPacket> acqDataPacketVector)
690 689 {
691 690 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
692 691 << acqDataPacketVector.size();
693 692 std::shared_ptr<IDataSeries> dataSeries;
694 693 if (!acqDataPacketVector.isEmpty()) {
695 694 dataSeries = acqDataPacketVector[0].m_DateSeries;
696 695 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
697 696 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
698 697 }
699 698 }
700 699 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
701 700 << acqDataPacketVector.size();
702 701 return dataSeries;
703 702 }
704 703
705 704 void VariableController::VariableControllerPrivate::registerProvider(
706 705 std::shared_ptr<IDataProvider> provider)
707 706 {
708 707 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
709 708 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
710 709 << provider->objectName();
711 710 m_ProviderSet.insert(provider);
712 711 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
713 712 &VariableAcquisitionWorker::onVariableDataAcquired);
714 713 connect(provider.get(), &IDataProvider::dataProvidedProgress,
715 714 m_VariableAcquisitionWorker.get(),
716 715 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
717 716 connect(provider.get(), &IDataProvider::dataProvidedFailed,
718 717 m_VariableAcquisitionWorker.get(),
719 718 &VariableAcquisitionWorker::onVariableAcquisitionFailed);
720 719 }
721 720 else {
722 721 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
723 722 }
724 723 }
725 724
726 725 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
727 726 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
728 727 {
729 728 auto itVarHandler = m_VarIdToVarRequestHandler.find(varId);
730 729 if (itVarHandler == m_VarIdToVarRequestHandler.cend()) {
731 730 return QUuid();
732 731 }
733 732
734 733 auto varHandler = itVarHandler->second.get();
735 734 if (varHandler->m_State == VariableRequestHandlerState::OFF) {
736 735 // TODO log impossible case !!!
737 736 }
738 737
739 738 varHandler->m_RunningVarRequest.m_DataSeries = dataSeries;
740 739 varHandler->m_CanUpdate = true;
741 740
742 741 // Element traitΓ©, on a dΓ©jΓ  toutes les donnΓ©es necessaires
743 742 auto varGroupId = varHandler->m_RunningVarRequest.m_VariableGroupId;
744 qCInfo(LOG_VariableController()) << "Variable::acceptVariableRequest" << varGroupId
745 << m_VarGroupIdToVarIds.size();
743 qCDebug(LOG_VariableController()) << "Variable::acceptVariableRequest" << varGroupId
744 << m_VarGroupIdToVarIds.size();
746 745
747 746 return varHandler->m_RunningVarRequest.m_VariableGroupId;
748 747 }
749 748
750 749 void VariableController::VariableControllerPrivate::updateVariables(QUuid varRequestId)
751 750 {
752 qCInfo(LOG_VariableController()) << "VariableControllerPrivate::updateVariables"
753 << QThread::currentThread()->objectName() << varRequestId;
751 qCDebug(LOG_VariableController()) << "VariableControllerPrivate::updateVariables"
752 << QThread::currentThread()->objectName() << varRequestId;
754 753
755 754 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
756 755 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
757 // TODO LOG cannot update since varGroupdId isn't here anymore
758 qCInfo(LOG_VariableController()) << "Impossible to updateVariables of unknown variables"
759 << varRequestId;
756 qCWarning(LOG_VariableController())
757 << tr("Impossible to updateVariables of unknown variables") << varRequestId;
760 758 return;
761 759 }
762 760
763 761 auto &varIds = varGroupIdToVarIdsIt->second;
764 762 auto varIdsEnd = varIds.end();
765 763 bool processVariableUpdate = true;
766 qCInfo(LOG_VariableController())
767 << "VariableControllerPrivate::compute procss for group of size" << varIds.size();
768 764 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd) && processVariableUpdate;
769 765 ++varIdsIt) {
770 766 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
771 767 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
772 768 processVariableUpdate &= itVarHandler->second->m_CanUpdate;
773 769 }
774 770 }
775 771
776 772 if (processVariableUpdate) {
777 qCInfo(LOG_VariableController()) << "Final update OK for the var request" << varIds.size();
773 qCDebug(LOG_VariableController()) << "Final update OK for the var request" << varIds.size();
778 774 for (auto varIdsIt = varIds.begin(); varIdsIt != varIdsEnd; ++varIdsIt) {
779 775 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
780 776 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
781 777 if (auto var = findVariable(*varIdsIt)) {
782 778 auto &varRequest = itVarHandler->second->m_RunningVarRequest;
783 779 var->setRange(varRequest.m_RangeRequested);
784 780 var->setCacheRange(varRequest.m_CacheRangeRequested);
785 qCInfo(LOG_VariableController()) << tr("1: onDataProvided")
786 << varRequest.m_RangeRequested
787 << varRequest.m_CacheRangeRequested;
788 qCInfo(LOG_VariableController()) << tr("2: onDataProvided var points before")
789 << var->nbPoints()
790 << varRequest.m_DataSeries->nbPoints();
781 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
782 << varRequest.m_RangeRequested
783 << varRequest.m_CacheRangeRequested;
784 qCDebug(LOG_VariableController()) << tr("2: onDataProvided var points before")
785 << var->nbPoints()
786 << varRequest.m_DataSeries->nbPoints();
791 787 var->mergeDataSeries(varRequest.m_DataSeries);
792 qCInfo(LOG_VariableController()) << tr("3: onDataProvided var points after")
793 << var->nbPoints();
788 qCDebug(LOG_VariableController()) << tr("3: onDataProvided var points after")
789 << var->nbPoints();
794 790
795 791 emit var->updated();
796 qCCritical(LOG_VariableController()) << tr("Update OK");
792 qCDebug(LOG_VariableController()) << tr("Update OK");
797 793 }
798 794 else {
799 795 qCCritical(LOG_VariableController())
800 796 << tr("Impossible to update data to a null variable");
801 797 }
802 798 }
803 799 }
804 800 updateVariableRequest(varRequestId);
805 801
806 802 // cleaning varRequestId
807 qCInfo(LOG_VariableController()) << tr("m_VarGroupIdToVarIds erase") << varRequestId;
803 qCDebug(LOG_VariableController()) << tr("m_VarGroupIdToVarIds erase") << varRequestId;
808 804 m_VarGroupIdToVarIds.erase(varRequestId);
809 805 }
810 806 }
811 807
812 808
813 809 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
814 810 {
815 811 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
816 812 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
817 813 // TODO LOG cannot update variable request since varGroupdId isn't here anymore
818 814 return;
819 815 }
820 816
821 817 auto &varIds = varGroupIdToVarIdsIt->second;
822 818 auto varIdsEnd = varIds.end();
823 819 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
824 820 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
825 821 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
826 822
827 823 auto varHandler = itVarHandler->second.get();
828 824 varHandler->m_CanUpdate = false;
829 825
830 826
831 827 switch (varHandler->m_State) {
832 828 case VariableRequestHandlerState::OFF: {
833 829 qCCritical(LOG_VariableController())
834 830 << QObject::tr("Impossible to update a variable with handler in OFF state");
835 831 } break;
836 832 case VariableRequestHandlerState::RUNNING: {
837 833 varHandler->m_State = VariableRequestHandlerState::OFF;
838 834 varHandler->m_RunningVarRequest = VariableRequest{};
839 835 break;
840 836 }
841 837 case VariableRequestHandlerState::PENDING: {
842 838 varHandler->m_State = VariableRequestHandlerState::RUNNING;
843 839 varHandler->m_RunningVarRequest = varHandler->m_PendingVarRequest;
844 840 varHandler->m_PendingVarRequest = VariableRequest{};
845 841 auto var = findVariable(itVarHandler->first);
846 842 executeVarRequest(var, varHandler->m_RunningVarRequest);
847 843 break;
848 844 }
849 845 default:
850 846 qCCritical(LOG_VariableController())
851 847 << QObject::tr("Unknown VariableRequestHandlerState");
852 848 }
853 849 }
854 850 }
855 851 }
856 852
857 853
858 854 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
859 855 {
856 qCDebug(LOG_VariableController()) << tr("cancelVariableRequest") << varRequestId;
857
860 858 auto varGroupIdToVarIdsIt = m_VarGroupIdToVarIds.find(varRequestId);
861 859 if (varGroupIdToVarIdsIt == m_VarGroupIdToVarIds.end()) {
862 860 qCCritical(LOG_VariableController())
863 861 << tr("Impossible to cancelVariableRequest for unknown varGroupdId") << varRequestId;
864 862 return;
865 863 }
866 864
867 865 auto &varIds = varGroupIdToVarIdsIt->second;
868 866 auto varIdsEnd = varIds.end();
869 867 for (auto varIdsIt = varIds.begin(); (varIdsIt != varIdsEnd); ++varIdsIt) {
870 868 auto itVarHandler = m_VarIdToVarRequestHandler.find(*varIdsIt);
871 869 if (itVarHandler != m_VarIdToVarRequestHandler.cend()) {
872 870
873 871 auto varHandler = itVarHandler->second.get();
874 872 varHandler->m_CanUpdate = false;
875 873 varHandler->m_VarId = QUuid{};
876 874 switch (varHandler->m_State) {
877 875 case VariableRequestHandlerState::OFF: {
878 876 qCWarning(LOG_VariableController())
879 877 << QObject::tr("Impossible to cancel a variable with no running request");
880 878 break;
881 879 }
882 880 case VariableRequestHandlerState::RUNNING: {
883 881
884 882 if (varHandler->m_RunningVarRequest.m_VariableGroupId == varRequestId) {
885 883 auto var = findVariable(itVarHandler->first);
886 884 auto varProvider = m_VariableToProviderMap.at(var);
887 885 if (varProvider != nullptr) {
888 886 m_VariableAcquisitionWorker->abortProgressRequested(
889 887 itVarHandler->first);
890 888 }
891 889 m_VariableModel->setDataProgress(var, 0.0);
892 890 varHandler->m_State = VariableRequestHandlerState::OFF;
893 891 varHandler->m_RunningVarRequest = VariableRequest{};
894 892 }
895 893 else {
896 894 // TODO: log Impossible to cancel the running variable request beacause its
897 895 // varRequestId isn't not the canceled one
898 896 }
899 897 break;
900 898 }
901 899 case VariableRequestHandlerState::PENDING: {
902 900 if (varHandler->m_RunningVarRequest.m_VariableGroupId == varRequestId) {
903 901 auto var = findVariable(itVarHandler->first);
904 902 auto varProvider = m_VariableToProviderMap.at(var);
905 903 if (varProvider != nullptr) {
906 904 m_VariableAcquisitionWorker->abortProgressRequested(
907 905 itVarHandler->first);
908 906 }
909 907 m_VariableModel->setDataProgress(var, 0.0);
910 908 varHandler->m_State = VariableRequestHandlerState::RUNNING;
911 909 varHandler->m_RunningVarRequest = varHandler->m_PendingVarRequest;
912 910 executeVarRequest(var, varHandler->m_RunningVarRequest);
913 911 }
914 912 else if (varHandler->m_PendingVarRequest.m_VariableGroupId == varRequestId) {
915 913 varHandler->m_State = VariableRequestHandlerState::RUNNING;
916 914 varHandler->m_PendingVarRequest = VariableRequest{};
917 915 }
918 916 else {
919 917 // TODO: log Impossible to cancel the variable request beacause its
920 918 // varRequestId isn't not the canceled one
921 919 }
922 920 break;
923 921 }
924 922 default:
925 923 qCCritical(LOG_VariableController())
926 924 << QObject::tr("Unknown VariableRequestHandlerState");
927 925 }
928 926 }
929 927 }
928 qCDebug(LOG_VariableController()) << tr("cancelVariableRequest: erase") << varRequestId;
930 929 m_VarGroupIdToVarIds.erase(varRequestId);
931 930 }
932 931
933 932 void VariableController::VariableControllerPrivate::executeVarRequest(std::shared_ptr<Variable> var,
934 933 VariableRequest &varRequest)
935 934 {
936 qCCritical(LOG_VariableController()) << tr("TORM: executeVarRequest");
935 qCDebug(LOG_VariableController()) << tr("TORM: executeVarRequest");
937 936
938 937 auto varId = m_VariableToIdentifierMap.at(var);
939 938
940 939 auto varCacheRange = var->cacheRange();
941 940 auto varCacheRangeRequested = varRequest.m_CacheRangeRequested;
942 941 auto notInCacheRangeList
943 942 = Variable::provideNotInCacheRangeList(varCacheRange, varCacheRangeRequested);
944 943 // auto inCacheRangeList
945 944 // = Variable::provideInCacheRangeList(varCacheRange, varCacheRangeRequested);
946 945
947 946 if (!notInCacheRangeList.empty()) {
948 947
949 948 auto varProvider = m_VariableToProviderMap.at(var);
950 949 if (varProvider != nullptr) {
951 qCCritical(LOG_VariableController()) << "executeVarRequest "
952 << varRequest.m_RangeRequested
953 << varRequest.m_CacheRangeRequested;
950 qCDebug(LOG_VariableController()) << "executeVarRequest " << varRequest.m_RangeRequested
951 << varRequest.m_CacheRangeRequested;
954 952 m_VariableAcquisitionWorker->pushVariableRequest(
955 953 varRequest.m_VariableGroupId, varId, varRequest.m_RangeRequested,
956 954 varRequest.m_CacheRangeRequested,
957 955 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
958 956 varProvider);
959 957 }
960 958 else {
961 959 qCCritical(LOG_VariableController())
962 960 << "Impossible to provide data with a null provider";
963 961 }
964 962
965 963 // if (!inCacheRangeList.empty()) {
966 964 // emit q->updateVarDisplaying(var, inCacheRangeList.first());
967 965 // }
968 966 }
969 967 else {
970 968 acceptVariableRequest(varId,
971 969 var->dataSeries()->subDataSeries(varRequest.m_CacheRangeRequested));
972 970 }
973 971 }
@@ -1,285 +1,282
1 1 #include "AmdaProvider.h"
2 2 #include "AmdaDefs.h"
3 3 #include "AmdaResultParser.h"
4 4
5 5 #include <Common/DateUtils.h>
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <Network/NetworkController.h>
8 8 #include <SqpApplication.h>
9 9 #include <Variable/Variable.h>
10 10
11 11 #include <QNetworkAccessManager>
12 12 #include <QNetworkReply>
13 13 #include <QTemporaryFile>
14 14 #include <QThread>
15 15
16 16 Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider")
17 17
18 18 namespace {
19 19
20 20 /// URL format for a request on AMDA server. The parameters are as follows:
21 21 /// - %1: start date
22 22 /// - %2: end date
23 23 /// - %3: parameter id
24 24 /// AMDA V2: http://amdatest.irap.omp.eu/php/rest/
25 25 const auto AMDA_URL_FORMAT = QStringLiteral(
26 26 "http://amda.irap.omp.eu/php/rest/"
27 27 "getParameter.php?startTime=%1&stopTime=%2&parameterID=%3&outputFormat=ASCII&"
28 28 "timeFormat=ISO8601&gzip=0");
29 29
30 30 /// Dates format passed in the URL (e.g 2013-09-23T09:00)
31 31 const auto AMDA_TIME_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss");
32 32
33 33 /// Formats a time to a date that can be passed in URL
34 34 QString dateFormat(double sqpRange) noexcept
35 35 {
36 36 auto dateTime = DateUtils::dateTime(sqpRange);
37 37 return dateTime.toString(AMDA_TIME_FORMAT);
38 38 }
39 39
40 40 AmdaResultParser::ValueType valueType(const QString &valueType)
41 41 {
42 42 if (valueType == QStringLiteral("scalar")) {
43 43 return AmdaResultParser::ValueType::SCALAR;
44 44 }
45 45 else if (valueType == QStringLiteral("vector")) {
46 46 return AmdaResultParser::ValueType::VECTOR;
47 47 }
48 48 else {
49 49 return AmdaResultParser::ValueType::UNKNOWN;
50 50 }
51 51 }
52 52
53 53 } // namespace
54 54
55 55 AmdaProvider::AmdaProvider()
56 56 {
57 57 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::AmdaProvider") << QThread::currentThread();
58 58 if (auto app = sqpApp) {
59 59 auto &networkController = app->networkController();
60 60 connect(this, SIGNAL(requestConstructed(std::shared_ptr<QNetworkRequest>, QUuid,
61 61 std::function<void(QNetworkReply *, QUuid)>)),
62 62 &networkController,
63 63 SLOT(onProcessRequested(std::shared_ptr<QNetworkRequest>, QUuid,
64 64 std::function<void(QNetworkReply *, QUuid)>)));
65 65
66 66
67 67 connect(&sqpApp->networkController(),
68 68 SIGNAL(replyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)),
69 69 this,
70 70 SLOT(onReplyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)));
71 71 }
72 72 }
73 73
74 74 std::shared_ptr<IDataProvider> AmdaProvider::clone() const
75 75 {
76 76 // No copy is made in the clone
77 77 return std::make_shared<AmdaProvider>();
78 78 }
79 79
80 80 void AmdaProvider::requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
81 81 {
82 82 // NOTE: Try to use multithread if possible
83 83 const auto times = parameters.m_Times;
84 84 const auto data = parameters.m_Data;
85 85 for (const auto &dateTime : qAsConst(times)) {
86 86 qCDebug(LOG_AmdaProvider()) << tr("TORM AmdaProvider::requestDataLoading ") << acqIdentifier
87 87 << dateTime;
88 88 this->retrieveData(acqIdentifier, dateTime, data);
89 89
90 90
91 91 // TORM when AMDA will support quick asynchrone request
92 92 QThread::msleep(1000);
93 93 }
94 94 }
95 95
96 96 void AmdaProvider::requestDataAborting(QUuid acqIdentifier)
97 97 {
98 98 if (auto app = sqpApp) {
99 99 auto &networkController = app->networkController();
100 100 networkController.onReplyCanceled(acqIdentifier);
101 101 }
102 102 }
103 103
104 104 void AmdaProvider::onReplyDownloadProgress(QUuid acqIdentifier,
105 105 std::shared_ptr<QNetworkRequest> networkRequest,
106 106 double progress)
107 107 {
108 108 qCDebug(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << acqIdentifier
109 109 << networkRequest.get() << progress;
110 110 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
111 111 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
112 112
113 113 // Update the progression for the current request
114 114 auto requestPtr = networkRequest;
115 115 auto findRequest = [requestPtr](const auto &entry) { return requestPtr == entry.first; };
116 116
117 117 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
118 118 auto requestProgressMapEnd = requestProgressMap.end();
119 119 auto requestProgressMapIt
120 120 = std::find_if(requestProgressMap.begin(), requestProgressMapEnd, findRequest);
121 121
122 122 if (requestProgressMapIt != requestProgressMapEnd) {
123 123 requestProgressMapIt->second = progress;
124 124 }
125 125 else {
126 126 // This case can happened when a progression is send after the request has been
127 127 // finished.
128 128 // Generaly the case when aborting a request
129 129 qCDebug(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress") << acqIdentifier
130 130 << networkRequest.get() << progress;
131 131 }
132 132
133 133 // Compute the current final progress and notify it
134 134 double finalProgress = 0.0;
135 135
136 136 auto fraq = requestProgressMap.size();
137 137
138 138 for (auto requestProgress : requestProgressMap) {
139 139 finalProgress += requestProgress.second;
140 140 qCDebug(LOG_AmdaProvider()) << tr("Current final progress without fraq:")
141 141 << finalProgress << requestProgress.second;
142 142 }
143 143
144 144 if (fraq > 0) {
145 145 finalProgress = finalProgress / fraq;
146 146 }
147 147
148 148 qCDebug(LOG_AmdaProvider()) << tr("Current final progress: ") << fraq << finalProgress;
149 149 emit dataProvidedProgress(acqIdentifier, finalProgress);
150 150 }
151 151 else {
152 152 // This case can happened when a progression is send after the request has been finished.
153 153 // Generaly the case when aborting a request
154 154 emit dataProvidedProgress(acqIdentifier, 100.0);
155 155 }
156 156 }
157 157
158 158 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data)
159 159 {
160 160 // Retrieves product ID from data: if the value is invalid, no request is made
161 161 auto productId = data.value(AMDA_XML_ID_KEY).toString();
162 162 if (productId.isNull()) {
163 163 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve data: unknown product id");
164 164 return;
165 165 }
166 166
167 167 // Retrieves the data type that determines whether the expected format for the result file is
168 168 // scalar, vector...
169 169 auto productValueType = valueType(data.value(AMDA_DATA_TYPE_KEY).toString());
170 170
171 171 // /////////// //
172 172 // Creates URL //
173 173 // /////////// //
174 174
175 175 auto startDate = dateFormat(dateTime.m_TStart);
176 176 auto endDate = dateFormat(dateTime.m_TEnd);
177 177
178 178 auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)};
179 179 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
180 180 auto tempFile = std::make_shared<QTemporaryFile>();
181 181
182 182 // LAMBDA
183 183 auto httpDownloadFinished = [this, dateTime, tempFile,
184 184 productValueType](QNetworkReply *reply, QUuid dataId) noexcept {
185 185
186 186 // Don't do anything if the reply was abort
187 187 if (reply->error() == QNetworkReply::NoError) {
188 188
189 189 if (tempFile) {
190 190 auto replyReadAll = reply->readAll();
191 191 if (!replyReadAll.isEmpty()) {
192 192 tempFile->write(replyReadAll);
193 193 }
194 194 tempFile->close();
195 195
196 196 // Parse results file
197 197 if (auto dataSeries
198 198 = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) {
199 199 emit dataProvided(dataId, dataSeries, dateTime);
200 200 }
201 201 else {
202 202 /// @todo ALX : debug
203 203 emit dataProvidedFailed(dataId);
204 204 }
205 205 }
206 qCDebug(LOG_AmdaProvider()) << tr("acquisition requests erase because of finishing")
207 << dataId;
208 206 m_AcqIdToRequestProgressMap.erase(dataId);
209 207 }
210 208 else {
211 209 qCCritical(LOG_AmdaProvider()) << tr("httpDownloadFinished ERROR");
212 210 emit dataProvidedFailed(dataId);
213 211 }
214 212
215 213 };
216 214 auto httpFinishedLambda
217 215 = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept {
218 216
219 217 // Don't do anything if the reply was abort
220 218 if (reply->error() == QNetworkReply::NoError) {
221 219 // AMDA v2: auto downloadFileUrl = QUrl{QString{reply->readAll()}.trimmed()};
222 220 auto downloadFileUrl = QUrl{QString{reply->readAll()}};
223 221
224 222 qCInfo(LOG_AmdaProvider())
225 223 << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl;
226 224 // Executes request for downloading file //
227 225
228 226 // Creates destination file
229 227 if (tempFile->open()) {
230 228 // Executes request and store the request for progression
231 229 auto request = std::make_shared<QNetworkRequest>(downloadFileUrl);
232 230 updateRequestProgress(dataId, request, 0.0);
233 231 emit requestConstructed(request, dataId, httpDownloadFinished);
234 232 }
235 233 else {
236 234 emit dataProvidedFailed(dataId);
237 235 }
238 236 }
239 237 else {
240 qCDebug(LOG_AmdaProvider())
241 << tr("acquisition requests erase because of aborting") << dataId;
242 238 qCCritical(LOG_AmdaProvider()) << tr("httpFinishedLambda ERROR");
243 239 m_AcqIdToRequestProgressMap.erase(dataId);
244 240 emit dataProvidedFailed(dataId);
245 241 }
246 242 };
247 243
248 244 // //////////////// //
249 245 // Executes request //
250 246 // //////////////// //
251 247
252 248 auto request = std::make_shared<QNetworkRequest>(url);
253 249 qCDebug(LOG_AmdaProvider()) << tr("First Request creation") << request.get();
254 250 updateRequestProgress(token, request, 0.0);
255 251
256 252 emit requestConstructed(request, token, httpFinishedLambda);
257 253 }
258 254
259 255 void AmdaProvider::updateRequestProgress(QUuid acqIdentifier,
260 256 std::shared_ptr<QNetworkRequest> request, double progress)
261 257 {
258 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress request") << request.get();
262 259 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
263 260 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
264 261 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
265 262 auto requestProgressMapIt = requestProgressMap.find(request);
266 263 if (requestProgressMapIt != requestProgressMap.end()) {
267 264 requestProgressMapIt->second = progress;
268 265 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new progress for request")
269 266 << acqIdentifier << request.get() << progress;
270 267 }
271 268 else {
272 269 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new request") << acqIdentifier
273 270 << request.get() << progress;
274 271 acqIdToRequestProgressMapIt->second.insert(std::make_pair(request, progress));
275 272 }
276 273 }
277 274 else {
278 275 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new acqIdentifier")
279 276 << acqIdentifier << request.get() << progress;
280 277 auto requestProgressMap = std::map<std::shared_ptr<QNetworkRequest>, double>{};
281 278 requestProgressMap.insert(std::make_pair(request, progress));
282 279 m_AcqIdToRequestProgressMap.insert(
283 280 std::make_pair(acqIdentifier, std::move(requestProgressMap)));
284 281 }
285 282 }
@@ -1,237 +1,240
1 1 #include "AmdaResultParser.h"
2 2
3 3 #include <Common/DateUtils.h>
4 4 #include <Data/ScalarSeries.h>
5 5 #include <Data/VectorSeries.h>
6 6
7 7 #include <QDateTime>
8 8 #include <QFile>
9 9 #include <QRegularExpression>
10 10
11 11 #include <cmath>
12 12
13 13 Q_LOGGING_CATEGORY(LOG_AmdaResultParser, "AmdaResultParser")
14 14
15 15 namespace {
16 16
17 17 /// Message in result file when the file was not found on server
18 18 const auto FILE_NOT_FOUND_MESSAGE = QStringLiteral("Not Found");
19 19
20 20 /// Separator between values in a result line
21 21 const auto RESULT_LINE_SEPARATOR = QRegularExpression{QStringLiteral("\\s+")};
22 22
23 23 // AMDA V2
24 24 // /// Regex to find the header of the data in the file. This header indicates the end of comments
25 25 // in
26 26 // /// the file
27 27 // const auto DATA_HEADER_REGEX = QRegularExpression{QStringLiteral("#\\s*DATA\\s*:")};
28 28
29 29 /// Format for dates in result files
30 30 const auto DATE_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss.zzz");
31 31
32 32 // AMDA V2
33 33 // /// Regex to find unit in a line. Examples of valid lines:
34 34 // /// ... PARAMETER_UNITS : nT ...
35 35 // /// ... PARAMETER_UNITS:nT ...
36 36 // /// ... PARAMETER_UNITS: mΒ² ...
37 37 // /// ... PARAMETER_UNITS : m/s ...
38 38 // const auto UNIT_REGEX = QRegularExpression{QStringLiteral("\\s*PARAMETER_UNITS\\s*:\\s*(.+)")};
39 39
40 40 /// ... - Units : nT - ...
41 41 /// ... -Units:nT- ...
42 42 /// ... -Units: mΒ²- ...
43 43 /// ... - Units : m/s - ...
44 44 const auto UNIT_REGEX = QRegularExpression{QStringLiteral("-\\s*Units\\s*:\\s*(.+?)\\s*-")};
45 45
46 46 QDateTime dateTimeFromString(const QString &stringDate) noexcept
47 47 {
48 48 #if QT_VERSION >= QT_VERSION_CHECK(5, 8, 0)
49 49 return QDateTime::fromString(stringDate, Qt::ISODateWithMs);
50 50 #else
51 51 return QDateTime::fromString(stringDate, DATE_FORMAT);
52 52 #endif
53 53 }
54 54
55 55 /// Converts a string date to a double date
56 56 /// @return a double that represents the date in seconds, NaN if the string date can't be converted
57 57 double doubleDate(const QString &stringDate) noexcept
58 58 {
59 59 // Format: yyyy-MM-ddThh:mm:ss.zzz
60 60 auto dateTime = dateTimeFromString(stringDate);
61 61 dateTime.setTimeSpec(Qt::UTC);
62 62 return dateTime.isValid() ? DateUtils::secondsSinceEpoch(dateTime)
63 63 : std::numeric_limits<double>::quiet_NaN();
64 64 }
65 65
66 66 /// Checks if a line is a comment line
67 67 bool isCommentLine(const QString &line)
68 68 {
69 69 return line.startsWith("#");
70 70 }
71 71
72 72 /// @return the number of lines to be read depending on the type of value passed in parameter
73 73 int nbValues(AmdaResultParser::ValueType valueType) noexcept
74 74 {
75 75 switch (valueType) {
76 76 case AmdaResultParser::ValueType::SCALAR:
77 77 return 1;
78 78 case AmdaResultParser::ValueType::VECTOR:
79 79 return 3;
80 80 case AmdaResultParser::ValueType::UNKNOWN:
81 81 // Invalid case
82 82 break;
83 83 }
84 84
85 85 // Invalid cases
86 86 qCCritical(LOG_AmdaResultParser())
87 87 << QObject::tr("Can't get the number of values to read: unsupported type");
88 88 return 0;
89 89 }
90 90
91 91 /**
92 92 * Reads stream to retrieve x-axis unit
93 93 * @param stream the stream to read
94 94 * @return the unit that has been read in the stream, a default unit (time unit with no label) if an
95 95 * error occured during reading
96 96 */
97 97 Unit readXAxisUnit(QTextStream &stream)
98 98 {
99 99 QString line{};
100 100
101 101 // Searches unit in the comment lines (as long as the reading has not reached the data header)
102 102 // AMDA V2: while (stream.readLineInto(&line) && !line.contains(DATA_HEADER_REGEX)) {
103 103 while (stream.readLineInto(&line) && isCommentLine(line)) {
104 104 auto match = UNIT_REGEX.match(line);
105 105 if (match.hasMatch()) {
106 106 return Unit{match.captured(1), true};
107 107 }
108 108 }
109 109
110 110 qCWarning(LOG_AmdaResultParser()) << QObject::tr("The unit could not be found in the file");
111 111
112 112 // Error cases
113 113 return Unit{{}, true};
114 114 }
115 115
116 116 /**
117 117 * Reads stream to retrieve results
118 118 * @param stream the stream to read
119 119 * @return the pair of vectors x-axis data/values data that has been read in the stream
120 120 */
121 121 std::pair<std::vector<double>, std::vector<double> >
122 122 readResults(QTextStream &stream, AmdaResultParser::ValueType valueType)
123 123 {
124 124 auto expectedNbValues = nbValues(valueType) + 1;
125 125
126 126 auto xData = std::vector<double>{};
127 127 auto valuesData = std::vector<double>{};
128 128
129 129 QString line{};
130 130
131 131 // Skip comment lines
132 132 while (stream.readLineInto(&line) && isCommentLine(line)) {
133 133 }
134 134
135 135 if (!stream.atEnd()) {
136 136 do {
137 137 auto lineData = line.split(RESULT_LINE_SEPARATOR, QString::SkipEmptyParts);
138 138 if (lineData.size() == expectedNbValues) {
139 139 // X : the data is converted from date to double (in secs)
140 140 auto x = doubleDate(lineData.at(0));
141 141
142 142 // Adds result only if x is valid. Then, if value is invalid, it is set to NaN
143 143 if (!std::isnan(x)) {
144 144 xData.push_back(x);
145 145
146 146 // Values
147 147 for (auto valueIndex = 1; valueIndex < expectedNbValues; ++valueIndex) {
148 148 auto column = valueIndex;
149 149
150 150 bool valueOk;
151 151 auto value = lineData.at(column).toDouble(&valueOk);
152 152
153 153 if (!valueOk) {
154 154 qCWarning(LOG_AmdaResultParser())
155 155 << QObject::tr(
156 156 "Value from (line %1, column %2) is invalid and will be "
157 157 "converted to NaN")
158 158 .arg(line, column);
159 159 value = std::numeric_limits<double>::quiet_NaN();
160 160 }
161 161 valuesData.push_back(value);
162 162 }
163 163 }
164 164 else {
165 165 qCWarning(LOG_AmdaResultParser())
166 166 << QObject::tr("Can't retrieve results from line %1: x is invalid")
167 167 .arg(line);
168 168 }
169 169 }
170 170 else {
171 171 qCWarning(LOG_AmdaResultParser())
172 172 << QObject::tr("Can't retrieve results from line %1: invalid line").arg(line);
173 173 }
174 174 } while (stream.readLineInto(&line));
175 175 }
176 176
177 177 return std::make_pair(std::move(xData), std::move(valuesData));
178 178 }
179 179
180 180 } // namespace
181 181
182 182 std::shared_ptr<IDataSeries> AmdaResultParser::readTxt(const QString &filePath,
183 183 ValueType valueType) noexcept
184 184 {
185 185 if (valueType == ValueType::UNKNOWN) {
186 186 qCCritical(LOG_AmdaResultParser())
187 187 << QObject::tr("Can't retrieve AMDA data: the type of values to be read is unknown");
188 188 return nullptr;
189 189 }
190 190
191 191 QFile file{filePath};
192 192
193 193 if (!file.open(QFile::ReadOnly | QIODevice::Text)) {
194 194 qCCritical(LOG_AmdaResultParser())
195 195 << QObject::tr("Can't retrieve AMDA data from file %1: %2")
196 196 .arg(filePath, file.errorString());
197 197 return nullptr;
198 198 }
199 199
200 200 QTextStream stream{&file};
201 201
202 202 // Checks if the file was found on the server
203 203 auto firstLine = stream.readLine();
204 204 if (firstLine.compare(FILE_NOT_FOUND_MESSAGE) == 0) {
205 205 qCCritical(LOG_AmdaResultParser())
206 206 << QObject::tr("Can't retrieve AMDA data from file %1: file was not found on server")
207 207 .arg(filePath);
208 208 return nullptr;
209 209 }
210 210
211 211 // Reads x-axis unit
212 212 stream.seek(0); // returns to the beginning of the file
213 213 auto xAxisUnit = readXAxisUnit(stream);
214 if (xAxisUnit.m_Name.isEmpty()) {
215 return nullptr;
216 }
214 217
215 218 // Reads results
216 219 // AMDA V2: remove line
217 220 stream.seek(0); // returns to the beginning of the file
218 221 auto results = readResults(stream, valueType);
219 222
220 223 // Creates data series
221 224 switch (valueType) {
222 225 case ValueType::SCALAR:
223 226 return std::make_shared<ScalarSeries>(std::move(results.first),
224 227 std::move(results.second), xAxisUnit, Unit{});
225 228 case ValueType::VECTOR:
226 229 return std::make_shared<VectorSeries>(std::move(results.first),
227 230 std::move(results.second), xAxisUnit, Unit{});
228 231 case ValueType::UNKNOWN:
229 232 // Invalid case
230 233 break;
231 234 }
232 235
233 236 // Invalid cases
234 237 qCCritical(LOG_AmdaResultParser())
235 238 << QObject::tr("Can't create data series: unsupported value type");
236 239 return nullptr;
237 240 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

Merge lasted acquisition developpement on main Sciqlop branch

You need to be logged in to leave comments. Login now