##// END OF EJS Templates
Implementation of progression
perrinel -
r750:f189aafd213c
parent child
Show More
@@ -1,38 +1,40
1 1 #ifndef SCIQLOP_ACQUISITIONREQUEST_H
2 2 #define SCIQLOP_ACQUISITIONREQUEST_H
3 3
4 4 #include <QObject>
5 5
6 6 #include <QUuid>
7 7
8 8 #include <Common/DateUtils.h>
9 9 #include <Common/MetaTypes.h>
10 10 #include <Data/DataProviderParameters.h>
11 11 #include <Data/IDataProvider.h>
12 12 #include <Data/SqpRange.h>
13 13
14 14 #include <memory>
15 15
16 16 /**
17 17 * @brief The AcquisitionRequest struct holds the information of an variable request
18 18 */
19 19 struct AcquisitionRequest {
20 20 AcquisitionRequest()
21 21 {
22 22 m_AcqIdentifier = QUuid::createUuid();
23 23 m_Size = 0;
24 m_Progression = 0;
24 25 }
25 26
26 27 QUuid m_VarRequestId;
27 28 QUuid m_AcqIdentifier;
28 29 QUuid m_vIdentifier;
29 30 DataProviderParameters m_DataProviderParameters;
30 31 SqpRange m_RangeRequested;
31 32 SqpRange m_CacheRangeRequested;
32 33 int m_Size;
34 int m_Progression;
33 35 std::shared_ptr<IDataProvider> m_Provider;
34 36 };
35 37
36 38 SCIQLOP_REGISTER_META_TYPE(ACQUISITIONREQUEST_REGISTRY, AcquisitionRequest)
37 39
38 40 #endif // SCIQLOP_ACQUISITIONREQUEST_H
@@ -1,48 +1,49
1 1 #ifndef SCIQLOP_NETWORKCONTROLLER_H
2 2 #define SCIQLOP_NETWORKCONTROLLER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <QLoggingCategory>
7 7 #include <QObject>
8 8 #include <QUuid>
9 9
10 10 #include <Common/spimpl.h>
11 11 #include <functional>
12 12
13 13 Q_DECLARE_LOGGING_CATEGORY(LOG_NetworkController)
14 14
15 15 class QNetworkReply;
16 16 class QNetworkRequest;
17 17
18 18 /**
19 19 * @brief The NetworkController class aims to handle all network connection of SciQlop.
20 20 */
21 21 class SCIQLOP_CORE_EXPORT NetworkController : public QObject {
22 22 Q_OBJECT
23 23 public:
24 24 explicit NetworkController(QObject *parent = 0);
25 25
26 26 void initialize();
27 27 void finalize();
28 28
29 29 public slots:
30 30 /// Execute request and call callback when the reply is finished. Identifier is attached to the
31 31 /// callback
32 32 void onProcessRequested(const QNetworkRequest &request, QUuid identifier,
33 33 std::function<void(QNetworkReply *, QUuid)> callback);
34 34 /// Cancel the request of identifier
35 35 void onReplyCanceled(QUuid identifier);
36 36
37 37 signals:
38 38 void replyFinished(QNetworkReply *reply, QUuid identifier);
39 void replyDownloadProgress(QUuid identifier, double progress);
39 void replyDownloadProgress(QUuid identifier, const QNetworkRequest &networkRequest,
40 double progress);
40 41
41 42 private:
42 43 void waitForFinish();
43 44
44 45 class NetworkControllerPrivate;
45 46 spimpl::unique_impl_ptr<NetworkControllerPrivate> impl;
46 47 };
47 48
48 49 #endif // SCIQLOP_NETWORKCONTROLLER_H
@@ -1,134 +1,134
1 1 #include "Network/NetworkController.h"
2 2
3 3 #include <QMutex>
4 4 #include <QNetworkAccessManager>
5 5 #include <QNetworkReply>
6 6 #include <QNetworkRequest>
7 7 #include <QReadWriteLock>
8 8 #include <QThread>
9 9
10 10 #include <unordered_map>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_NetworkController, "NetworkController")
13 13
14 14 struct NetworkController::NetworkControllerPrivate {
15 15 explicit NetworkControllerPrivate(NetworkController *parent) : m_WorkingMutex{} {}
16 16
17 17 void lockRead() { m_Lock.lockForRead(); }
18 18 void lockWrite() { m_Lock.lockForWrite(); }
19 19 void unlock() { m_Lock.unlock(); }
20 20
21 21 QMutex m_WorkingMutex;
22 22
23 23 QReadWriteLock m_Lock;
24 24 std::unordered_map<QNetworkReply *, QUuid> m_NetworkReplyToVariableId;
25 25 std::unique_ptr<QNetworkAccessManager> m_AccessManager{nullptr};
26 26 };
27 27
28 28 NetworkController::NetworkController(QObject *parent)
29 29 : QObject(parent), impl{spimpl::make_unique_impl<NetworkControllerPrivate>(this)}
30 30 {
31 31 }
32 32
33 33 void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier,
34 34 std::function<void(QNetworkReply *, QUuid)> callback)
35 35 {
36 36 qCDebug(LOG_NetworkController()) << tr("NetworkController registered")
37 37 << QThread::currentThread()->objectName();
38 38 auto reply = impl->m_AccessManager->get(request);
39 39
40 40 // Store the couple reply id
41 41 impl->lockWrite();
42 42 impl->m_NetworkReplyToVariableId[reply] = identifier;
43 43 impl->unlock();
44 44
45 auto onReplyFinished = [reply, this, identifier, callback]() {
45 auto onReplyFinished = [request, reply, this, identifier, callback]() {
46 46
47 47 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished")
48 48 << QThread::currentThread() << reply;
49 49 impl->lockRead();
50 50 auto it = impl->m_NetworkReplyToVariableId.find(reply);
51 51 impl->unlock();
52 52 if (it != impl->m_NetworkReplyToVariableId.cend()) {
53 53 impl->lockWrite();
54 54 impl->m_NetworkReplyToVariableId.erase(reply);
55 55 impl->unlock();
56 56 // Deletes reply
57 57 callback(reply, identifier);
58 58 reply->deleteLater();
59 59
60 emit this->replyDownloadProgress(identifier, 0);
60 emit this->replyDownloadProgress(identifier, request, 0);
61 61 }
62 62
63 63 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished END")
64 64 << QThread::currentThread() << reply;
65 65 };
66 66
67 auto onReplyProgress = [reply, this](qint64 bytesRead, qint64 totalBytes) {
67 auto onReplyProgress = [reply, request, this](qint64 bytesRead, qint64 totalBytes) {
68 68
69 69 double progress = (bytesRead * 100.0) / totalBytes;
70 70 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress") << progress
71 71 << QThread::currentThread() << reply;
72 72 impl->lockRead();
73 73 auto it = impl->m_NetworkReplyToVariableId.find(reply);
74 74 impl->unlock();
75 75 if (it != impl->m_NetworkReplyToVariableId.cend()) {
76 emit this->replyDownloadProgress(it->second, progress);
76 emit this->replyDownloadProgress(it->second, request, progress);
77 77 }
78 78 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress END")
79 79 << QThread::currentThread() << reply;
80 80 };
81 81
82 82
83 83 connect(reply, &QNetworkReply::finished, this, onReplyFinished);
84 84 connect(reply, &QNetworkReply::downloadProgress, this, onReplyProgress);
85 85 qCDebug(LOG_NetworkController()) << tr("NetworkController registered END")
86 86 << QThread::currentThread()->objectName() << reply;
87 87 }
88 88
89 89 void NetworkController::initialize()
90 90 {
91 91 qCDebug(LOG_NetworkController()) << tr("NetworkController init") << QThread::currentThread();
92 92 impl->m_WorkingMutex.lock();
93 93 impl->m_AccessManager = std::make_unique<QNetworkAccessManager>();
94 94
95 95
96 96 auto onReplyErrors = [this](QNetworkReply *reply, const QList<QSslError> &errors) {
97 97
98 98 qCCritical(LOG_NetworkController()) << tr("NetworkAcessManager errors: ") << errors;
99 99
100 100 };
101 101
102 102
103 103 connect(impl->m_AccessManager.get(), &QNetworkAccessManager::sslErrors, this, onReplyErrors);
104 104
105 105 qCDebug(LOG_NetworkController()) << tr("NetworkController init END");
106 106 }
107 107
108 108 void NetworkController::finalize()
109 109 {
110 110 impl->m_WorkingMutex.unlock();
111 111 }
112 112
113 113 void NetworkController::onReplyCanceled(QUuid identifier)
114 114 {
115 115 auto findReply = [identifier](const auto &entry) { return identifier == entry.second; };
116 116 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled")
117 117 << QThread::currentThread();
118 118
119 119
120 120 impl->lockRead();
121 121 auto end = impl->m_NetworkReplyToVariableId.cend();
122 122 auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply);
123 123 impl->unlock();
124 124 if (it != end) {
125 125 it->first->abort();
126 126 }
127 127 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled END")
128 128 << QThread::currentThread();
129 129 }
130 130
131 131 void NetworkController::waitForFinish()
132 132 {
133 133 QMutexLocker locker{&impl->m_WorkingMutex};
134 134 }
@@ -1,238 +1,287
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 #include <cmath>
16
15 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
16 18
17 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
18 20
19 21 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
20 22
21 23 void lockRead() { m_Lock.lockForRead(); }
22 24 void lockWrite() { m_Lock.lockForWrite(); }
23 25 void unlock() { m_Lock.unlock(); }
24 26
25 27 void removeVariableRequest(QUuid vIdentifier);
26 28
27 29 QMutex m_WorkingMutex;
28 30 QReadWriteLock m_Lock;
29 31
30 32 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
31 33 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
32 34 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
33 35 };
34 36
35 37
36 38 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
37 39 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
38 40 {
39 41 }
40 42
41 43 VariableAcquisitionWorker::~VariableAcquisitionWorker()
42 44 {
43 45 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
44 46 << QThread::currentThread();
45 47 this->waitForFinish();
46 48 }
47 49
48 50
49 51 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
50 52 SqpRange rangeRequested,
51 53 SqpRange cacheRangeRequested,
52 54 DataProviderParameters parameters,
53 55 std::shared_ptr<IDataProvider> provider)
54 56 {
55 57 qCDebug(LOG_VariableAcquisitionWorker())
56 58 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
57 59 auto varRequestIdCanceled = QUuid();
58 60
59 61 // Request creation
60 62 auto acqRequest = AcquisitionRequest{};
61 63 acqRequest.m_VarRequestId = varRequestId;
62 64 acqRequest.m_vIdentifier = vIdentifier;
63 65 acqRequest.m_DataProviderParameters = parameters;
64 66 acqRequest.m_RangeRequested = rangeRequested;
65 67 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
66 68 acqRequest.m_Size = parameters.m_Times.size();
67 69 acqRequest.m_Provider = provider;
68 70
69 71
70 72 // Register request
71 73 impl->lockWrite();
72 74 impl->m_AcqIdentifierToAcqRequestMap.insert(
73 75 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
74 76
75 77 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
76 78 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
77 79 // A current request already exists, we can replace the next one
78 80 auto nextAcqId = it->second.second;
79 81 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
80 82 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
81 83 auto request = acqIdentifierToAcqRequestMapIt->second;
82 84 varRequestIdCanceled = request.m_VarRequestId;
83 85 }
84 86
85 87 it->second.second = acqRequest.m_AcqIdentifier;
86 88 impl->unlock();
87 89 }
88 90 else {
89 91 // First request for the variable, it must be stored and executed
90 92 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
91 93 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
92 94 impl->unlock();
93 95
94 96 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
95 97 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
96 98 }
97 99
98 100 return varRequestIdCanceled;
99 101 }
100 102
101 103 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
102 104 {
103 105 // TODO
104 106 }
105 107
106 108 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
107 109 double progress)
108 110 {
109 // TODO
111 impl->lockRead();
112 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
113 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
114 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
115
116 auto currentPartProgress
117 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
118 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
119
120 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: progress :") << progress;
121 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress A:")
122 << aIdToARit->second.m_Progression
123 << aIdToARit->second.m_Size;
124 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress B:")
125 << currentPartSize;
126 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress C:")
127 << currentPartProgress;
128 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress D:")
129 << currentAlreadyProgress;
130 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress E:")
131 << currentAlreadyProgress + currentPartProgress
132 << "\n";
133
134 auto finalProgression = currentAlreadyProgress + currentPartProgress;
135 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
136
137 if (finalProgression == 100.0) {
138 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
139 }
140 }
141 impl->unlock();
110 142 }
111 143
112 144 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
113 145 std::shared_ptr<IDataSeries> dataSeries,
114 146 SqpRange dataRangeAcquired)
115 147 {
116 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ")
117 << acqIdentifier << dataRangeAcquired;
148 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
149 << acqIdentifier << dataRangeAcquired;
118 150 impl->lockWrite();
119 151 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
120 152 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
121 153 // Store the result
122 154 auto dataPacket = AcquisitionDataPacket{};
123 155 dataPacket.m_Range = dataRangeAcquired;
124 156 dataPacket.m_DateSeries = dataSeries;
125 157
126 158 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
127 159 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
128 160 // A current request result already exists, we can update it
129 161 aIdToADPVit->second.push_back(dataPacket);
130 162 }
131 163 else {
132 164 // First request result for the variable, it must be stored
133 165 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
134 166 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
135 167 }
136 168
137 169
138 170 // Decrement the counter of the request
139 171 auto &acqRequest = aIdToARit->second;
140 acqRequest.m_Size = acqRequest.m_Size - 1;
172 acqRequest.m_Progression = acqRequest.m_Progression + 1;
141 173
142 174 // if the counter is 0, we can return data then run the next request if it exists and
143 175 // removed the finished request
144 if (acqRequest.m_Size == 0) {
176 if (acqRequest.m_Size == acqRequest.m_Progression) {
145 177 // Return the data
146 178 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
147 179 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
148 180 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
149 181 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
150 182 }
151 183
152 184 // Execute the next one
153 185 auto it
154 186 = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
155 187
156 188 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
157 189 if (it->second.second.isNull()) {
158 190 // There is no next request, we can remove the variable request
159 191 impl->removeVariableRequest(acqRequest.m_vIdentifier);
160 192 }
161 193 else {
162 194 auto acqIdentifierToRemove = it->second.first;
163 195 // Move the next request to the current request
164 196 it->second.first = it->second.second;
165 197 it->second.second = QUuid();
166 198 // Remove AcquisitionRequest and results;
167 199 impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
168 200 impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
169 201 // Execute the current request
170 202 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
171 203 Q_ARG(QUuid, it->second.first));
172 204 }
173 205 }
174 206 else {
175 207 qCCritical(LOG_VariableAcquisitionWorker())
176 208 << tr("Impossible to execute the acquisition on an unfound variable ");
177 209 }
178 210 }
179 211 }
180 212 else {
181 213 qCCritical(LOG_VariableAcquisitionWorker())
182 214 << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
183 215 }
184 216 impl->unlock();
185 217 }
186 218
219 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
220 {
221 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
222 impl->lockRead();
223 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
224 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
225 auto request = it->second;
226 impl->unlock();
227 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
228 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
229 }
230 else {
231 impl->unlock();
232 // TODO log no acqIdentifier recognized
233 }
234 }
235
187 236 void VariableAcquisitionWorker::initialize()
188 237 {
189 238 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
190 239 << QThread::currentThread();
191 240 impl->m_WorkingMutex.lock();
192 241 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
193 242 }
194 243
195 244 void VariableAcquisitionWorker::finalize()
196 245 {
197 246 impl->m_WorkingMutex.unlock();
198 247 }
199 248
200 249 void VariableAcquisitionWorker::waitForFinish()
201 250 {
202 251 QMutexLocker locker{&impl->m_WorkingMutex};
203 252 }
204 253
205 254 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
206 255 QUuid vIdentifier)
207 256 {
208 257 lockWrite();
209 258 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
210 259
211 260 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
212 261 // A current request already exists, we can replace the next one
213 262
214 263 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
215 264 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
216 265
217 266 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
218 267 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
219 268 }
220 269 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
221 270 unlock();
222 271 }
223 272
224 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
225 {
226 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
227 impl->lockRead();
228 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
229 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
230 auto request = it->second;
231 impl->unlock();
232 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
233 }
234 else {
235 impl->unlock();
236 // TODO log no acqIdentifier recognized
237 }
238 }
273 //void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
274 //{
275 // qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
276 // impl->lockRead();
277 // auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
278 // if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
279 // auto request = it->second;
280 // impl->unlock();
281 // request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
282 // }
283 // else {
284 // impl->unlock();
285 // // TODO log no acqIdentifier recognized
286 // }
287 //}
@@ -1,805 +1,807
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 6 #include <Variable/VariableSynchronizationGroup.h>
7 7
8 8 #include <Data/DataProviderParameters.h>
9 9 #include <Data/IDataProvider.h>
10 10 #include <Data/IDataSeries.h>
11 11 #include <Data/VariableRequest.h>
12 12 #include <Time/TimeController.h>
13 13
14 14 #include <QMutex>
15 15 #include <QThread>
16 16 #include <QUuid>
17 17 #include <QtCore/QItemSelectionModel>
18 18
19 19 #include <deque>
20 20 #include <set>
21 21 #include <unordered_map>
22 22
23 23 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
24 24
25 25 namespace {
26 26
27 27 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
28 28 const SqpRange &oldGraphRange)
29 29 {
30 30 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
31 31
32 32 auto varRangeRequested = varRange;
33 33 switch (zoomType) {
34 34 case AcquisitionZoomType::ZoomIn: {
35 35 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
36 36 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
37 37 varRangeRequested.m_TStart += deltaLeft;
38 38 varRangeRequested.m_TEnd -= deltaRight;
39 39 break;
40 40 }
41 41
42 42 case AcquisitionZoomType::ZoomOut: {
43 43 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
44 44 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
45 45 varRangeRequested.m_TStart -= deltaLeft;
46 46 varRangeRequested.m_TEnd += deltaRight;
47 47 break;
48 48 }
49 49 case AcquisitionZoomType::PanRight: {
50 50 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
51 51 varRangeRequested.m_TStart += deltaRight;
52 52 varRangeRequested.m_TEnd += deltaRight;
53 53 break;
54 54 }
55 55 case AcquisitionZoomType::PanLeft: {
56 56 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
57 57 varRangeRequested.m_TStart -= deltaLeft;
58 58 varRangeRequested.m_TEnd -= deltaLeft;
59 59 break;
60 60 }
61 61 case AcquisitionZoomType::Unknown: {
62 62 qCCritical(LOG_VariableController())
63 63 << VariableController::tr("Impossible to synchronize: zoom type unknown");
64 64 break;
65 65 }
66 66 default:
67 67 qCCritical(LOG_VariableController()) << VariableController::tr(
68 68 "Impossible to synchronize: zoom type not take into account");
69 69 // No action
70 70 break;
71 71 }
72 72
73 73 return varRangeRequested;
74 74 }
75 75 }
76 76
77 77 struct VariableController::VariableControllerPrivate {
78 78 explicit VariableControllerPrivate(VariableController *parent)
79 79 : m_WorkingMutex{},
80 80 m_VariableModel{new VariableModel{parent}},
81 81 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
82 82 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 83 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 84 q{parent}
85 85 {
86 86
87 87 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 88 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 89 }
90 90
91 91
92 92 virtual ~VariableControllerPrivate()
93 93 {
94 94 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 95 m_VariableAcquisitionWorkerThread.quit();
96 96 m_VariableAcquisitionWorkerThread.wait();
97 97 }
98 98
99 99
100 100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
101 101 QUuid varRequestId);
102 102
103 103 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
104 104 const SqpRange &dateTime);
105 105
106 106 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
107 107 std::shared_ptr<IDataSeries>
108 108 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
109 109
110 110 void registerProvider(std::shared_ptr<IDataProvider> provider);
111 111
112 112 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
113 113 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
114 114 void updateVariableRequest(QUuid varRequestId);
115 115 void cancelVariableRequest(QUuid varRequestId);
116 116
117 117 QMutex m_WorkingMutex;
118 118 /// Variable model. The VariableController has the ownership
119 119 VariableModel *m_VariableModel;
120 120 QItemSelectionModel *m_VariableSelectionModel;
121 121
122 122
123 123 TimeController *m_TimeController{nullptr};
124 124 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
125 125 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
126 126 QThread m_VariableAcquisitionWorkerThread;
127 127
128 128 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
129 129 m_VariableToProviderMap;
130 130 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
131 131 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
132 132 m_GroupIdToVariableSynchronizationGroupMap;
133 133 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
134 134 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
135 135
136 136 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
137 137
138 138 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
139 139
140 140
141 141 VariableController *q;
142 142 };
143 143
144 144
145 145 VariableController::VariableController(QObject *parent)
146 146 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
147 147 {
148 148 qCDebug(LOG_VariableController()) << tr("VariableController construction")
149 149 << QThread::currentThread();
150 150
151 151 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
152 152 &VariableController::onAbortProgressRequested);
153 153
154 154 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
155 155 &VariableController::onDataProvided);
156 156 connect(impl->m_VariableAcquisitionWorker.get(),
157 157 &VariableAcquisitionWorker::variableRequestInProgress, this,
158 158 &VariableController::onVariableRetrieveDataInProgress);
159 159
160 160 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
161 161 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
162 162 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
163 163 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
164 164
165 165
166 166 impl->m_VariableAcquisitionWorkerThread.start();
167 167 }
168 168
169 169 VariableController::~VariableController()
170 170 {
171 171 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
172 172 << QThread::currentThread();
173 173 this->waitForFinish();
174 174 }
175 175
176 176 VariableModel *VariableController::variableModel() noexcept
177 177 {
178 178 return impl->m_VariableModel;
179 179 }
180 180
181 181 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
182 182 {
183 183 return impl->m_VariableSelectionModel;
184 184 }
185 185
186 186 void VariableController::setTimeController(TimeController *timeController) noexcept
187 187 {
188 188 impl->m_TimeController = timeController;
189 189 }
190 190
191 191 std::shared_ptr<Variable>
192 192 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
193 193 {
194 194 if (impl->m_VariableModel->containsVariable(variable)) {
195 195 // Clones variable
196 196 auto duplicate = variable->clone();
197 197
198 198 // Adds clone to model
199 199 impl->m_VariableModel->addVariable(duplicate);
200 200
201 201 // Generates clone identifier
202 202 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
203 203
204 204 // Registers provider
205 205 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
206 206 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
207 207
208 208 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
209 209 if (duplicateProvider) {
210 210 impl->registerProvider(duplicateProvider);
211 211 }
212 212
213 213 return duplicate;
214 214 }
215 215 else {
216 216 qCCritical(LOG_VariableController())
217 217 << tr("Can't create duplicate of variable %1: variable not registered in the model")
218 218 .arg(variable->name());
219 219 return nullptr;
220 220 }
221 221 }
222 222
223 223 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
224 224 {
225 225 if (!variable) {
226 226 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
227 227 return;
228 228 }
229 229
230 230 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
231 231 // make some treatments before the deletion
232 232 emit variableAboutToBeDeleted(variable);
233 233
234 234 // Deletes identifier
235 235 impl->m_VariableToIdentifierMap.erase(variable);
236 236
237 237 // Deletes provider
238 238 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
239 239 qCDebug(LOG_VariableController())
240 240 << tr("Number of providers deleted for variable %1: %2")
241 241 .arg(variable->name(), QString::number(nbProvidersDeleted));
242 242
243 243
244 244 // Deletes from model
245 245 impl->m_VariableModel->deleteVariable(variable);
246 246 }
247 247
248 248 void VariableController::deleteVariables(
249 249 const QVector<std::shared_ptr<Variable> > &variables) noexcept
250 250 {
251 251 for (auto variable : qAsConst(variables)) {
252 252 deleteVariable(variable);
253 253 }
254 254 }
255 255
256 256 void VariableController::abortProgress(std::shared_ptr<Variable> variable)
257 257 {
258 258 }
259 259
260 260 std::shared_ptr<Variable>
261 261 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
262 262 std::shared_ptr<IDataProvider> provider) noexcept
263 263 {
264 264 if (!impl->m_TimeController) {
265 265 qCCritical(LOG_VariableController())
266 266 << tr("Impossible to create variable: The time controller is null");
267 267 return nullptr;
268 268 }
269 269
270 270 auto range = impl->m_TimeController->dateTime();
271 271
272 272 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
273 273 auto identifier = QUuid::createUuid();
274 274
275 275 // store the provider
276 276 impl->registerProvider(provider);
277 277
278 278 // Associate the provider
279 279 impl->m_VariableToProviderMap[newVariable] = provider;
280 280 impl->m_VariableToIdentifierMap[newVariable] = identifier;
281 281
282 282
283 283 auto varRequestId = QUuid::createUuid();
284 284 qCInfo(LOG_VariableController()) << "processRequest for" << name << varRequestId;
285 285 impl->processRequest(newVariable, range, varRequestId);
286 286 impl->updateVariableRequest(varRequestId);
287 287
288 288 return newVariable;
289 289 }
290 290 }
291 291
292 292 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
293 293 {
294 294 // TODO check synchronisation and Rescale
295 295 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
296 296 << QThread::currentThread()->objectName();
297 297 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
298 298 auto varRequestId = QUuid::createUuid();
299 299
300 300 for (const auto &selectedRow : qAsConst(selectedRows)) {
301 301 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
302 302 selectedVariable->setRange(dateTime);
303 303 impl->processRequest(selectedVariable, dateTime, varRequestId);
304 304
305 305 // notify that rescale operation has to be done
306 306 emit rangeChanged(selectedVariable, dateTime);
307 307 }
308 308 }
309 309 impl->updateVariableRequest(varRequestId);
310 310 }
311 311
312 312 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
313 313 const SqpRange &cacheRangeRequested,
314 314 QVector<AcquisitionDataPacket> dataAcquired)
315 315 {
316 316 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
317 317 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
318 318 if (!varRequestId.isNull()) {
319 319 impl->updateVariableRequest(varRequestId);
320 320 }
321 321 }
322 322
323 323 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
324 324 {
325 qCInfo(LOG_VariableController()) << "TORM: ariableController::onVariableRetrieveDataInProgress"
326 << QThread::currentThread()->objectName() << progress;
325 327 if (auto var = impl->findVariable(identifier)) {
326 328 impl->m_VariableModel->setDataProgress(var, progress);
327 329 }
328 330 else {
329 331 qCCritical(LOG_VariableController())
330 332 << tr("Impossible to notify progression of a null variable");
331 333 }
332 334 }
333 335
334 336 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
335 337 {
336 338 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAbortProgressRequested"
337 339 << QThread::currentThread()->objectName();
338 340
339 341 auto it = impl->m_VariableToIdentifierMap.find(variable);
340 342 if (it != impl->m_VariableToIdentifierMap.cend()) {
341 343 impl->m_VariableToProviderMap.at(variable)->requestDataAborting(it->second);
342 344 }
343 345 else {
344 346 qCWarning(LOG_VariableController())
345 347 << tr("Aborting progression of inexistant variable detected !!!")
346 348 << QThread::currentThread()->objectName();
347 349 }
348 350 }
349 351
350 352 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
351 353 {
352 354 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
353 355 << QThread::currentThread()->objectName()
354 356 << synchronizationGroupId;
355 357 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
356 358 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
357 359 std::make_pair(synchronizationGroupId, vSynchroGroup));
358 360 }
359 361
360 362 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
361 363 {
362 364 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
363 365 }
364 366
365 367 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
366 368 QUuid synchronizationGroupId)
367 369
368 370 {
369 371 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
370 372 << synchronizationGroupId;
371 373 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
372 374 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
373 375 auto groupIdToVSGIt
374 376 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
375 377 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
376 378 impl->m_VariableIdGroupIdMap.insert(
377 379 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
378 380 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
379 381 }
380 382 else {
381 383 qCCritical(LOG_VariableController())
382 384 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
383 385 << variable->name();
384 386 }
385 387 }
386 388 else {
387 389 qCCritical(LOG_VariableController())
388 390 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
389 391 }
390 392 }
391 393
392 394 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
393 395 QUuid synchronizationGroupId)
394 396 {
395 397 // Gets variable id
396 398 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
397 399 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
398 400 qCCritical(LOG_VariableController())
399 401 << tr("Can't desynchronize variable %1: variable identifier not found")
400 402 .arg(variable->name());
401 403 return;
402 404 }
403 405
404 406 // Gets synchronization group
405 407 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
406 408 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
407 409 qCCritical(LOG_VariableController())
408 410 << tr("Can't desynchronize variable %1: unknown synchronization group")
409 411 .arg(variable->name());
410 412 return;
411 413 }
412 414
413 415 auto variableId = variableIt->second;
414 416
415 417 // Removes variable from synchronization group
416 418 auto synchronizationGroup = groupIt->second;
417 419 synchronizationGroup->removeVariableId(variableId);
418 420
419 421 // Removes link between variable and synchronization group
420 422 impl->m_VariableIdGroupIdMap.erase(variableId);
421 423 }
422 424
423 425 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
424 426 const SqpRange &range, const SqpRange &oldRange,
425 427 bool synchronise)
426 428 {
427 429 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
428 430
429 431 // we want to load data of the variable for the dateTime.
430 432 // First we check if the cache contains some of them.
431 433 // For the other, we ask the provider to give them.
432 434
433 435 auto varRequestId = QUuid::createUuid();
434 436 qCInfo(LOG_VariableController()) << "VariableController::onRequestDataLoading"
435 437 << QThread::currentThread()->objectName() << varRequestId;
436 438
437 439 for (const auto &var : variables) {
438 440 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
439 441 impl->processRequest(var, range, varRequestId);
440 442 }
441 443
442 444 if (synchronise) {
443 445 // Get the group ids
444 446 qCDebug(LOG_VariableController())
445 447 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
446 448 auto groupIds = std::set<QUuid>{};
447 449 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
448 450 for (const auto &var : variables) {
449 451 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
450 452 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
451 453 auto vId = varToVarIdIt->second;
452 454 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
453 455 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
454 456 auto gId = varIdToGroupIdIt->second;
455 457 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
456 458 if (groupIds.find(gId) == groupIds.cend()) {
457 459 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
458 460 groupIds.insert(gId);
459 461 }
460 462 }
461 463 }
462 464 }
463 465
464 466 // We assume here all group ids exist
465 467 for (const auto &gId : groupIds) {
466 468 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
467 469 auto vSyncIds = vSynchronizationGroup->getIds();
468 470 qCDebug(LOG_VariableController()) << "Var in synchro group ";
469 471 for (auto vId : vSyncIds) {
470 472 auto var = impl->findVariable(vId);
471 473
472 474 // Don't process already processed var
473 475 if (!variables.contains(var)) {
474 476 if (var != nullptr) {
475 477 qCDebug(LOG_VariableController()) << "processRequest synchro for"
476 478 << var->name();
477 479 auto vSyncRangeRequested = computeSynchroRangeRequested(
478 480 var->range(), range, groupIdToOldRangeMap.at(gId));
479 481 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
480 482 impl->processRequest(var, vSyncRangeRequested, varRequestId);
481 483 }
482 484 else {
483 485 qCCritical(LOG_VariableController())
484 486
485 487 << tr("Impossible to synchronize a null variable");
486 488 }
487 489 }
488 490 }
489 491 }
490 492 }
491 493
492 494 impl->updateVariableRequest(varRequestId);
493 495 }
494 496
495 497
496 498 void VariableController::initialize()
497 499 {
498 500 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
499 501 impl->m_WorkingMutex.lock();
500 502 qCDebug(LOG_VariableController()) << tr("VariableController init END");
501 503 }
502 504
503 505 void VariableController::finalize()
504 506 {
505 507 impl->m_WorkingMutex.unlock();
506 508 }
507 509
508 510 void VariableController::waitForFinish()
509 511 {
510 512 QMutexLocker locker{&impl->m_WorkingMutex};
511 513 }
512 514
513 515 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
514 516 {
515 517 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
516 518 auto zoomType = AcquisitionZoomType::Unknown;
517 519 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
518 520 zoomType = AcquisitionZoomType::ZoomOut;
519 521 }
520 522 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
521 523 zoomType = AcquisitionZoomType::PanRight;
522 524 }
523 525 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
524 526 zoomType = AcquisitionZoomType::PanLeft;
525 527 }
526 528 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
527 529 zoomType = AcquisitionZoomType::ZoomIn;
528 530 }
529 531 else {
530 532 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
531 533 }
532 534 return zoomType;
533 535 }
534 536
535 537 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
536 538 const SqpRange &rangeRequested,
537 539 QUuid varRequestId)
538 540 {
539 541
540 542 // TODO: protect at
541 543 auto varRequest = VariableRequest{};
542 544 auto varId = m_VariableToIdentifierMap.at(var);
543 545
544 546 auto varStrategyRangesRequested
545 547 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
546 548 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
547 549 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
548 550
549 551 if (!notInCacheRangeList.empty()) {
550 552 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
551 553 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
552 554 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest RR ") << rangeRequested;
553 555 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest R ")
554 556 << varStrategyRangesRequested.first;
555 557 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest CR ")
556 558 << varStrategyRangesRequested.second;
557 559 // store VarRequest
558 560 storeVariableRequest(varId, varRequestId, varRequest);
559 561
560 562 auto varProvider = m_VariableToProviderMap.at(var);
561 563 if (varProvider != nullptr) {
562 564 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
563 565 varRequestId, varId, varStrategyRangesRequested.first,
564 566 varStrategyRangesRequested.second,
565 567 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
566 568 varProvider);
567 569
568 570 if (!varRequestIdCanceled.isNull()) {
569 571 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
570 572 << varRequestIdCanceled;
571 573 cancelVariableRequest(varRequestIdCanceled);
572 574 }
573 575 }
574 576 else {
575 577 qCCritical(LOG_VariableController())
576 578 << "Impossible to provide data with a null provider";
577 579 }
578 580
579 581 if (!inCacheRangeList.empty()) {
580 582 emit q->updateVarDisplaying(var, inCacheRangeList.first());
581 583 }
582 584 }
583 585 else {
584 586
585 587 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
586 588 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
587 589 // store VarRequest
588 590 storeVariableRequest(varId, varRequestId, varRequest);
589 591 acceptVariableRequest(varId,
590 592 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
591 593 }
592 594 }
593 595
594 596 std::shared_ptr<Variable>
595 597 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
596 598 {
597 599 std::shared_ptr<Variable> var;
598 600 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
599 601
600 602 auto end = m_VariableToIdentifierMap.cend();
601 603 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
602 604 if (it != end) {
603 605 var = it->first;
604 606 }
605 607 else {
606 608 qCCritical(LOG_VariableController())
607 609 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
608 610 }
609 611
610 612 return var;
611 613 }
612 614
613 615 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
614 616 const QVector<AcquisitionDataPacket> acqDataPacketVector)
615 617 {
616 618 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
617 619 << acqDataPacketVector.size();
618 620 std::shared_ptr<IDataSeries> dataSeries;
619 621 if (!acqDataPacketVector.isEmpty()) {
620 622 dataSeries = acqDataPacketVector[0].m_DateSeries;
621 623 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
622 624 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
623 625 }
624 626 }
625 627 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
626 628 << acqDataPacketVector.size();
627 629 return dataSeries;
628 630 }
629 631
630 632 void VariableController::VariableControllerPrivate::registerProvider(
631 633 std::shared_ptr<IDataProvider> provider)
632 634 {
633 635 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
634 636 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
635 637 << provider->objectName();
636 638 m_ProviderSet.insert(provider);
637 639 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
638 640 &VariableAcquisitionWorker::onVariableDataAcquired);
639 641 connect(provider.get(), &IDataProvider::dataProvidedProgress,
640 642 m_VariableAcquisitionWorker.get(),
641 643 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
642 644 }
643 645 else {
644 646 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
645 647 }
646 648 }
647 649
648 650 void VariableController::VariableControllerPrivate::storeVariableRequest(
649 651 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
650 652 {
651 653 // First request for the variable. we can create an entry for it
652 654 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
653 655 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
654 656 auto varRequestIdQueue = std::deque<QUuid>{};
655 657 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
656 658 varRequestIdQueue.push_back(varRequestId);
657 659 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
658 660 }
659 661 else {
660 662 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
661 663 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
662 664 varRequestIdQueue.push_back(varRequestId);
663 665 }
664 666
665 667 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
666 668 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
667 669 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
668 670 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
669 671 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
670 672 m_VarRequestIdToVarIdVarRequestMap.insert(
671 673 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
672 674 }
673 675 else {
674 676 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
675 677 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
676 678 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
677 679 }
678 680 }
679 681
680 682 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
681 683 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
682 684 {
683 685 QUuid varRequestId;
684 686 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
685 687 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
686 688 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
687 689 varRequestId = varRequestIdQueue.front();
688 690 auto varRequestIdToVarIdVarRequestMapIt
689 691 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
690 692 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
691 693 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
692 694 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
693 695 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
694 696 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
695 697 auto &varRequest = varIdToVarRequestMapIt->second;
696 698 varRequest.m_DataSeries = dataSeries;
697 699 varRequest.m_CanUpdate = true;
698 700 }
699 701 else {
700 702 qCDebug(LOG_VariableController())
701 703 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
702 704 "to a variableRequestId")
703 705 << varRequestId << varId;
704 706 }
705 707 }
706 708 else {
707 709 qCCritical(LOG_VariableController())
708 710 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
709 711 << varRequestId;
710 712 }
711 713
712 714 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
713 715 << varRequestIdQueue.size();
714 716 varRequestIdQueue.pop_front();
715 717 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
716 718 << varRequestIdQueue.size();
717 719 if (varRequestIdQueue.empty()) {
718 720 m_VarIdToVarRequestIdQueueMap.erase(varId);
719 721 }
720 722 }
721 723 else {
722 724 qCCritical(LOG_VariableController())
723 725 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
724 726 }
725 727
726 728 return varRequestId;
727 729 }
728 730
729 731 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
730 732 {
731 733
732 734 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
733 735 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
734 736 bool processVariableUpdate = true;
735 737 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
736 738 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
737 739 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
738 740 ++varIdToVarRequestMapIt) {
739 741 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
740 742 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
741 743 << processVariableUpdate;
742 744 }
743 745
744 746 if (processVariableUpdate) {
745 747 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
746 748 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
747 749 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
748 750 auto &varRequest = varIdToVarRequestMapIt->second;
749 751 var->setRange(varRequest.m_RangeRequested);
750 752 var->setCacheRange(varRequest.m_CacheRangeRequested);
751 753 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
752 754 << varRequest.m_RangeRequested;
753 755 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
754 756 << varRequest.m_CacheRangeRequested;
755 757 var->mergeDataSeries(varRequest.m_DataSeries);
756 758 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
757 759 << varRequest.m_DataSeries->range();
758 760 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
759 761
760 762 /// @todo MPL: confirm
761 763 // Variable update is notified only if there is no pending request for it
762 764 if (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first) == 0) {
763 765 emit var->updated();
764 766 }
765 767 }
766 768 else {
767 769 qCCritical(LOG_VariableController())
768 770 << tr("Impossible to update data to a null variable");
769 771 }
770 772 }
771 773
772 774 // cleaning varRequestId
773 775 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
774 776 << m_VarRequestIdToVarIdVarRequestMap.size();
775 777 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
776 778 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
777 779 << m_VarRequestIdToVarIdVarRequestMap.size();
778 780 }
779 781 }
780 782 else {
781 783 qCCritical(LOG_VariableController())
782 784 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
783 785 }
784 786 }
785 787
786 788 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
787 789 {
788 790 // cleaning varRequestId
789 791 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
790 792
791 793 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
792 794 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
793 795 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
794 796 varRequestIdQueue.erase(
795 797 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
796 798 varRequestIdQueue.end());
797 799 if (varRequestIdQueue.empty()) {
798 800 varIdToVarRequestIdQueueMapIt
799 801 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
800 802 }
801 803 else {
802 804 ++varIdToVarRequestIdQueueMapIt;
803 805 }
804 806 }
805 807 }
@@ -1,240 +1,240
1 1 #include "Data/ArrayData.h"
2 2 #include <QObject>
3 3 #include <QtTest>
4 4
5 5 using DataContainer = std::vector<double>;
6 6 using Container = std::vector<DataContainer>;
7 7 using InputData = QPair<DataContainer, int>;
8 8
9 9 namespace {
10 10
11 11 InputData flatten(const Container &container)
12 12 {
13 13 if (container.empty()) {
14 14 return {};
15 15 }
16 16
17 17 // We assume here that each component of the container have the same size
18 18 auto containerSize = container.size();
19 19 auto componentSize = container.front().size();
20 20
21 21 auto result = DataContainer{};
22 22 result.reserve(componentSize * containerSize);
23 23
24 24 for (auto i = 0; i < componentSize; ++i) {
25 25 for (auto j = 0; j < containerSize; ++j) {
26 26 result.push_back(container.at(j).at(i));
27 27 }
28 28 }
29 29
30 30 return {result, static_cast<int>(containerSize)};
31 31 }
32 32
33 33 void verifyArrayData(const ArrayData<2> &arrayData, const Container &expectedData)
34 34 {
35 35 auto verifyComponent = [&arrayData](const auto &componentData, const auto &equalFun) {
36 36 QVERIFY(std::equal(arrayData.cbegin(), arrayData.cend(), componentData.cbegin(),
37 37 componentData.cend(),
38 38 [&equalFun](const auto &dataSeriesIt, const auto &expectedValue) {
39 39 return equalFun(dataSeriesIt, expectedValue);
40 40 }));
41 41 };
42 42
43 43 for (auto i = 0; i < expectedData.size(); ++i) {
44 44 verifyComponent(expectedData.at(i), [i](const auto &seriesIt, const auto &value) {
45 45 return seriesIt.at(i) == value;
46 46 });
47 47 }
48 48 }
49 49
50 50 } // namespace
51 51
52 52 class TestTwoDimArrayData : public QObject {
53 53 Q_OBJECT
54 54 private slots:
55 55 /// Tests @sa ArrayData ctor
56 56 void testCtor_data();
57 57 void testCtor();
58 58
59 59 /// Tests @sa ArrayData::add()
60 60 void testAdd_data();
61 61 void testAdd();
62 62
63 63 /// Tests @sa ArrayData::clear()
64 64 void testClear_data();
65 65 void testClear();
66 66
67 67 /// Tests @sa ArrayData::size()
68 68 void testSize_data();
69 69 void testSize();
70 70
71 71 /// Tests @sa ArrayData::sort()
72 72 void testSort_data();
73 73 void testSort();
74 74 };
75 75
76 76 void TestTwoDimArrayData::testCtor_data()
77 77 {
78 78 // Test structure
79 79 QTest::addColumn<InputData>("inputData"); // array data's input
80 80 QTest::addColumn<bool>("success"); // array data has been successfully constructed
81 81 QTest::addColumn<Container>("expectedData"); // expected array data (when success)
82 82
83 83 // Test cases
84 84 QTest::newRow("validInput") << flatten(Container{{1., 2., 3., 4., 5.},
85 85 {6., 7., 8., 9., 10.},
86 86 {11., 12., 13., 14., 15.}})
87 87 << true << Container{{1., 2., 3., 4., 5.},
88 88 {6., 7., 8., 9., 10.},
89 89 {11., 12., 13., 14., 15.}};
90 QTest::newRow("invalidInput (invalid data size")
91 << InputData{{1., 2., 3., 4., 5., 6., 7.}, 3} << false << Container{{}, {}, {}};
90 QTest::newRow("invalidInput (invalid data size") << InputData{{1., 2., 3., 4., 5., 6., 7.}, 3}
91 << false << Container{{}, {}, {}};
92 92 QTest::newRow("invalidInput (less than two components")
93 93 << flatten(Container{{1., 2., 3., 4., 5.}}) << false << Container{{}, {}, {}};
94 94 }
95 95
96 96 void TestTwoDimArrayData::testCtor()
97 97 {
98 98 QFETCH(InputData, inputData);
99 99 QFETCH(bool, success);
100 100
101 101 if (success) {
102 102 QFETCH(Container, expectedData);
103 103
104 104 ArrayData<2> arrayData{inputData.first, inputData.second};
105 105 verifyArrayData(arrayData, expectedData);
106 106 }
107 107 else {
108 108 QVERIFY_EXCEPTION_THROWN(ArrayData<2>(inputData.first, inputData.second),
109 109 std::invalid_argument);
110 110 }
111 111 }
112 112
113 113 void TestTwoDimArrayData::testAdd_data()
114 114 {
115 115 // Test structure
116 116 QTest::addColumn<InputData>("inputData"); // array's data input
117 117 QTest::addColumn<InputData>("otherData"); // array data's input to merge with
118 118 QTest::addColumn<bool>("prepend"); // prepend or append merge
119 119 QTest::addColumn<Container>("expectedData"); // expected data after merge
120 120
121 121 // Test cases
122 122 auto inputData = flatten(
123 123 Container{{1., 2., 3., 4., 5.}, {11., 12., 13., 14., 15.}, {21., 22., 23., 24., 25.}});
124 124
125 125 auto vectorContainer = flatten(Container{{6., 7., 8.}, {16., 17., 18.}, {26., 27., 28}});
126 126 auto tensorContainer = flatten(Container{{6., 7., 8.},
127 127 {16., 17., 18.},
128 128 {26., 27., 28},
129 129 {36., 37., 38.},
130 130 {46., 47., 48.},
131 131 {56., 57., 58}});
132 132
133 133 QTest::newRow("appendMerge") << inputData << vectorContainer << false
134 134 << Container{{1., 2., 3., 4., 5., 6., 7., 8.},
135 135 {11., 12., 13., 14., 15., 16., 17., 18.},
136 136 {21., 22., 23., 24., 25., 26., 27., 28}};
137 137 QTest::newRow("prependMerge") << inputData << vectorContainer << true
138 138 << Container{{6., 7., 8., 1., 2., 3., 4., 5.},
139 139 {16., 17., 18., 11., 12., 13., 14., 15.},
140 140 {26., 27., 28, 21., 22., 23., 24., 25.}};
141 141 QTest::newRow("invalidMerge") << inputData << tensorContainer << false
142 142 << Container{{1., 2., 3., 4., 5.},
143 143 {11., 12., 13., 14., 15.},
144 144 {21., 22., 23., 24., 25.}};
145 145 }
146 146
147 147 void TestTwoDimArrayData::testAdd()
148 148 {
149 149 QFETCH(InputData, inputData);
150 150 QFETCH(InputData, otherData);
151 151 QFETCH(bool, prepend);
152 152 QFETCH(Container, expectedData);
153 153
154 154 ArrayData<2> arrayData{inputData.first, inputData.second};
155 155 ArrayData<2> other{otherData.first, otherData.second};
156 156
157 157 arrayData.add(other, prepend);
158 158
159 159 verifyArrayData(arrayData, expectedData);
160 160 }
161 161
162 162 void TestTwoDimArrayData::testClear_data()
163 163 {
164 164 // Test structure
165 165 QTest::addColumn<InputData>("inputData"); // array data's input
166 166
167 167 // Test cases
168 168 QTest::newRow("data1") << flatten(
169 169 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}});
170 170 }
171 171
172 172 void TestTwoDimArrayData::testClear()
173 173 {
174 174 QFETCH(InputData, inputData);
175 175
176 176 ArrayData<2> arrayData{inputData.first, inputData.second};
177 177 arrayData.clear();
178 178
179 179 auto emptyData = Container(inputData.second, DataContainer{});
180 180 verifyArrayData(arrayData, emptyData);
181 181 }
182 182
183 183 void TestTwoDimArrayData::testSize_data()
184 184 {
185 185 // Test structure
186 186 QTest::addColumn<InputData>("inputData"); // array data's input
187 187 QTest::addColumn<int>("expectedSize"); // expected array data size
188 188
189 189 // Test cases
190 190 QTest::newRow("data1") << flatten(Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}}) << 5;
191 191 QTest::newRow("data2") << flatten(Container{{1., 2., 3., 4., 5.},
192 192 {6., 7., 8., 9., 10.},
193 193 {11., 12., 13., 14., 15.}})
194 194 << 5;
195 195 }
196 196
197 197 void TestTwoDimArrayData::testSize()
198 198 {
199 199 QFETCH(InputData, inputData);
200 200 QFETCH(int, expectedSize);
201 201
202 202 ArrayData<2> arrayData{inputData.first, inputData.second};
203 203 QVERIFY(arrayData.size() == expectedSize);
204 204 }
205 205
206 206 void TestTwoDimArrayData::testSort_data()
207 207 {
208 208 // Test structure
209 209 QTest::addColumn<InputData>("inputData"); // array data's input
210 210 QTest::addColumn<std::vector<int> >("sortPermutation"); // permutation used to sort data
211 211 QTest::addColumn<Container>("expectedData"); // expected data after sorting
212 212
213 213 // Test cases
214 214 QTest::newRow("data1")
215 215 << flatten(
216 216 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}})
217 217 << std::vector<int>{0, 2, 3, 1, 4}
218 218 << Container{{1., 3., 4., 2., 5.}, {6., 8., 9., 7., 10.}, {11., 13., 14., 12., 15.}};
219 219 QTest::newRow("data2")
220 220 << flatten(
221 221 Container{{1., 2., 3., 4., 5.}, {6., 7., 8., 9., 10.}, {11., 12., 13., 14., 15.}})
222 222 << std::vector<int>{2, 4, 3, 0, 1}
223 223 << Container{{3., 5., 4., 1., 2.}, {8., 10., 9., 6., 7.}, {13., 15., 14., 11., 12.}};
224 224 }
225 225
226 226 void TestTwoDimArrayData::testSort()
227 227 {
228 228 QFETCH(InputData, inputData);
229 229 QFETCH(std::vector<int>, sortPermutation);
230 230 QFETCH(Container, expectedData);
231 231
232 232 ArrayData<2> arrayData{inputData.first, inputData.second};
233 233 auto sortedArrayData = arrayData.sort(sortPermutation);
234 234 QVERIFY(sortedArrayData != nullptr);
235 235
236 236 verifyArrayData(*sortedArrayData, expectedData);
237 237 }
238 238
239 239 QTEST_MAIN(TestTwoDimArrayData)
240 240 #include "TestTwoDimArrayData.moc"
@@ -1,31 +1,42
1 1 #ifndef SCIQLOP_AMDAPROVIDER_H
2 2 #define SCIQLOP_AMDAPROVIDER_H
3 3
4 4 #include "AmdaGlobal.h"
5 5
6 6 #include <Data/IDataProvider.h>
7 7
8 8 #include <QLoggingCategory>
9 9
10 #include <map>
10 11
11 12 Q_DECLARE_LOGGING_CATEGORY(LOG_AmdaProvider)
12 13
13 14 class QNetworkReply;
15 class QNetworkRequest;
14 16
15 17 /**
16 18 * @brief The AmdaProvider class is an example of how a data provider can generate data
17 19 */
18 20 class SCIQLOP_AMDA_EXPORT AmdaProvider : public IDataProvider {
19 21 public:
20 22 explicit AmdaProvider();
21 23 std::shared_ptr<IDataProvider> clone() const override;
22 24
23 25 void requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters) override;
24 26
25 27 void requestDataAborting(QUuid acqIdentifier) override;
26 28
29 private slots:
30 void onReplyDownloadProgress(QUuid, const QNetworkRequest &, double progress);
31
27 32 private:
28 33 void retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data);
34
35 void updateRequestProgress(QUuid acqIdentifier, std::shared_ptr<QNetworkRequest> request,
36 double progress);
37
38 std::map<QUuid, std::map<std::shared_ptr<QNetworkRequest>, double> >
39 m_AcqIdToRequestProgressMap;
29 40 };
30 41
31 42 #endif // SCIQLOP_AMDAPROVIDER_H
@@ -1,174 +1,260
1 1 #include "AmdaProvider.h"
2 2 #include "AmdaDefs.h"
3 3 #include "AmdaResultParser.h"
4 4
5 5 #include <Common/DateUtils.h>
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <Network/NetworkController.h>
8 8 #include <SqpApplication.h>
9 9 #include <Variable/Variable.h>
10 10
11 11 #include <QNetworkAccessManager>
12 12 #include <QNetworkReply>
13 13 #include <QTemporaryFile>
14 14 #include <QThread>
15 15
16 16 Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider")
17 17
18 18 namespace {
19 19
20 20 /// URL format for a request on AMDA server. The parameters are as follows:
21 21 /// - %1: start date
22 22 /// - %2: end date
23 23 /// - %3: parameter id
24 24 const auto AMDA_URL_FORMAT = QStringLiteral(
25 25 "http://amda.irap.omp.eu/php/rest/"
26 26 "getParameter.php?startTime=%1&stopTime=%2&parameterID=%3&outputFormat=ASCII&"
27 27 "timeFormat=ISO8601&gzip=0");
28 28
29 29 /// Dates format passed in the URL (e.g 2013-09-23T09:00)
30 30 const auto AMDA_TIME_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss");
31 31
32 // struct AmdaProgression {
33 // QUuid acqIdentifier;
34 // std::map<QNetworkRequest, double> m_RequestId;
35 //};
36
32 37 /// Formats a time to a date that can be passed in URL
33 38 QString dateFormat(double sqpRange) noexcept
34 39 {
35 40 auto dateTime = DateUtils::dateTime(sqpRange);
36 41 return dateTime.toString(AMDA_TIME_FORMAT);
37 42 }
38 43
39 44 AmdaResultParser::ValueType valueType(const QString &valueType)
40 45 {
41 46 if (valueType == QStringLiteral("scalar")) {
42 47 return AmdaResultParser::ValueType::SCALAR;
43 48 }
44 49 else if (valueType == QStringLiteral("vector")) {
45 50 return AmdaResultParser::ValueType::VECTOR;
46 51 }
47 52 else {
48 53 return AmdaResultParser::ValueType::UNKNOWN;
49 54 }
50 55 }
51 56
52 57 } // namespace
53 58
54 59 AmdaProvider::AmdaProvider()
55 60 {
56 61 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::AmdaProvider") << QThread::currentThread();
57 62 if (auto app = sqpApp) {
58 63 auto &networkController = app->networkController();
59 64 connect(this, SIGNAL(requestConstructed(QNetworkRequest, QUuid,
60 65 std::function<void(QNetworkReply *, QUuid)>)),
61 66 &networkController,
62 67 SLOT(onProcessRequested(QNetworkRequest, QUuid,
63 68 std::function<void(QNetworkReply *, QUuid)>)));
64 69
65 70
66 connect(&sqpApp->networkController(), SIGNAL(replyDownloadProgress(QUuid, double)), this,
67 SIGNAL(dataProvidedProgress(QUuid, double)));
71 connect(&sqpApp->networkController(),
72 SIGNAL(replyDownloadProgress(QUuid, const QNetworkRequest &, double)), this,
73 SLOT(onReplyDownloadProgress(QUuid, const QNetworkRequest &, double)));
68 74 }
69 75 }
70 76
71 77 std::shared_ptr<IDataProvider> AmdaProvider::clone() const
72 78 {
73 79 // No copy is made in the clone
74 80 return std::make_shared<AmdaProvider>();
75 81 }
76 82
77 83 void AmdaProvider::requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
78 84 {
79 85 // NOTE: Try to use multithread if possible
80 86 const auto times = parameters.m_Times;
81 87 const auto data = parameters.m_Data;
82 88 for (const auto &dateTime : qAsConst(times)) {
83 89 this->retrieveData(acqIdentifier, dateTime, data);
84 90
91
85 92 // TORM when AMDA will support quick asynchrone request
86 93 QThread::msleep(1000);
87 94 }
88 95 }
89 96
90 97 void AmdaProvider::requestDataAborting(QUuid acqIdentifier)
91 98 {
92 99 if (auto app = sqpApp) {
93 100 auto &networkController = app->networkController();
94 101 networkController.onReplyCanceled(acqIdentifier);
95 102 }
96 103 }
97 104
105 void AmdaProvider::onReplyDownloadProgress(QUuid acqIdentifier,
106 const QNetworkRequest &networkRequest, double progress)
107 {
108 qCCritical(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << progress;
109 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
110 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
111
112 qCCritical(LOG_AmdaProvider()) << tr("1 onReplyDownloadProgress") << progress;
113 auto requestPtr = &networkRequest;
114 auto findRequest
115 = [requestPtr](const auto &entry) { return requestPtr == entry.first.get(); };
116
117 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
118 auto requestProgressMapEnd = requestProgressMap.end();
119 auto requestProgressMapIt
120 = std::find_if(requestProgressMap.begin(), requestProgressMapEnd, findRequest);
121
122 if (requestProgressMapIt != requestProgressMapEnd) {
123 requestProgressMapIt->second = progress;
124 }
125 else {
126 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress");
127 }
128 }
129
130 acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
131 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
132 qCCritical(LOG_AmdaProvider()) << tr("2 onReplyDownloadProgress") << progress;
133 double finalProgress = 0.0;
134
135 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
136 auto fraq = requestProgressMap.size();
137
138 for (auto requestProgress : requestProgressMap) {
139 finalProgress += requestProgress.second;
140 }
141
142 if (fraq > 0) {
143 finalProgress = finalProgress / fraq;
144 }
145
146 qCCritical(LOG_AmdaProvider()) << tr("2 onReplyDownloadProgress") << finalProgress;
147 emit dataProvidedProgress(acqIdentifier, finalProgress);
148 }
149 else {
150 emit dataProvidedProgress(acqIdentifier, 0.0);
151 }
152 }
153
98 154 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data)
99 155 {
100 156 // Retrieves product ID from data: if the value is invalid, no request is made
101 157 auto productId = data.value(AMDA_XML_ID_KEY).toString();
102 158 if (productId.isNull()) {
103 159 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve data: unknown product id");
104 160 return;
105 161 }
106 162 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::retrieveData") << dateTime;
107 163
108 164 // Retrieves the data type that determines whether the expected format for the result file is
109 165 // scalar, vector...
110 166 auto productValueType = valueType(data.value(AMDA_DATA_TYPE_KEY).toString());
111 167
112 168 // /////////// //
113 169 // Creates URL //
114 170 // /////////// //
115 171
116 172 auto startDate = dateFormat(dateTime.m_TStart);
117 173 auto endDate = dateFormat(dateTime.m_TEnd);
118 174
119 175 auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)};
120 176 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
121 177 auto tempFile = std::make_shared<QTemporaryFile>();
122 178
123 179 // LAMBDA
124 180 auto httpDownloadFinished = [this, dateTime, tempFile,
125 181 productValueType](QNetworkReply *reply, QUuid dataId) noexcept {
126 182
127 183 // Don't do anything if the reply was abort
128 184 if (reply->error() != QNetworkReply::OperationCanceledError) {
129 185
130 186 if (tempFile) {
131 187 auto replyReadAll = reply->readAll();
132 188 if (!replyReadAll.isEmpty()) {
133 189 tempFile->write(replyReadAll);
134 190 }
135 191 tempFile->close();
136 192
137 193 // Parse results file
138 194 if (auto dataSeries
139 195 = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) {
140 196 emit dataProvided(dataId, dataSeries, dateTime);
141 197 }
142 198 else {
143 199 /// @todo ALX : debug
144 200 }
145 201 }
202 m_AcqIdToRequestProgressMap.erase(dataId);
146 203 }
147 204
148 205 };
149 206 auto httpFinishedLambda
150 207 = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept {
151 208
152 209 // Don't do anything if the reply was abort
153 210 if (reply->error() != QNetworkReply::OperationCanceledError) {
154 211 auto downloadFileUrl = QUrl{QString{reply->readAll()}};
155 212
156
157 213 qCInfo(LOG_AmdaProvider())
158 214 << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl;
159 215 // Executes request for downloading file //
160 216
161 217 // Creates destination file
162 218 if (tempFile->open()) {
163 219 // Executes request
164 emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId,
165 httpDownloadFinished);
220 auto request = std::make_shared<QNetworkRequest>(downloadFileUrl);
221 updateRequestProgress(dataId, request, 0.0);
222 emit requestConstructed(*request.get(), dataId, httpDownloadFinished);
166 223 }
167 224 }
225 else {
226 m_AcqIdToRequestProgressMap.erase(dataId);
227 }
168 228 };
169 229
170 230 // //////////////// //
171 231 // Executes request //
172 232 // //////////////// //
173 emit requestConstructed(QNetworkRequest{url}, token, httpFinishedLambda);
233
234 auto request = std::make_shared<QNetworkRequest>(url);
235 updateRequestProgress(token, request, 0.0);
236
237 emit requestConstructed(*request.get(), token, httpFinishedLambda);
238 }
239
240 void AmdaProvider::updateRequestProgress(QUuid acqIdentifier,
241 std::shared_ptr<QNetworkRequest> request, double progress)
242 {
243 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
244 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
245 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
246 auto requestProgressMapIt = requestProgressMap.find(request);
247 if (requestProgressMapIt != requestProgressMap.end()) {
248 requestProgressMapIt->second = progress;
249 }
250 else {
251 acqIdToRequestProgressMapIt->second.insert(std::make_pair(request, progress));
252 }
253 }
254 else {
255 auto requestProgressMap = std::map<std::shared_ptr<QNetworkRequest>, double>{};
256 requestProgressMap.insert(std::make_pair(request, progress));
257 m_AcqIdToRequestProgressMap.insert(
258 std::make_pair(acqIdentifier, std::move(requestProgressMap)));
259 }
174 260 }
@@ -1,110 +1,115
1 1 #include "CosinusProvider.h"
2 2
3 3 #include <Data/DataProviderParameters.h>
4 4 #include <Data/ScalarSeries.h>
5 5
6 6 #include <cmath>
7 7
8 8 #include <QFuture>
9 9 #include <QThread>
10 10 #include <QtConcurrent/QtConcurrent>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_CosinusProvider, "CosinusProvider")
13 13
14 14 std::shared_ptr<IDataProvider> CosinusProvider::clone() const
15 15 {
16 16 // No copy is made in clone
17 17 return std::make_shared<CosinusProvider>();
18 18 }
19 19
20 20 std::shared_ptr<IDataSeries> CosinusProvider::retrieveData(QUuid acqIdentifier,
21 21 const SqpRange &dataRangeRequested)
22 22 {
23 23 // TODO: Add Mutex
24 24 auto dataIndex = 0;
25 25
26 26 // Gets the timerange from the parameters
27 27 double freq = 100.0;
28 28 double start = std::ceil(dataRangeRequested.m_TStart * freq); // 100 htz
29 29 double end = std::floor(dataRangeRequested.m_TEnd * freq); // 100 htz
30 30
31 31 // We assure that timerange is valid
32 32 if (end < start) {
33 33 std::swap(start, end);
34 34 }
35 35
36 36 // Generates scalar series containing cosinus values (one value per second, end value is
37 37 // included)
38 38 auto dataCount = end - start + 1;
39 39
40 40 auto xAxisData = std::vector<double>{};
41 41 xAxisData.resize(dataCount);
42 42
43 43 auto valuesData = std::vector<double>{};
44 44 valuesData.resize(dataCount);
45 45
46 46 int progress = 0;
47 47 auto progressEnd = dataCount;
48 48 for (auto time = start; time <= end; ++time, ++dataIndex) {
49 49 auto it = m_VariableToEnableProvider.find(acqIdentifier);
50 50 if (it != m_VariableToEnableProvider.end() && it.value()) {
51 51 const auto timeOnFreq = time / freq;
52 52
53 53 xAxisData[dataIndex] = timeOnFreq;
54 54 valuesData[dataIndex] = std::cos(timeOnFreq);
55 55
56 56 // progression
57 57 int currentProgress = (time - start) * 100.0 / progressEnd;
58 58 if (currentProgress != progress) {
59 59 progress = currentProgress;
60 60
61 61 emit dataProvidedProgress(acqIdentifier, progress);
62 qCInfo(LOG_CosinusProvider()) << "TORM: CosinusProvider::retrieveData"
63 << QThread::currentThread()->objectName() << progress;
64 // NOTE: Try to use multithread if possible
62 65 }
63 66 }
64 67 else {
65 68 if (!it.value()) {
66 69 qCDebug(LOG_CosinusProvider())
67 70 << "CosinusProvider::retrieveData: ARRET De l'acquisition detectΓ©"
68 71 << end - time;
69 72 }
70 73 }
71 74 }
72 emit dataProvidedProgress(acqIdentifier, 0.0);
73
75 if (progress != 100) {
76 // We can close progression beacause all data has been retrieved
77 emit dataProvidedProgress(acqIdentifier, 100);
78 }
74 79 return std::make_shared<ScalarSeries>(std::move(xAxisData), std::move(valuesData),
75 80 Unit{QStringLiteral("t"), true}, Unit{});
76 81 }
77 82
78 83 void CosinusProvider::requestDataLoading(QUuid acqIdentifier,
79 84 const DataProviderParameters &parameters)
80 85 {
81 86 // TODO: Add Mutex
82 87 m_VariableToEnableProvider[acqIdentifier] = true;
83 88 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::requestDataLoading"
84 89 << QThread::currentThread()->objectName();
85 90 // NOTE: Try to use multithread if possible
86 91 const auto times = parameters.m_Times;
87 92
88 93 for (const auto &dateTime : qAsConst(times)) {
89 94 if (m_VariableToEnableProvider[acqIdentifier]) {
90 95 auto scalarSeries = this->retrieveData(acqIdentifier, dateTime);
91 96 qCDebug(LOG_CosinusProvider()) << "TORM: CosinusProvider::dataProvided";
92 97 emit dataProvided(acqIdentifier, scalarSeries, dateTime);
93 98 }
94 99 }
95 100 }
96 101
97 102 void CosinusProvider::requestDataAborting(QUuid acqIdentifier)
98 103 {
99 104 // TODO: Add Mutex
100 105 qCDebug(LOG_CosinusProvider()) << "CosinusProvider::requestDataAborting" << acqIdentifier
101 106 << QThread::currentThread()->objectName();
102 107 auto it = m_VariableToEnableProvider.find(acqIdentifier);
103 108 if (it != m_VariableToEnableProvider.end()) {
104 109 it.value() = false;
105 110 }
106 111 else {
107 112 qCWarning(LOG_CosinusProvider())
108 113 << tr("Aborting progression of inexistant identifier detected !!!");
109 114 }
110 115 }
General Comments 0
You need to be logged in to leave comments. Login now