##// END OF EJS Templates
Implementation of progression
perrinel -
r606:7ea0025fca62 feature/SynchroIm...
parent child
Show More
@@ -1,38 +1,40
1 1 #ifndef SCIQLOP_ACQUISITIONREQUEST_H
2 2 #define SCIQLOP_ACQUISITIONREQUEST_H
3 3
4 4 #include <QObject>
5 5
6 6 #include <QUuid>
7 7
8 8 #include <Common/DateUtils.h>
9 9 #include <Common/MetaTypes.h>
10 10 #include <Data/DataProviderParameters.h>
11 11 #include <Data/IDataProvider.h>
12 12 #include <Data/SqpRange.h>
13 13
14 14 #include <memory>
15 15
16 16 /**
17 17 * @brief The AcquisitionRequest struct holds the information of an variable request
18 18 */
19 19 struct AcquisitionRequest {
20 20 AcquisitionRequest()
21 21 {
22 22 m_AcqIdentifier = QUuid::createUuid();
23 23 m_Size = 0;
24 m_Progression = 0;
24 25 }
25 26
26 27 QUuid m_VarRequestId;
27 28 QUuid m_AcqIdentifier;
28 29 QUuid m_vIdentifier;
29 30 DataProviderParameters m_DataProviderParameters;
30 31 SqpRange m_RangeRequested;
31 32 SqpRange m_CacheRangeRequested;
32 33 int m_Size;
34 int m_Progression;
33 35 std::shared_ptr<IDataProvider> m_Provider;
34 36 };
35 37
36 38 SCIQLOP_REGISTER_META_TYPE(ACQUISITIONREQUEST_REGISTRY, AcquisitionRequest)
37 39
38 40 #endif // SCIQLOP_ACQUISITIONREQUEST_H
@@ -1,48 +1,49
1 1 #ifndef SCIQLOP_NETWORKCONTROLLER_H
2 2 #define SCIQLOP_NETWORKCONTROLLER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <QLoggingCategory>
7 7 #include <QObject>
8 8 #include <QUuid>
9 9
10 10 #include <Common/spimpl.h>
11 11 #include <functional>
12 12
13 13 Q_DECLARE_LOGGING_CATEGORY(LOG_NetworkController)
14 14
15 15 class QNetworkReply;
16 16 class QNetworkRequest;
17 17
18 18 /**
19 19 * @brief The NetworkController class aims to handle all network connection of SciQlop.
20 20 */
21 21 class SCIQLOP_CORE_EXPORT NetworkController : public QObject {
22 22 Q_OBJECT
23 23 public:
24 24 explicit NetworkController(QObject *parent = 0);
25 25
26 26 void initialize();
27 27 void finalize();
28 28
29 29 public slots:
30 30 /// Execute request and call callback when the reply is finished. Identifier is attached to the
31 31 /// callback
32 32 void onProcessRequested(const QNetworkRequest &request, QUuid identifier,
33 33 std::function<void(QNetworkReply *, QUuid)> callback);
34 34 /// Cancel the request of identifier
35 35 void onReplyCanceled(QUuid identifier);
36 36
37 37 signals:
38 38 void replyFinished(QNetworkReply *reply, QUuid identifier);
39 void replyDownloadProgress(QUuid identifier, double progress);
39 void replyDownloadProgress(QUuid identifier, const QNetworkRequest &networkRequest,
40 double progress);
40 41
41 42 private:
42 43 void waitForFinish();
43 44
44 45 class NetworkControllerPrivate;
45 46 spimpl::unique_impl_ptr<NetworkControllerPrivate> impl;
46 47 };
47 48
48 49 #endif // SCIQLOP_NETWORKCONTROLLER_H
@@ -1,134 +1,134
1 1 #include "Network/NetworkController.h"
2 2
3 3 #include <QMutex>
4 4 #include <QNetworkAccessManager>
5 5 #include <QNetworkReply>
6 6 #include <QNetworkRequest>
7 7 #include <QReadWriteLock>
8 8 #include <QThread>
9 9
10 10 #include <unordered_map>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_NetworkController, "NetworkController")
13 13
14 14 struct NetworkController::NetworkControllerPrivate {
15 15 explicit NetworkControllerPrivate(NetworkController *parent) : m_WorkingMutex{} {}
16 16
17 17 void lockRead() { m_Lock.lockForRead(); }
18 18 void lockWrite() { m_Lock.lockForWrite(); }
19 19 void unlock() { m_Lock.unlock(); }
20 20
21 21 QMutex m_WorkingMutex;
22 22
23 23 QReadWriteLock m_Lock;
24 24 std::unordered_map<QNetworkReply *, QUuid> m_NetworkReplyToVariableId;
25 25 std::unique_ptr<QNetworkAccessManager> m_AccessManager{nullptr};
26 26 };
27 27
28 28 NetworkController::NetworkController(QObject *parent)
29 29 : QObject(parent), impl{spimpl::make_unique_impl<NetworkControllerPrivate>(this)}
30 30 {
31 31 }
32 32
33 33 void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier,
34 34 std::function<void(QNetworkReply *, QUuid)> callback)
35 35 {
36 36 qCDebug(LOG_NetworkController()) << tr("NetworkController registered")
37 37 << QThread::currentThread()->objectName();
38 38 auto reply = impl->m_AccessManager->get(request);
39 39
40 40 // Store the couple reply id
41 41 impl->lockWrite();
42 42 impl->m_NetworkReplyToVariableId[reply] = identifier;
43 43 impl->unlock();
44 44
45 auto onReplyFinished = [reply, this, identifier, callback]() {
45 auto onReplyFinished = [request, reply, this, identifier, callback]() {
46 46
47 47 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished")
48 48 << QThread::currentThread() << reply;
49 49 impl->lockRead();
50 50 auto it = impl->m_NetworkReplyToVariableId.find(reply);
51 51 impl->unlock();
52 52 if (it != impl->m_NetworkReplyToVariableId.cend()) {
53 53 impl->lockWrite();
54 54 impl->m_NetworkReplyToVariableId.erase(reply);
55 55 impl->unlock();
56 56 // Deletes reply
57 57 callback(reply, identifier);
58 58 reply->deleteLater();
59 59
60 emit this->replyDownloadProgress(identifier, 0);
60 emit this->replyDownloadProgress(identifier, request, 0);
61 61 }
62 62
63 63 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished END")
64 64 << QThread::currentThread() << reply;
65 65 };
66 66
67 auto onReplyProgress = [reply, this](qint64 bytesRead, qint64 totalBytes) {
67 auto onReplyProgress = [reply, request, this](qint64 bytesRead, qint64 totalBytes) {
68 68
69 69 double progress = (bytesRead * 100.0) / totalBytes;
70 70 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress") << progress
71 71 << QThread::currentThread() << reply;
72 72 impl->lockRead();
73 73 auto it = impl->m_NetworkReplyToVariableId.find(reply);
74 74 impl->unlock();
75 75 if (it != impl->m_NetworkReplyToVariableId.cend()) {
76 emit this->replyDownloadProgress(it->second, progress);
76 emit this->replyDownloadProgress(it->second, request, progress);
77 77 }
78 78 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress END")
79 79 << QThread::currentThread() << reply;
80 80 };
81 81
82 82
83 83 connect(reply, &QNetworkReply::finished, this, onReplyFinished);
84 84 connect(reply, &QNetworkReply::downloadProgress, this, onReplyProgress);
85 85 qCDebug(LOG_NetworkController()) << tr("NetworkController registered END")
86 86 << QThread::currentThread()->objectName() << reply;
87 87 }
88 88
89 89 void NetworkController::initialize()
90 90 {
91 91 qCDebug(LOG_NetworkController()) << tr("NetworkController init") << QThread::currentThread();
92 92 impl->m_WorkingMutex.lock();
93 93 impl->m_AccessManager = std::make_unique<QNetworkAccessManager>();
94 94
95 95
96 96 auto onReplyErrors = [this](QNetworkReply *reply, const QList<QSslError> &errors) {
97 97
98 98 qCCritical(LOG_NetworkController()) << tr("NetworkAcessManager errors: ") << errors;
99 99
100 100 };
101 101
102 102
103 103 connect(impl->m_AccessManager.get(), &QNetworkAccessManager::sslErrors, this, onReplyErrors);
104 104
105 105 qCDebug(LOG_NetworkController()) << tr("NetworkController init END");
106 106 }
107 107
108 108 void NetworkController::finalize()
109 109 {
110 110 impl->m_WorkingMutex.unlock();
111 111 }
112 112
113 113 void NetworkController::onReplyCanceled(QUuid identifier)
114 114 {
115 115 auto findReply = [identifier](const auto &entry) { return identifier == entry.second; };
116 116 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled")
117 117 << QThread::currentThread();
118 118
119 119
120 120 impl->lockRead();
121 121 auto end = impl->m_NetworkReplyToVariableId.cend();
122 122 auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply);
123 123 impl->unlock();
124 124 if (it != end) {
125 125 it->first->abort();
126 126 }
127 127 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled END")
128 128 << QThread::currentThread();
129 129 }
130 130
131 131 void NetworkController::waitForFinish()
132 132 {
133 133 QMutexLocker locker{&impl->m_WorkingMutex};
134 134 }
@@ -1,238 +1,271
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 #include <cmath>
16
15 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
16 18
17 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
18 20
19 21 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
20 22
21 23 void lockRead() { m_Lock.lockForRead(); }
22 24 void lockWrite() { m_Lock.lockForWrite(); }
23 25 void unlock() { m_Lock.unlock(); }
24 26
25 27 void removeVariableRequest(QUuid vIdentifier);
26 28
27 29 QMutex m_WorkingMutex;
28 30 QReadWriteLock m_Lock;
29 31
30 32 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
31 33 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
32 34 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
33 35 };
34 36
35 37
36 38 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
37 39 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
38 40 {
39 41 }
40 42
41 43 VariableAcquisitionWorker::~VariableAcquisitionWorker()
42 44 {
43 45 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
44 46 << QThread::currentThread();
45 47 this->waitForFinish();
46 48 }
47 49
48 50
49 51 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
50 52 SqpRange rangeRequested,
51 53 SqpRange cacheRangeRequested,
52 54 DataProviderParameters parameters,
53 55 std::shared_ptr<IDataProvider> provider)
54 56 {
55 57 qCDebug(LOG_VariableAcquisitionWorker())
56 58 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
57 59 auto varRequestIdCanceled = QUuid();
58 60
59 61 // Request creation
60 62 auto acqRequest = AcquisitionRequest{};
61 63 acqRequest.m_VarRequestId = varRequestId;
62 64 acqRequest.m_vIdentifier = vIdentifier;
63 65 acqRequest.m_DataProviderParameters = parameters;
64 66 acqRequest.m_RangeRequested = rangeRequested;
65 67 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
66 68 acqRequest.m_Size = parameters.m_Times.size();
67 69 acqRequest.m_Provider = provider;
68 70
69 71
70 72 // Register request
71 73 impl->lockWrite();
72 74 impl->m_AcqIdentifierToAcqRequestMap.insert(
73 75 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
74 76
75 77 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
76 78 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
77 79 // A current request already exists, we can replace the next one
78 80 auto nextAcqId = it->second.second;
79 81 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
80 82 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
81 83 auto request = acqIdentifierToAcqRequestMapIt->second;
82 84 varRequestIdCanceled = request.m_VarRequestId;
83 85 }
84 86
85 87 it->second.second = acqRequest.m_AcqIdentifier;
86 88 impl->unlock();
87 89 }
88 90 else {
89 91 // First request for the variable, it must be stored and executed
90 92 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
91 93 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
92 94 impl->unlock();
93 95
94 96 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
95 97 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
96 98 }
97 99
98 100 return varRequestIdCanceled;
99 101 }
100 102
101 103 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
102 104 {
103 105 // TODO
104 106 }
105 107
106 108 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
107 109 double progress)
108 110 {
109 // TODO
111 impl->lockRead();
112 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
113 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
114 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
115
116 auto currentPartProgress
117 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
118 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
119
120 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: progress :") << progress;
121 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress A:")
122 << aIdToARit->second.m_Progression
123 << aIdToARit->second.m_Size;
124 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress B:")
125 << currentPartSize;
126 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress C:")
127 << currentPartProgress;
128 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress D:")
129 << currentAlreadyProgress;
130 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress E:")
131 << currentAlreadyProgress + currentPartProgress
132 << "\n";
133
134 auto finalProgression = currentAlreadyProgress + currentPartProgress;
135 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
136
137 if (finalProgression == 100.0) {
138 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
139 }
140 }
141 impl->unlock();
110 142 }
111 143
112 144 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
113 145 std::shared_ptr<IDataSeries> dataSeries,
114 146 SqpRange dataRangeAcquired)
115 147 {
116 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ")
117 << acqIdentifier << dataRangeAcquired;
148 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
149 << acqIdentifier << dataRangeAcquired;
118 150 impl->lockWrite();
119 151 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
120 152 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
121 153 // Store the result
122 154 auto dataPacket = AcquisitionDataPacket{};
123 155 dataPacket.m_Range = dataRangeAcquired;
124 156 dataPacket.m_DateSeries = dataSeries;
125 157
126 158 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
127 159 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
128 160 // A current request result already exists, we can update it
129 161 aIdToADPVit->second.push_back(dataPacket);
130 162 }
131 163 else {
132 164 // First request result for the variable, it must be stored
133 165 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
134 166 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
135 167 }
136 168
137 169
138 170 // Decrement the counter of the request
139 171 auto &acqRequest = aIdToARit->second;
140 acqRequest.m_Size = acqRequest.m_Size - 1;
172 acqRequest.m_Progression = acqRequest.m_Progression + 1;
141 173
142 174 // if the counter is 0, we can return data then run the next request if it exists and
143 175 // removed the finished request
144 if (acqRequest.m_Size == 0) {
176 if (acqRequest.m_Size == acqRequest.m_Progression) {
145 177 // Return the data
146 178 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
147 179 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
148 180 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
149 181 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
150 182 }
151 183
152 184 // Execute the next one
153 185 auto it
154 186 = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
155 187
156 188 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
157 189 if (it->second.second.isNull()) {
158 190 // There is no next request, we can remove the variable request
159 191 impl->removeVariableRequest(acqRequest.m_vIdentifier);
160 192 }
161 193 else {
162 194 auto acqIdentifierToRemove = it->second.first;
163 195 // Move the next request to the current request
164 196 it->second.first = it->second.second;
165 197 it->second.second = QUuid();
166 198 // Remove AcquisitionRequest and results;
167 199 impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
168 200 impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
169 201 // Execute the current request
170 202 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
171 203 Q_ARG(QUuid, it->second.first));
172 204 }
173 205 }
174 206 else {
175 207 qCCritical(LOG_VariableAcquisitionWorker())
176 208 << tr("Impossible to execute the acquisition on an unfound variable ");
177 209 }
178 210 }
179 211 }
180 212 else {
181 213 qCCritical(LOG_VariableAcquisitionWorker())
182 214 << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
183 215 }
184 216 impl->unlock();
185 217 }
186 218
187 219 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
188 220 {
189 221 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
190 222 impl->lockRead();
191 223 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
192 224 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
193 225 auto request = it->second;
194 226 impl->unlock();
227 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
195 228 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
196 229 }
197 230 else {
198 231 impl->unlock();
199 232 // TODO log no acqIdentifier recognized
200 233 }
201 234 }
202 235
203 236 void VariableAcquisitionWorker::initialize()
204 237 {
205 238 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
206 239 << QThread::currentThread();
207 240 impl->m_WorkingMutex.lock();
208 241 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
209 242 }
210 243
211 244 void VariableAcquisitionWorker::finalize()
212 245 {
213 246 impl->m_WorkingMutex.unlock();
214 247 }
215 248
216 249 void VariableAcquisitionWorker::waitForFinish()
217 250 {
218 251 QMutexLocker locker{&impl->m_WorkingMutex};
219 252 }
220 253
221 254 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
222 255 QUuid vIdentifier)
223 256 {
224 257 lockWrite();
225 258 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
226 259
227 260 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
228 261 // A current request already exists, we can replace the next one
229 262
230 263 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
231 264 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
232 265
233 266 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
234 267 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
235 268 }
236 269 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
237 270 unlock();
238 271 }
@@ -1,738 +1,740
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 6 #include <Variable/VariableSynchronizationGroup.h>
7 7
8 8 #include <Data/DataProviderParameters.h>
9 9 #include <Data/IDataProvider.h>
10 10 #include <Data/IDataSeries.h>
11 11 #include <Data/VariableRequest.h>
12 12 #include <Time/TimeController.h>
13 13
14 14 #include <QMutex>
15 15 #include <QThread>
16 16 #include <QUuid>
17 17 #include <QtCore/QItemSelectionModel>
18 18
19 19 #include <deque>
20 20 #include <set>
21 21 #include <unordered_map>
22 22
23 23 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
24 24
25 25 namespace {
26 26
27 27 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
28 28 const SqpRange &oldGraphRange)
29 29 {
30 30 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
31 31
32 32 auto varRangeRequested = varRange;
33 33 switch (zoomType) {
34 34 case AcquisitionZoomType::ZoomIn: {
35 35 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
36 36 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
37 37 varRangeRequested.m_TStart += deltaLeft;
38 38 varRangeRequested.m_TEnd -= deltaRight;
39 39 break;
40 40 }
41 41
42 42 case AcquisitionZoomType::ZoomOut: {
43 43 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
44 44 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
45 45 varRangeRequested.m_TStart -= deltaLeft;
46 46 varRangeRequested.m_TEnd += deltaRight;
47 47 break;
48 48 }
49 49 case AcquisitionZoomType::PanRight: {
50 50 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
51 51 varRangeRequested.m_TStart += deltaRight;
52 52 varRangeRequested.m_TEnd += deltaRight;
53 53 break;
54 54 }
55 55 case AcquisitionZoomType::PanLeft: {
56 56 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
57 57 varRangeRequested.m_TStart -= deltaLeft;
58 58 varRangeRequested.m_TEnd -= deltaLeft;
59 59 break;
60 60 }
61 61 case AcquisitionZoomType::Unknown: {
62 62 qCCritical(LOG_VariableController())
63 63 << VariableController::tr("Impossible to synchronize: zoom type unknown");
64 64 break;
65 65 }
66 66 default:
67 67 qCCritical(LOG_VariableController()) << VariableController::tr(
68 68 "Impossible to synchronize: zoom type not take into account");
69 69 // No action
70 70 break;
71 71 }
72 72
73 73 return varRangeRequested;
74 74 }
75 75 }
76 76
77 77 struct VariableController::VariableControllerPrivate {
78 78 explicit VariableControllerPrivate(VariableController *parent)
79 79 : m_WorkingMutex{},
80 80 m_VariableModel{new VariableModel{parent}},
81 81 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
82 82 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 83 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 84 q{parent}
85 85 {
86 86
87 87 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 88 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 89 }
90 90
91 91
92 92 virtual ~VariableControllerPrivate()
93 93 {
94 94 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 95 m_VariableAcquisitionWorkerThread.quit();
96 96 m_VariableAcquisitionWorkerThread.wait();
97 97 }
98 98
99 99
100 100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
101 101 QUuid varRequestId);
102 102
103 103 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
104 104 const SqpRange &dateTime);
105 105
106 106 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
107 107 std::shared_ptr<IDataSeries>
108 108 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
109 109
110 110 void registerProvider(std::shared_ptr<IDataProvider> provider);
111 111
112 112 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
113 113 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
114 114 void updateVariableRequest(QUuid varRequestId);
115 115 void cancelVariableRequest(QUuid varRequestId);
116 116
117 117 QMutex m_WorkingMutex;
118 118 /// Variable model. The VariableController has the ownership
119 119 VariableModel *m_VariableModel;
120 120 QItemSelectionModel *m_VariableSelectionModel;
121 121
122 122
123 123 TimeController *m_TimeController{nullptr};
124 124 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
125 125 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
126 126 QThread m_VariableAcquisitionWorkerThread;
127 127
128 128 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
129 129 m_VariableToProviderMap;
130 130 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
131 131 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
132 132 m_GroupIdToVariableSynchronizationGroupMap;
133 133 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
134 134 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
135 135
136 136 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
137 137
138 138 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
139 139
140 140
141 141 VariableController *q;
142 142 };
143 143
144 144
145 145 VariableController::VariableController(QObject *parent)
146 146 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
147 147 {
148 148 qCDebug(LOG_VariableController()) << tr("VariableController construction")
149 149 << QThread::currentThread();
150 150
151 151 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
152 152 &VariableController::onAbortProgressRequested);
153 153
154 154 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
155 155 &VariableController::onDataProvided);
156 156 connect(impl->m_VariableAcquisitionWorker.get(),
157 157 &VariableAcquisitionWorker::variableRequestInProgress, this,
158 158 &VariableController::onVariableRetrieveDataInProgress);
159 159
160 160 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
161 161 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
162 162 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
163 163 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
164 164
165 165
166 166 impl->m_VariableAcquisitionWorkerThread.start();
167 167 }
168 168
169 169 VariableController::~VariableController()
170 170 {
171 171 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
172 172 << QThread::currentThread();
173 173 this->waitForFinish();
174 174 }
175 175
176 176 VariableModel *VariableController::variableModel() noexcept
177 177 {
178 178 return impl->m_VariableModel;
179 179 }
180 180
181 181 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
182 182 {
183 183 return impl->m_VariableSelectionModel;
184 184 }
185 185
186 186 void VariableController::setTimeController(TimeController *timeController) noexcept
187 187 {
188 188 impl->m_TimeController = timeController;
189 189 }
190 190
191 191 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
192 192 {
193 193 if (!variable) {
194 194 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
195 195 return;
196 196 }
197 197
198 198 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
199 199 // make some treatments before the deletion
200 200 emit variableAboutToBeDeleted(variable);
201 201
202 202 // Deletes identifier
203 203 impl->m_VariableToIdentifierMap.erase(variable);
204 204
205 205 // Deletes provider
206 206 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
207 207 qCDebug(LOG_VariableController())
208 208 << tr("Number of providers deleted for variable %1: %2")
209 209 .arg(variable->name(), QString::number(nbProvidersDeleted));
210 210
211 211
212 212 // Deletes from model
213 213 impl->m_VariableModel->deleteVariable(variable);
214 214 }
215 215
216 216 void VariableController::deleteVariables(
217 217 const QVector<std::shared_ptr<Variable> > &variables) noexcept
218 218 {
219 219 for (auto variable : qAsConst(variables)) {
220 220 deleteVariable(variable);
221 221 }
222 222 }
223 223
224 224 void VariableController::abortProgress(std::shared_ptr<Variable> variable)
225 225 {
226 226 }
227 227
228 228 std::shared_ptr<Variable>
229 229 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
230 230 std::shared_ptr<IDataProvider> provider) noexcept
231 231 {
232 232 if (!impl->m_TimeController) {
233 233 qCCritical(LOG_VariableController())
234 234 << tr("Impossible to create variable: The time controller is null");
235 235 return nullptr;
236 236 }
237 237
238 238 auto range = impl->m_TimeController->dateTime();
239 239
240 240 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
241 241 auto identifier = QUuid::createUuid();
242 242
243 243 // store the provider
244 244 impl->registerProvider(provider);
245 245
246 246 // Associate the provider
247 247 impl->m_VariableToProviderMap[newVariable] = provider;
248 248 impl->m_VariableToIdentifierMap[newVariable] = identifier;
249 249
250 250
251 251 auto varRequestId = QUuid::createUuid();
252 252 qCInfo(LOG_VariableController()) << "processRequest for" << name << varRequestId;
253 253 impl->processRequest(newVariable, range, varRequestId);
254 254 impl->updateVariableRequest(varRequestId);
255 255
256 256 return newVariable;
257 257 }
258 258 }
259 259
260 260 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
261 261 {
262 262 // TODO check synchronisation and Rescale
263 263 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
264 264 << QThread::currentThread()->objectName();
265 265 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
266 266 auto varRequestId = QUuid::createUuid();
267 267
268 268 for (const auto &selectedRow : qAsConst(selectedRows)) {
269 269 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
270 270 selectedVariable->setRange(dateTime);
271 271 impl->processRequest(selectedVariable, dateTime, varRequestId);
272 272
273 273 // notify that rescale operation has to be done
274 274 emit rangeChanged(selectedVariable, dateTime);
275 275 }
276 276 }
277 277 impl->updateVariableRequest(varRequestId);
278 278 }
279 279
280 280 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
281 281 const SqpRange &cacheRangeRequested,
282 282 QVector<AcquisitionDataPacket> dataAcquired)
283 283 {
284 284 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
285 285 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
286 286 if (!varRequestId.isNull()) {
287 287 impl->updateVariableRequest(varRequestId);
288 288 }
289 289 }
290 290
291 291 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
292 292 {
293 qCInfo(LOG_VariableController()) << "TORM: ariableController::onVariableRetrieveDataInProgress"
294 << QThread::currentThread()->objectName() << progress;
293 295 if (auto var = impl->findVariable(identifier)) {
294 296 impl->m_VariableModel->setDataProgress(var, progress);
295 297 }
296 298 else {
297 299 qCCritical(LOG_VariableController())
298 300 << tr("Impossible to notify progression of a null variable");
299 301 }
300 302 }
301 303
302 304 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
303 305 {
304 306 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAbortProgressRequested"
305 307 << QThread::currentThread()->objectName();
306 308
307 309 auto it = impl->m_VariableToIdentifierMap.find(variable);
308 310 if (it != impl->m_VariableToIdentifierMap.cend()) {
309 311 impl->m_VariableToProviderMap.at(variable)->requestDataAborting(it->second);
310 312 }
311 313 else {
312 314 qCWarning(LOG_VariableController())
313 315 << tr("Aborting progression of inexistant variable detected !!!")
314 316 << QThread::currentThread()->objectName();
315 317 }
316 318 }
317 319
318 320 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
319 321 {
320 322 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
321 323 << QThread::currentThread()->objectName()
322 324 << synchronizationGroupId;
323 325 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
324 326 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
325 327 std::make_pair(synchronizationGroupId, vSynchroGroup));
326 328 }
327 329
328 330 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
329 331 {
330 332 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
331 333 }
332 334
333 335 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
334 336 QUuid synchronizationGroupId)
335 337
336 338 {
337 339 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
338 340 << synchronizationGroupId;
339 341 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
340 342 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
341 343 auto groupIdToVSGIt
342 344 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
343 345 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
344 346 impl->m_VariableIdGroupIdMap.insert(
345 347 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
346 348 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
347 349 }
348 350 else {
349 351 qCCritical(LOG_VariableController())
350 352 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
351 353 << variable->name();
352 354 }
353 355 }
354 356 else {
355 357 qCCritical(LOG_VariableController())
356 358 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
357 359 }
358 360 }
359 361
360 362
361 363 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
362 364 const SqpRange &range, const SqpRange &oldRange,
363 365 bool synchronise)
364 366 {
365 367 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
366 368
367 369 // we want to load data of the variable for the dateTime.
368 370 // First we check if the cache contains some of them.
369 371 // For the other, we ask the provider to give them.
370 372
371 373 auto varRequestId = QUuid::createUuid();
372 374 qCInfo(LOG_VariableController()) << "VariableController::onRequestDataLoading"
373 375 << QThread::currentThread()->objectName() << varRequestId;
374 376
375 377 for (const auto &var : variables) {
376 378 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
377 379 impl->processRequest(var, range, varRequestId);
378 380 }
379 381
380 382 if (synchronise) {
381 383 // Get the group ids
382 384 qCDebug(LOG_VariableController())
383 385 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
384 386 auto groupIds = std::set<QUuid>{};
385 387 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
386 388 for (const auto &var : variables) {
387 389 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
388 390 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
389 391 auto vId = varToVarIdIt->second;
390 392 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
391 393 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
392 394 auto gId = varIdToGroupIdIt->second;
393 395 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
394 396 if (groupIds.find(gId) == groupIds.cend()) {
395 397 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
396 398 groupIds.insert(gId);
397 399 }
398 400 }
399 401 }
400 402 }
401 403
402 404 // We assume here all group ids exist
403 405 for (const auto &gId : groupIds) {
404 406 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
405 407 auto vSyncIds = vSynchronizationGroup->getIds();
406 408 qCDebug(LOG_VariableController()) << "Var in synchro group ";
407 409 for (auto vId : vSyncIds) {
408 410 auto var = impl->findVariable(vId);
409 411
410 412 // Don't process already processed var
411 413 if (!variables.contains(var)) {
412 414 if (var != nullptr) {
413 415 qCDebug(LOG_VariableController()) << "processRequest synchro for"
414 416 << var->name();
415 417 auto vSyncRangeRequested = computeSynchroRangeRequested(
416 418 var->range(), range, groupIdToOldRangeMap.at(gId));
417 419 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
418 420 impl->processRequest(var, vSyncRangeRequested, varRequestId);
419 421 }
420 422 else {
421 423 qCCritical(LOG_VariableController())
422 424
423 425 << tr("Impossible to synchronize a null variable");
424 426 }
425 427 }
426 428 }
427 429 }
428 430 }
429 431
430 432 impl->updateVariableRequest(varRequestId);
431 433 }
432 434
433 435
434 436 void VariableController::initialize()
435 437 {
436 438 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
437 439 impl->m_WorkingMutex.lock();
438 440 qCDebug(LOG_VariableController()) << tr("VariableController init END");
439 441 }
440 442
441 443 void VariableController::finalize()
442 444 {
443 445 impl->m_WorkingMutex.unlock();
444 446 }
445 447
446 448 void VariableController::waitForFinish()
447 449 {
448 450 QMutexLocker locker{&impl->m_WorkingMutex};
449 451 }
450 452
451 453 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
452 454 {
453 455 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
454 456 auto zoomType = AcquisitionZoomType::Unknown;
455 457 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
456 458 zoomType = AcquisitionZoomType::ZoomOut;
457 459 }
458 460 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
459 461 zoomType = AcquisitionZoomType::PanRight;
460 462 }
461 463 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
462 464 zoomType = AcquisitionZoomType::PanLeft;
463 465 }
464 466 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
465 467 zoomType = AcquisitionZoomType::ZoomIn;
466 468 }
467 469 else {
468 470 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
469 471 }
470 472 return zoomType;
471 473 }
472 474
473 475 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
474 476 const SqpRange &rangeRequested,
475 477 QUuid varRequestId)
476 478 {
477 479
478 480 // TODO: protect at
479 481 auto varRequest = VariableRequest{};
480 482 auto varId = m_VariableToIdentifierMap.at(var);
481 483
482 484 auto varStrategyRangesRequested
483 485 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
484 486 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
485 487 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
486 488
487 489 if (!notInCacheRangeList.empty()) {
488 490 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
489 491 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
490 492 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest RR ") << rangeRequested;
491 493 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest R ")
492 494 << varStrategyRangesRequested.first;
493 495 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest CR ")
494 496 << varStrategyRangesRequested.second;
495 497 // store VarRequest
496 498 storeVariableRequest(varId, varRequestId, varRequest);
497 499
498 500 auto varProvider = m_VariableToProviderMap.at(var);
499 501 if (varProvider != nullptr) {
500 502 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
501 503 varRequestId, varId, varStrategyRangesRequested.first,
502 504 varStrategyRangesRequested.second,
503 505 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
504 506 varProvider);
505 507
506 508 if (!varRequestIdCanceled.isNull()) {
507 509 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
508 510 << varRequestIdCanceled;
509 511 cancelVariableRequest(varRequestIdCanceled);
510 512 }
511 513 }
512 514 else {
513 515 qCCritical(LOG_VariableController())
514 516 << "Impossible to provide data with a null provider";
515 517 }
516 518
517 519 if (!inCacheRangeList.empty()) {
518 520 emit q->updateVarDisplaying(var, inCacheRangeList.first());
519 521 }
520 522 }
521 523 else {
522 524
523 525 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
524 526 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
525 527 // store VarRequest
526 528 storeVariableRequest(varId, varRequestId, varRequest);
527 529 acceptVariableRequest(varId,
528 530 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
529 531 }
530 532 }
531 533
532 534 std::shared_ptr<Variable>
533 535 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
534 536 {
535 537 std::shared_ptr<Variable> var;
536 538 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
537 539
538 540 auto end = m_VariableToIdentifierMap.cend();
539 541 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
540 542 if (it != end) {
541 543 var = it->first;
542 544 }
543 545 else {
544 546 qCCritical(LOG_VariableController())
545 547 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
546 548 }
547 549
548 550 return var;
549 551 }
550 552
551 553 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
552 554 const QVector<AcquisitionDataPacket> acqDataPacketVector)
553 555 {
554 556 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
555 557 << acqDataPacketVector.size();
556 558 std::shared_ptr<IDataSeries> dataSeries;
557 559 if (!acqDataPacketVector.isEmpty()) {
558 560 dataSeries = acqDataPacketVector[0].m_DateSeries;
559 561 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
560 562 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
561 563 }
562 564 }
563 565 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
564 566 << acqDataPacketVector.size();
565 567 return dataSeries;
566 568 }
567 569
568 570 void VariableController::VariableControllerPrivate::registerProvider(
569 571 std::shared_ptr<IDataProvider> provider)
570 572 {
571 573 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
572 574 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
573 575 << provider->objectName();
574 576 m_ProviderSet.insert(provider);
575 577 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
576 578 &VariableAcquisitionWorker::onVariableDataAcquired);
577 579 connect(provider.get(), &IDataProvider::dataProvidedProgress,
578 580 m_VariableAcquisitionWorker.get(),
579 581 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
580 582 }
581 583 else {
582 584 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
583 585 }
584 586 }
585 587
586 588 void VariableController::VariableControllerPrivate::storeVariableRequest(
587 589 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
588 590 {
589 591 // First request for the variable. we can create an entry for it
590 592 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
591 593 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
592 594 auto varRequestIdQueue = std::deque<QUuid>{};
593 595 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
594 596 varRequestIdQueue.push_back(varRequestId);
595 597 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
596 598 }
597 599 else {
598 600 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
599 601 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
600 602 varRequestIdQueue.push_back(varRequestId);
601 603 }
602 604
603 605 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
604 606 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
605 607 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
606 608 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
607 609 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
608 610 m_VarRequestIdToVarIdVarRequestMap.insert(
609 611 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
610 612 }
611 613 else {
612 614 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
613 615 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
614 616 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
615 617 }
616 618 }
617 619
618 620 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
619 621 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
620 622 {
621 623 QUuid varRequestId;
622 624 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
623 625 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
624 626 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
625 627 varRequestId = varRequestIdQueue.front();
626 628 auto varRequestIdToVarIdVarRequestMapIt
627 629 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
628 630 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
629 631 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
630 632 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
631 633 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
632 634 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
633 635 auto &varRequest = varIdToVarRequestMapIt->second;
634 636 varRequest.m_DataSeries = dataSeries;
635 637 varRequest.m_CanUpdate = true;
636 638 }
637 639 else {
638 640 qCDebug(LOG_VariableController())
639 641 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
640 642 "to a variableRequestId")
641 643 << varRequestId << varId;
642 644 }
643 645 }
644 646 else {
645 647 qCCritical(LOG_VariableController())
646 648 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
647 649 << varRequestId;
648 650 }
649 651
650 652 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
651 653 << varRequestIdQueue.size();
652 654 varRequestIdQueue.pop_front();
653 655 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
654 656 << varRequestIdQueue.size();
655 657 if (varRequestIdQueue.empty()) {
656 658 m_VarIdToVarRequestIdQueueMap.erase(varId);
657 659 }
658 660 }
659 661 else {
660 662 qCCritical(LOG_VariableController())
661 663 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
662 664 }
663 665
664 666 return varRequestId;
665 667 }
666 668
667 669 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
668 670 {
669 671
670 672 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
671 673 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
672 674 bool processVariableUpdate = true;
673 675 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
674 676 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
675 677 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
676 678 ++varIdToVarRequestMapIt) {
677 679 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
678 680 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
679 681 << processVariableUpdate;
680 682 }
681 683
682 684 if (processVariableUpdate) {
683 685 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
684 686 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
685 687 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
686 688 auto &varRequest = varIdToVarRequestMapIt->second;
687 689 var->setRange(varRequest.m_RangeRequested);
688 690 var->setCacheRange(varRequest.m_CacheRangeRequested);
689 691 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
690 692 << varRequest.m_RangeRequested;
691 693 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
692 694 << varRequest.m_CacheRangeRequested;
693 695 var->mergeDataSeries(varRequest.m_DataSeries);
694 696 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
695 697 << varRequest.m_DataSeries->range();
696 698 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
697 699 emit var->updated();
698 700 }
699 701 else {
700 702 qCCritical(LOG_VariableController())
701 703 << tr("Impossible to update data to a null variable");
702 704 }
703 705 }
704 706
705 707 // cleaning varRequestId
706 708 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
707 709 << m_VarRequestIdToVarIdVarRequestMap.size();
708 710 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
709 711 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
710 712 << m_VarRequestIdToVarIdVarRequestMap.size();
711 713 }
712 714 }
713 715 else {
714 716 qCCritical(LOG_VariableController())
715 717 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
716 718 }
717 719 }
718 720
719 721 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
720 722 {
721 723 // cleaning varRequestId
722 724 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
723 725
724 726 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
725 727 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
726 728 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
727 729 varRequestIdQueue.erase(
728 730 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
729 731 varRequestIdQueue.end());
730 732 if (varRequestIdQueue.empty()) {
731 733 varIdToVarRequestIdQueueMapIt
732 734 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
733 735 }
734 736 else {
735 737 ++varIdToVarRequestIdQueueMapIt;
736 738 }
737 739 }
738 740 }
@@ -1,239 +1,239
1 1 #include "Data/ArrayData.h"
2 2 #include <QObject>
3 3 #include <QtTest>
4 4
5 5 using Container = QVector<QVector<double> >;
6 6 using InputData = QPair<QVector<double>, int>;
7 7
8 8 namespace {
9 9
10 10 InputData flatten(const Container &container)
11 11 {
12 12 if (container.isEmpty()) {
13 13 return {};
14 14 }
15 15
16 16 // We assume here that each component of the container have the same size
17 17 auto containerSize = container.size();
18 18 auto componentSize = container.first().size();
19 19
20 20 auto result = QVector<double>{};
21 21 result.reserve(componentSize * containerSize);
22 22
23 23 for (auto i = 0; i < componentSize; ++i) {
24 24 for (auto j = 0; j < containerSize; ++j) {
25 25 result.append(container.at(j).at(i));
26 26 }
27 27 }
28 28
29 29 return {result, containerSize};
30 30 }
31 31
32 32 void verifyArrayData(const ArrayData<2> &arrayData, const Container &expectedData)
33 33 {
34 34 auto verifyComponent = [&arrayData](const auto &componentData, const auto &equalFun) {
35 35 QVERIFY(std::equal(arrayData.cbegin(), arrayData.cend(), componentData.cbegin(),
36 36 componentData.cend(),
37 37 [&equalFun](const auto &dataSeriesIt, const auto &expectedValue) {
38 38 return equalFun(dataSeriesIt, expectedValue);
39 39 }));
40 40 };
41 41
42 42 for (auto i = 0; i < expectedData.size(); ++i) {
43 43 verifyComponent(expectedData.at(i), [i](const auto &seriesIt, const auto &value) {
44 44 return seriesIt.at(i) == value;
45 45 });
46 46 }
47 47 }
48 48
49 49 } // namespace
50 50
51 51 class TestTwoDimArrayData : public QObject {
52 52 Q_OBJECT
53 53 private slots:
54 54 /// Tests @sa ArrayData ctor
55 55 void testCtor_data();
56 56 void testCtor();
57 57
58 58 /// Tests @sa ArrayData::add()
59 59 void testAdd_data();
60 60 void testAdd();
61 61
62 62 /// Tests @sa ArrayData::clear()
63 63 void testClear_data();
64 64 void testClear();
65 65
66 66 /// Tests @sa ArrayData::size()
67 67 void testSize_data();
68 68 void testSize();
69 69
70 70 /// Tests @sa ArrayData::sort()
71 71 void testSort_data();
72 72 void testSort();
73 73 };
74 74
75 75 void TestTwoDimArrayData::testCtor_data()
76 76 {
77 77 // Test structure
78 78 QTest::addColumn<InputData>("inputData"); // array data's input
79 79 QTest::addColumn<bool>("success"); // array data has been successfully constructed
80 80 QTest::addColumn<Container>("expectedData"); // expected array data (when success)
81 81
82 82 // Test cases
83 83 QTest::newRow("validInput") << flatten(Container{{1., 2., 3., 4., 5.},
84 84 {6., 7., 8., 9., 10.},
85 85 {11., 12., 13., 14., 15.}})
86 86 << true << Container{{1., 2., 3., 4., 5.},
87 87 {6., 7., 8., 9., 10.},
88 88 {11., 12., 13., 14., 15.}};
89 QTest::newRow("invalidInput (invalid data size")
90 << InputData{{1., 2., 3., 4., 5., 6., 7.}, 3} << false << Container{{}, {}, {}};
89 QTest::newRow("invalidInput (invalid data size") << InputData{{1., 2., 3., 4., 5., 6., 7.}, 3}
90 << false << Container{{}, {}, {}};
91 91 QTest::newRow("invalidInput (less than two components")
92 92 << flatten(Container{{1., 2., 3., 4., 5.}}) << false << Container{{}, {}, {}};
93 93 }
94 94
95 95 void TestTwoDimArrayData::testCtor()
96 96 {
97 97 QFETCH(InputData, inputData);
98 98 QFETCH(bool, success);
99 99
100 100 if (success) {
101 101 QFETCH(Container, expectedData);
102 102
103 103 ArrayData<2> arrayData{inputData.first, inputData.second};
104 104 verifyArrayData(arrayData, expectedData);
105 105 }
106 106 else {
107 107 QVERIFY_EXCEPTION_THROWN(ArrayData<2>(inputData.first, inputData.second),
108 108 std::invalid_argument);
109 109 }
110 110 }
111 111
112 112 void TestTwoDimArrayData::testAdd_data()
113 113 {
114 114 // Test structure
115 115 QTest::addColumn<InputData>("inputData"); // array's data input
116 116 QTest::addColumn<InputData>("otherData"); // array data's input to merge with
117 117 QTest::addColumn<bool>("prepend"); // prepend or append merge
118 118 QTest::addColumn<Container>("expectedData"); // expected data after merge
119 119
120 120 // Test cases
121 121 auto inputData = flatten(
122 122 Container{{1., 2., 3., 4., 5.}, {11., 12., 13., 14., 15.}, {21., 22., 23., 24., 25.}});
123 123
124 124 auto vectorContainer = flatten(Container{{6., 7., 8.}, {16., 17., 18.}, {26., 27., 28}});
125 125 auto tensorContainer = flatten(Container{{6., 7., 8.},
126 126 {16., 17., 18.},
127 127 {26., 27., 28},
128 128 {36., 37., 38.},
129 129 {46., 47., 48.},
130 130 {56., 57., 58}});
131 131
132 132 QTest::newRow("appendMerge") << inputData << vectorContainer << false
133 133 << Container{{1., 2., 3., 4., 5., 6., 7., 8.},
134 134 {11., 12., 13., 14., 15., 16., 17., 18.},
135 135 {21., 22., 23., 24., 25., 26., 27., 28}};
136 136 QTest::newRow("prependMerge") << inputData << vectorContainer << true
137 137 << Container{{6., 7., 8., 1., 2., 3., 4., 5.},
138 138 {16., 17., 18., 11., 12., 13., 14., 15.},
139 139 {26., 27., 28, 21., 22., 23., 24., 25.}};
140 140 QTest::newRow("invalidMerge") << inputData << tensorContainer << false
141 141 << Container{{1., 2., 3., 4., 5.},
142 142 {11., 12., 13., 14., 15.},
143 143 {21., 22., 23., 24., 25.}};
144 144 }
145 145
146 146 void TestTwoDimArrayData::testAdd()
147 147 {
148 148 QFETCH(InputData, inputData);
149 149 QFETCH(InputData, otherData);
150 150 QFETCH(bool, prepend);
151 151 QFETCH(Container, expectedData);
152 152
153 153 ArrayData<2> arrayData{inputData.first, inputData.second};
154 154 ArrayData<2> other{otherData.first, otherData.second};
155 155
156 156 arrayData.add(other, prepend);
157 157
158 158 verifyArrayData(arrayData, expectedData);
159 159 }
160 160
161 161 void TestTwoDimArrayData::testClear_data()
162 162 {
163 163 // Test structure
164 164 QTest::addColumn<InputData>("inputData"); // array data's input
165 165
166 166 // Test cases
167 167 QTest::newRow("data1") << flatten(
168 168 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}});
169 169 }
170 170
171 171 void TestTwoDimArrayData::testClear()
172 172 {
173 173 QFETCH(InputData, inputData);
174 174
175 175 ArrayData<2> arrayData{inputData.first, inputData.second};
176 176 arrayData.clear();
177 177
178 178 auto emptyData = Container(inputData.second, QVector<double>{});
179 179 verifyArrayData(arrayData, emptyData);
180 180 }
181 181
182 182 void TestTwoDimArrayData::testSize_data()
183 183 {
184 184 // Test structure
185 185 QTest::addColumn<InputData>("inputData"); // array data's input
186 186 QTest::addColumn<int>("expectedSize"); // expected array data size
187 187
188 188 // Test cases
189 189 QTest::newRow("data1") << flatten(Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}}) << 5;
190 190 QTest::newRow("data2") << flatten(Container{{1., 2., 3., 4., 5.},
191 191 {6., 7., 8., 9., 10.},
192 192 {11., 12., 13., 14., 15.}})
193 193 << 5;
194 194 }
195 195
196 196 void TestTwoDimArrayData::testSize()
197 197 {
198 198 QFETCH(InputData, inputData);
199 199 QFETCH(int, expectedSize);
200 200
201 201 ArrayData<2> arrayData{inputData.first, inputData.second};
202 202 QVERIFY(arrayData.size() == expectedSize);
203 203 }
204 204
205 205 void TestTwoDimArrayData::testSort_data()
206 206 {
207 207 // Test structure
208 208 QTest::addColumn<InputData>("inputData"); // array data's input
209 209 QTest::addColumn<std::vector<int> >("sortPermutation"); // permutation used to sort data
210 210 QTest::addColumn<Container>("expectedData"); // expected data after sorting
211 211
212 212 // Test cases
213 213 QTest::newRow("data1")
214 214 << flatten(
215 215 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}})
216 216 << std::vector<int>{0, 2, 3, 1, 4}
217 217 << Container{{1., 3., 4., 2., 5.}, {6., 8., 9., 7., 10.}, {11., 13., 14., 12., 15.}};
218 218 QTest::newRow("data2")
219 219 << flatten(
220 220 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}})
221 221 << std::vector<int>{2, 4, 3, 0, 1}
222 222 << Container{{3., 5., 4., 1., 2.}, {8., 10., 9., 6., 7.}, {13., 15., 14., 11., 12.}};
223 223 }
224 224
225 225 void TestTwoDimArrayData::testSort()
226 226 {
227 227 QFETCH(InputData, inputData);
228 228 QFETCH(std::vector<int>, sortPermutation);
229 229 QFETCH(Container, expectedData);
230 230
231 231 ArrayData<2> arrayData{inputData.first, inputData.second};
232 232 auto sortedArrayData = arrayData.sort(sortPermutation);
233 233 QVERIFY(sortedArrayData != nullptr);
234 234
235 235 verifyArrayData(*sortedArrayData, expectedData);
236 236 }
237 237
238 238 QTEST_MAIN(TestTwoDimArrayData)
239 239 #include "TestTwoDimArrayData.moc"
@@ -1,30 +1,41
1 1 #ifndef SCIQLOP_AMDAPROVIDER_H
2 2 #define SCIQLOP_AMDAPROVIDER_H
3 3
4 4 #include "AmdaGlobal.h"
5 5
6 6 #include <Data/IDataProvider.h>
7 7
8 8 #include <QLoggingCategory>
9 9
10 #include <map>
10 11
11 12 Q_DECLARE_LOGGING_CATEGORY(LOG_AmdaProvider)
12 13
13 14 class QNetworkReply;
15 class QNetworkRequest;
14 16
15 17 /**
16 18 * @brief The AmdaProvider class is an example of how a data provider can generate data
17 19 */
18 20 class SCIQLOP_AMDA_EXPORT AmdaProvider : public IDataProvider {
19 21 public:
20 22 explicit AmdaProvider();
21 23
22 24 void requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters) override;
23 25
24 26 void requestDataAborting(QUuid acqIdentifier) override;
25 27
28 private slots:
29 void onReplyDownloadProgress(QUuid, const QNetworkRequest &, double progress);
30
26 31 private:
27 32 void retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data);
33
34 void updateRequestProgress(QUuid acqIdentifier, std::shared_ptr<QNetworkRequest> request,
35 double progress);
36
37 std::map<QUuid, std::map<std::shared_ptr<QNetworkRequest>, double> >
38 m_AcqIdToRequestProgressMap;
28 39 };
29 40
30 41 #endif // SCIQLOP_AMDAPROVIDER_H
@@ -1,168 +1,254
1 1 #include "AmdaProvider.h"
2 2 #include "AmdaDefs.h"
3 3 #include "AmdaResultParser.h"
4 4
5 5 #include <Common/DateUtils.h>
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <Network/NetworkController.h>
8 8 #include <SqpApplication.h>
9 9 #include <Variable/Variable.h>
10 10
11 11 #include <QNetworkAccessManager>
12 12 #include <QNetworkReply>
13 13 #include <QTemporaryFile>
14 14 #include <QThread>
15 15
16 16 Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider")
17 17
18 18 namespace {
19 19
20 20 /// URL format for a request on AMDA server. The parameters are as follows:
21 21 /// - %1: start date
22 22 /// - %2: end date
23 23 /// - %3: parameter id
24 24 const auto AMDA_URL_FORMAT = QStringLiteral(
25 25 "http://amda.irap.omp.eu/php/rest/"
26 26 "getParameter.php?startTime=%1&stopTime=%2&parameterID=%3&outputFormat=ASCII&"
27 27 "timeFormat=ISO8601&gzip=0");
28 28
29 29 /// Dates format passed in the URL (e.g 2013-09-23T09:00)
30 30 const auto AMDA_TIME_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss");
31 31
32 // struct AmdaProgression {
33 // QUuid acqIdentifier;
34 // std::map<QNetworkRequest, double> m_RequestId;
35 //};
36
32 37 /// Formats a time to a date that can be passed in URL
33 38 QString dateFormat(double sqpRange) noexcept
34 39 {
35 40 auto dateTime = DateUtils::dateTime(sqpRange);
36 41 return dateTime.toString(AMDA_TIME_FORMAT);
37 42 }
38 43
39 44 AmdaResultParser::ValueType valueType(const QString &valueType)
40 45 {
41 46 if (valueType == QStringLiteral("scalar")) {
42 47 return AmdaResultParser::ValueType::SCALAR;
43 48 }
44 49 else if (valueType == QStringLiteral("vector")) {
45 50 return AmdaResultParser::ValueType::VECTOR;
46 51 }
47 52 else {
48 53 return AmdaResultParser::ValueType::UNKNOWN;
49 54 }
50 55 }
51 56
52 57 } // namespace
53 58
54 59 AmdaProvider::AmdaProvider()
55 60 {
56 61 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::AmdaProvider") << QThread::currentThread();
57 62 if (auto app = sqpApp) {
58 63 auto &networkController = app->networkController();
59 64 connect(this, SIGNAL(requestConstructed(QNetworkRequest, QUuid,
60 65 std::function<void(QNetworkReply *, QUuid)>)),
61 66 &networkController,
62 67 SLOT(onProcessRequested(QNetworkRequest, QUuid,
63 68 std::function<void(QNetworkReply *, QUuid)>)));
64 69
65 70
66 connect(&sqpApp->networkController(), SIGNAL(replyDownloadProgress(QUuid, double)), this,
67 SIGNAL(dataProvidedProgress(QUuid, double)));
71 connect(&sqpApp->networkController(),
72 SIGNAL(replyDownloadProgress(QUuid, const QNetworkRequest &, double)), this,
73 SLOT(onReplyDownloadProgress(QUuid, const QNetworkRequest &, double)));
68 74 }
69 75 }
70 76
71 77 void AmdaProvider::requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
72 78 {
73 79 // NOTE: Try to use multithread if possible
74 80 const auto times = parameters.m_Times;
75 81 const auto data = parameters.m_Data;
76 82 for (const auto &dateTime : qAsConst(times)) {
77 83 this->retrieveData(acqIdentifier, dateTime, data);
78 84
85
79 86 // TORM when AMDA will support quick asynchrone request
80 87 QThread::msleep(1000);
81 88 }
82 89 }
83 90
84 91 void AmdaProvider::requestDataAborting(QUuid acqIdentifier)
85 92 {
86 93 if (auto app = sqpApp) {
87 94 auto &networkController = app->networkController();
88 95 networkController.onReplyCanceled(acqIdentifier);
89 96 }
90 97 }
91 98
99 void AmdaProvider::onReplyDownloadProgress(QUuid acqIdentifier,
100 const QNetworkRequest &networkRequest, double progress)
101 {
102 qCCritical(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << progress;
103 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
104 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
105
106 qCCritical(LOG_AmdaProvider()) << tr("1 onReplyDownloadProgress") << progress;
107 auto requestPtr = &networkRequest;
108 auto findRequest
109 = [requestPtr](const auto &entry) { return requestPtr == entry.first.get(); };
110
111 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
112 auto requestProgressMapEnd = requestProgressMap.end();
113 auto requestProgressMapIt
114 = std::find_if(requestProgressMap.begin(), requestProgressMapEnd, findRequest);
115
116 if (requestProgressMapIt != requestProgressMapEnd) {
117 requestProgressMapIt->second = progress;
118 }
119 else {
120 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress");
121 }
122 }
123
124 acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
125 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
126 qCCritical(LOG_AmdaProvider()) << tr("2 onReplyDownloadProgress") << progress;
127 double finalProgress = 0.0;
128
129 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
130 auto fraq = requestProgressMap.size();
131
132 for (auto requestProgress : requestProgressMap) {
133 finalProgress += requestProgress.second;
134 }
135
136 if (fraq > 0) {
137 finalProgress = finalProgress / fraq;
138 }
139
140 qCCritical(LOG_AmdaProvider()) << tr("2 onReplyDownloadProgress") << finalProgress;
141 emit dataProvidedProgress(acqIdentifier, finalProgress);
142 }
143 else {
144 emit dataProvidedProgress(acqIdentifier, 0.0);
145 }
146 }
147
92 148 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data)
93 149 {
94 150 // Retrieves product ID from data: if the value is invalid, no request is made
95 151 auto productId = data.value(AMDA_XML_ID_KEY).toString();
96 152 if (productId.isNull()) {
97 153 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve data: unknown product id");
98 154 return;
99 155 }
100 156 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::retrieveData") << dateTime;
101 157
102 158 // Retrieves the data type that determines whether the expected format for the result file is
103 159 // scalar, vector...
104 160 auto productValueType = valueType(data.value(AMDA_DATA_TYPE_KEY).toString());
105 161
106 162 // /////////// //
107 163 // Creates URL //
108 164 // /////////// //
109 165
110 166 auto startDate = dateFormat(dateTime.m_TStart);
111 167 auto endDate = dateFormat(dateTime.m_TEnd);
112 168
113 169 auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)};
114 170 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
115 171 auto tempFile = std::make_shared<QTemporaryFile>();
116 172
117 173 // LAMBDA
118 174 auto httpDownloadFinished = [this, dateTime, tempFile,
119 175 productValueType](QNetworkReply *reply, QUuid dataId) noexcept {
120 176
121 177 // Don't do anything if the reply was abort
122 178 if (reply->error() != QNetworkReply::OperationCanceledError) {
123 179
124 180 if (tempFile) {
125 181 auto replyReadAll = reply->readAll();
126 182 if (!replyReadAll.isEmpty()) {
127 183 tempFile->write(replyReadAll);
128 184 }
129 185 tempFile->close();
130 186
131 187 // Parse results file
132 188 if (auto dataSeries
133 189 = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) {
134 190 emit dataProvided(dataId, dataSeries, dateTime);
135 191 }
136 192 else {
137 193 /// @todo ALX : debug
138 194 }
139 195 }
196 m_AcqIdToRequestProgressMap.erase(dataId);
140 197 }
141 198
142 199 };
143 200 auto httpFinishedLambda
144 201 = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept {
145 202
146 203 // Don't do anything if the reply was abort
147 204 if (reply->error() != QNetworkReply::OperationCanceledError) {
148 205 auto downloadFileUrl = QUrl{QString{reply->readAll()}};
149 206
150
151 207 qCInfo(LOG_AmdaProvider())
152 208 << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl;
153 209 // Executes request for downloading file //
154 210
155 211 // Creates destination file
156 212 if (tempFile->open()) {
157 213 // Executes request
158 emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId,
159 httpDownloadFinished);
214 auto request = std::make_shared<QNetworkRequest>(downloadFileUrl);
215 updateRequestProgress(dataId, request, 0.0);
216 emit requestConstructed(*request.get(), dataId, httpDownloadFinished);
160 217 }
161 218 }
219 else {
220 m_AcqIdToRequestProgressMap.erase(dataId);
221 }
162 222 };
163 223
164 224 // //////////////// //
165 225 // Executes request //
166 226 // //////////////// //
167 emit requestConstructed(QNetworkRequest{url}, token, httpFinishedLambda);
227
228 auto request = std::make_shared<QNetworkRequest>(url);
229 updateRequestProgress(token, request, 0.0);
230
231 emit requestConstructed(*request.get(), token, httpFinishedLambda);
232 }
233
234 void AmdaProvider::updateRequestProgress(QUuid acqIdentifier,
235 std::shared_ptr<QNetworkRequest> request, double progress)
236 {
237 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
238 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
239 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
240 auto requestProgressMapIt = requestProgressMap.find(request);
241 if (requestProgressMapIt != requestProgressMap.end()) {
242 requestProgressMapIt->second = progress;
243 }
244 else {
245 acqIdToRequestProgressMapIt->second.insert(std::make_pair(request, progress));
246 }
247 }
248 else {
249 auto requestProgressMap = std::map<std::shared_ptr<QNetworkRequest>, double>{};
250 requestProgressMap.insert(std::make_pair(request, progress));
251 m_AcqIdToRequestProgressMap.insert(
252 std::make_pair(acqIdentifier, std::move(requestProgressMap)));
253 }
168 254 }
@@ -1,103 +1,108
1 1 #include "CosinusProvider.h"
2 2
3 3 #include <Data/DataProviderParameters.h>
4 4 #include <Data/ScalarSeries.h>
5 5
6 6 #include <cmath>
7 7
8 8 #include <QFuture>
9 9 #include <QThread>
10 10 #include <QtConcurrent/QtConcurrent>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_CosinusProvider, "CosinusProvider")
13 13
14 14 std::shared_ptr<IDataSeries> CosinusProvider::retrieveData(QUuid acqIdentifier,
15 15 const SqpRange &dataRangeRequested)
16 16 {
17 17 // TODO: Add Mutex
18 18 auto dataIndex = 0;
19 19
20 20 // Gets the timerange from the parameters
21 21 double freq = 1.0;
22 22 double start = std::ceil(dataRangeRequested.m_TStart * freq); // 100 htz
23 23 double end = std::floor(dataRangeRequested.m_TEnd * freq); // 100 htz
24 24
25 25 // We assure that timerange is valid
26 26 if (end < start) {
27 27 std::swap(start, end);
28 28 }
29 29
30 30 // Generates scalar series containing cosinus values (one value per second)
31 31 auto dataCount = end - start;
32 32
33 33 auto xAxisData = QVector<double>{};
34 34 xAxisData.resize(dataCount);
35 35
36 36 auto valuesData = QVector<double>{};
37 37 valuesData.resize(dataCount);
38 38
39 39 int progress = 0;
40 40 auto progressEnd = dataCount;
41 41 for (auto time = start; time < end; ++time, ++dataIndex) {
42 42 auto it = m_VariableToEnableProvider.find(acqIdentifier);
43 43 if (it != m_VariableToEnableProvider.end() && it.value()) {
44 44 const auto timeOnFreq = time / freq;
45 45
46 46 xAxisData.replace(dataIndex, timeOnFreq);
47 47 valuesData.replace(dataIndex, std::cos(timeOnFreq));
48 48
49 49 // progression
50 50 int currentProgress = (time - start) * 100.0 / progressEnd;
51 51 if (currentProgress != progress) {
52 52 progress = currentProgress;
53 53
54 54 emit dataProvidedProgress(acqIdentifier, progress);
55 qCInfo(LOG_CosinusProvider()) << "TORM: CosinusProvider::retrieveData"
56 << QThread::currentThread()->objectName() << progress;
57 // NOTE: Try to use multithread if possible
55 58 }
56 59 }
57 60 else {
58 61 if (!it.value()) {
59 62 qCDebug(LOG_CosinusProvider())
60 63 << "CosinusProvider::retrieveData: ARRET De l'acquisition detectΓ©"
61 64 << end - time;
62 65 }
63 66 }
64 67 }
65 emit dataProvidedProgress(acqIdentifier, 0.0);
66
68 if (progress != 100) {
69 // We can close progression beacause all data has been retrieved
70 emit dataProvidedProgress(acqIdentifier, 100);
71 }
67 72 return std::make_shared<ScalarSeries>(std::move(xAxisData), std::move(valuesData),
68 73 Unit{QStringLiteral("t"), true}, Unit{});
69 74 }
70 75
71 76 void CosinusProvider::requestDataLoading(QUuid acqIdentifier,
72 77 const DataProviderParameters &parameters)
73 78 {
74 79 // TODO: Add Mutex
75 80 m_VariableToEnableProvider[acqIdentifier] = true;
76 81 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::requestDataLoading"
77 82 << QThread::currentThread()->objectName();
78 83 // NOTE: Try to use multithread if possible
79 84 const auto times = parameters.m_Times;
80 85
81 86 for (const auto &dateTime : qAsConst(times)) {
82 87 if (m_VariableToEnableProvider[acqIdentifier]) {
83 88 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime);
84 89 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
85 90 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
86 91 }
87 92 }
88 93 }
89 94
90 95 void CosinusProvider::requestDataAborting(QUuid acqIdentifier)
91 96 {
92 97 // TODO: Add Mutex
93 98 qCDebug(LOG_CosinusProvider()) << "CosinusProvider::requestDataAborting" << acqIdentifier
94 99 << QThread::currentThread()->objectName();
95 100 auto it = m_VariableToEnableProvider.find(acqIdentifier);
96 101 if (it != m_VariableToEnableProvider.end()) {
97 102 it.value() = false;
98 103 }
99 104 else {
100 105 qCWarning(LOG_CosinusProvider())
101 106 << tr("Aborting progression of inexistant identifier detected !!!");
102 107 }
103 108 }
General Comments 0
You need to be logged in to leave comments. Login now