##// END OF EJS Templates
Implementation of automatic cancel for request that failed
perrinel -
r704:ff170594501a
parent child
Show More
@@ -1,77 +1,81
1 1 #ifndef SCIQLOP_IDATAPROVIDER_H
2 2 #define SCIQLOP_IDATAPROVIDER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <memory>
7 7
8 8 #include <QObject>
9 9 #include <QUuid>
10 10
11 11 #include <Common/MetaTypes.h>
12 12
13 13 #include <Data/SqpRange.h>
14 14
15 15 #include <functional>
16 16
17 17 class DataProviderParameters;
18 18 class IDataSeries;
19 19 class QNetworkReply;
20 20 class QNetworkRequest;
21 21
22 22 /**
23 23 * @brief The IDataProvider interface aims to declare a data provider.
24 24 *
25 25 * A data provider is an entity that generates data and returns it according to various parameters
26 26 * (time interval, product to retrieve the data, etc.)
27 27 *
28 28 * @sa IDataSeries
29 29 */
30 30 class SCIQLOP_CORE_EXPORT IDataProvider : public QObject {
31 31
32 32 Q_OBJECT
33 33 public:
34 34 virtual ~IDataProvider() noexcept = default;
35 35 virtual std::shared_ptr<IDataProvider> clone() const = 0;
36 36
37 37 /**
38 38 * @brief requestDataLoading provide datas for the data identified by acqIdentifier and
39 39 * parameters
40 40 */
41 41 virtual void requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
42 42 = 0;
43 43
44 44 /**
45 45 * @brief requestDataAborting stop data loading of the data identified by acqIdentifier
46 46 */
47 47 virtual void requestDataAborting(QUuid acqIdentifier) = 0;
48 48
49 49 signals:
50 50 /**
51 51 * @brief dataProvided send dataSeries under dateTime and that corresponds of the data
52 52 * identified by acqIdentifier
53 53 */
54 54 void dataProvided(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dateSeriesAcquired,
55 55 const SqpRange &dataRangeAcquired);
56 56
57 57 /**
58 * @brief dataProvided send dataSeries under dateTime and that corresponds of the data
59 * identified by identifier
60 */
58 * @brief dataProvidedProgress notify the progression of the data identifier by acqIdentifier
59 */
61 60 void dataProvidedProgress(QUuid acqIdentifier, double progress);
62 61
62 /**
63 * @brief dataProvidedFailed notify that data acquisition has failed
64 */
65 void dataProvidedFailed(QUuid acqIdentifier);
66
63 67
64 68 /**
65 69 * @brief requestConstructed send a request for the data identified by acqIdentifier
66 70 * @callback is the methode call by the reply of the request when it is finished.
67 71 */
68 72 void requestConstructed(std::shared_ptr<QNetworkRequest> request, QUuid acqIdentifier,
69 73 std::function<void(QNetworkReply *, QUuid)> callback);
70 74 };
71 75
72 76 // Required for using shared_ptr in signals/slots
73 77 SCIQLOP_REGISTER_META_TYPE(IDATAPROVIDER_PTR_REGISTRY, std::shared_ptr<IDataProvider>)
74 78 SCIQLOP_REGISTER_META_TYPE(IDATAPROVIDER_FUNCTION_REGISTRY,
75 79 std::function<void(QNetworkReply *, QUuid)>)
76 80
77 81 #endif // SCIQLOP_IDATAPROVIDER_H
@@ -1,61 +1,66
1 1 #ifndef SCIQLOP_VARIABLEACQUISITIONWORKER_H
2 2 #define SCIQLOP_VARIABLEACQUISITIONWORKER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <QLoggingCategory>
8 8 #include <QObject>
9 9 #include <QUuid>
10 10
11 11 #include <Data/AcquisitionDataPacket.h>
12 12 #include <Data/IDataSeries.h>
13 13 #include <Data/SqpRange.h>
14 14
15 15 #include <QLoggingCategory>
16 16
17 17 #include <Common/spimpl.h>
18 18
19 19 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker)
20 20
21 21 class Variable;
22 22 class IDataProvider;
23 23
24 24 /// This class aims to handle all acquisition request
25 25 class SCIQLOP_CORE_EXPORT VariableAcquisitionWorker : public QObject {
26 26 Q_OBJECT
27 27 public:
28 28 explicit VariableAcquisitionWorker(QObject *parent = 0);
29 29 virtual ~VariableAcquisitionWorker();
30 30
31 31 QUuid pushVariableRequest(QUuid varRequestId, QUuid vIdentifier, SqpRange rangeRequested,
32 32 SqpRange cacheRangeRequested, DataProviderParameters parameters,
33 33 std::shared_ptr<IDataProvider> provider);
34 34
35 35 void abortProgressRequested(QUuid vIdentifier);
36 36
37 37 void initialize();
38 38 void finalize();
39 39 signals:
40 40 void dataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
41 41 const SqpRange &cacheRangeRequested,
42 42 QVector<AcquisitionDataPacket> dataAcquired);
43 43
44 44 void variableRequestInProgress(QUuid vIdentifier, double progress);
45 45
46
47 void variableCanceledRequested(QUuid vIdentifier);
48
49
46 50 public slots:
47 51 void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dataSeries,
48 52 SqpRange dataRangeAcquired);
49 53 void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress);
54 void onVariableAcquisitionFailed(QUuid acqIdentifier);
50 55
51 56 private:
52 57 void waitForFinish();
53 58
54 59 class VariableAcquisitionWorkerPrivate;
55 60 spimpl::unique_impl_ptr<VariableAcquisitionWorkerPrivate> impl;
56 61
57 62 private slots:
58 63 void onExecuteRequest(QUuid acqIdentifier);
59 64 };
60 65
61 66 #endif // SCIQLOP_VARIABLEACQUISITIONWORKER_H
@@ -1,129 +1,130
1 1 #ifndef SCIQLOP_VARIABLECONTROLLER_H
2 2 #define SCIQLOP_VARIABLECONTROLLER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <Data/AcquisitionDataPacket.h>
7 7 #include <Data/SqpRange.h>
8 8
9 9 #include <QLoggingCategory>
10 10 #include <QObject>
11 11 #include <QUuid>
12 12
13 13 #include <Common/spimpl.h>
14 14
15 15 class IDataProvider;
16 16 class QItemSelectionModel;
17 17 class TimeController;
18 18 class Variable;
19 19 class VariableModel;
20 20
21 21 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableController)
22 22
23 23
24 24 /**
25 25 * Possible types of zoom operation
26 26 */
27 27 enum class AcquisitionZoomType { ZoomOut, ZoomIn, PanRight, PanLeft, Unknown };
28 28
29 29
30 30 /**
31 31 * @brief The VariableController class aims to handle the variables in SciQlop.
32 32 */
33 33 class SCIQLOP_CORE_EXPORT VariableController : public QObject {
34 34 Q_OBJECT
35 35 public:
36 36 explicit VariableController(QObject *parent = 0);
37 37 virtual ~VariableController();
38 38
39 39 VariableModel *variableModel() noexcept;
40 40 QItemSelectionModel *variableSelectionModel() noexcept;
41 41
42 42 void setTimeController(TimeController *timeController) noexcept;
43 43
44 44 /**
45 45 * Clones the variable passed in parameter and adds the duplicate to the controller
46 46 * @param variable the variable to duplicate
47 47 * @return the duplicate created, nullptr if the variable couldn't be created
48 48 */
49 49 std::shared_ptr<Variable> cloneVariable(std::shared_ptr<Variable> variable) noexcept;
50 50
51 51 /**
52 52 * Deletes from the controller the variable passed in parameter.
53 53 *
54 54 * Delete a variable includes:
55 55 * - the deletion of the various references to the variable in SciQlop
56 56 * - the deletion of the model variable
57 57 * - the deletion of the provider associated with the variable
58 58 * - removing the cache associated with the variable
59 59 *
60 60 * @param variable the variable to delete from the controller.
61 61 */
62 62 void deleteVariable(std::shared_ptr<Variable> variable) noexcept;
63 63
64 64 /**
65 65 * Deletes from the controller the variables passed in parameter.
66 66 * @param variables the variables to delete from the controller.
67 67 * @sa deleteVariable()
68 68 */
69 69 void deleteVariables(const QVector<std::shared_ptr<Variable> > &variables) noexcept;
70 70
71 71
72 72 static AcquisitionZoomType getZoomType(const SqpRange &range, const SqpRange &oldRange);
73 73 signals:
74 74 /// Signal emitted when a variable is about to be deleted from the controller
75 75 void variableAboutToBeDeleted(std::shared_ptr<Variable> variable);
76 76
77 77 /// Signal emitted when a data acquisition is requested on a range for a variable
78 78 void rangeChanged(std::shared_ptr<Variable> variable, const SqpRange &range);
79 79
80 80 /// Signal emitted when a sub range of the cacheRange of the variable can be displayed
81 81 void updateVarDisplaying(std::shared_ptr<Variable> variable, const SqpRange &range);
82 82
83 83 public slots:
84 84 /// Request the data loading of the variable whithin range
85 85 void onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables, const SqpRange &range,
86 86 const SqpRange &oldRange, bool synchronise);
87 87 /**
88 88 * Creates a new variable and adds it to the model
89 89 * @param name the name of the new variable
90 90 * @param metadata the metadata of the new variable
91 91 * @param provider the data provider for the new variable
92 92 * @return the pointer to the new variable or nullptr if the creation failed
93 93 */
94 94 std::shared_ptr<Variable> createVariable(const QString &name, const QVariantHash &metadata,
95 95 std::shared_ptr<IDataProvider> provider) noexcept;
96 96
97 97 /// Update the temporal parameters of every selected variable to dateTime
98 98 void onDateTimeOnSelection(const SqpRange &dateTime);
99 99
100 100
101 101 void onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
102 102 const SqpRange &cacheRangeRequested,
103 103 QVector<AcquisitionDataPacket> dataAcquired);
104 104
105 105 void onVariableRetrieveDataInProgress(QUuid identifier, double progress);
106 106
107 107 /// Cancel the current request for the variable
108 108 void onAbortProgressRequested(std::shared_ptr<Variable> variable);
109 void onAbortAcquisitionRequested(QUuid vIdentifier);
109 110
110 111 // synchronization group methods
111 112 void onAddSynchronizationGroupId(QUuid synchronizationGroupId);
112 113 void onRemoveSynchronizationGroupId(QUuid synchronizationGroupId);
113 114 void onAddSynchronized(std::shared_ptr<Variable> variable, QUuid synchronizationGroupId);
114 115
115 116 /// Desynchronizes the variable of the group whose identifier is passed in parameter
116 117 /// @remarks the method does nothing if the variable is not part of the group
117 118 void desynchronize(std::shared_ptr<Variable> variable, QUuid synchronizationGroupId);
118 119
119 120 void initialize();
120 121 void finalize();
121 122
122 123 private:
123 124 void waitForFinish();
124 125
125 126 class VariableControllerPrivate;
126 127 spimpl::unique_impl_ptr<VariableControllerPrivate> impl;
127 128 };
128 129
129 130 #endif // SCIQLOP_VARIABLECONTROLLER_H
@@ -1,300 +1,322
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 23 {
24 24 }
25 25
26 26 void lockRead() { m_Lock.lockForRead(); }
27 27 void lockWrite() { m_Lock.lockForWrite(); }
28 28 void unlock() { m_Lock.unlock(); }
29 29
30 30 void removeVariableRequest(QUuid vIdentifier);
31 31
32 32 /// Remove the current request and execute the next one if exist
33 33 void updateToNextRequest(QUuid vIdentifier);
34 34
35 35 QMutex m_WorkingMutex;
36 36 QReadWriteLock m_Lock;
37 37
38 38 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
39 39 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
40 40 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
41 41 VariableAcquisitionWorker *q;
42 42 };
43 43
44 44
45 45 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
46 46 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
47 47 {
48 48 }
49 49
50 50 VariableAcquisitionWorker::~VariableAcquisitionWorker()
51 51 {
52 52 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
53 53 << QThread::currentThread();
54 54 this->waitForFinish();
55 55 }
56 56
57 57
58 58 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
59 59 SqpRange rangeRequested,
60 60 SqpRange cacheRangeRequested,
61 61 DataProviderParameters parameters,
62 62 std::shared_ptr<IDataProvider> provider)
63 63 {
64 64 qCDebug(LOG_VariableAcquisitionWorker())
65 65 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
66 66 auto varRequestIdCanceled = QUuid();
67 67
68 68 // Request creation
69 69 auto acqRequest = AcquisitionRequest{};
70 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TpushVariableRequest ") << vIdentifier;
70 71 acqRequest.m_VarRequestId = varRequestId;
71 72 acqRequest.m_vIdentifier = vIdentifier;
72 73 acqRequest.m_DataProviderParameters = parameters;
73 74 acqRequest.m_RangeRequested = rangeRequested;
74 75 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
75 76 acqRequest.m_Size = parameters.m_Times.size();
76 77 acqRequest.m_Provider = provider;
77 78
78 79
79 80 // Register request
80 81 impl->lockWrite();
81 82 impl->m_AcqIdentifierToAcqRequestMap.insert(
82 83 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
83 84
84 85 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
85 86 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
86 87 // A current request already exists, we can replace the next one
87 88 auto nextAcqId = it->second.second;
88 89 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
89 90 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
90 91 auto request = acqIdentifierToAcqRequestMapIt->second;
91 92 varRequestIdCanceled = request.m_VarRequestId;
92 93 }
93 94
94 95 it->second.second = acqRequest.m_AcqIdentifier;
95 96 impl->unlock();
96 97 }
97 98 else {
98 99 // First request for the variable, it must be stored and executed
99 100 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
100 101 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
101 102 impl->unlock();
102 103
103 104 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
104 105 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
105 106 }
106 107
107 108 return varRequestIdCanceled;
108 109 }
109 110
110 111 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
111 112 {
112 113 impl->lockRead();
113 114
114 115 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
115 116 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
116 117 auto currentAcqId = it->second.first;
117 118
118 119 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
119 120 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
120 121 auto request = it->second;
121 122 impl->unlock();
122 123
123 124 // Remove the current request from the worker
124 125
125 126 impl->lockWrite();
126 127 impl->updateToNextRequest(vIdentifier);
127 128 impl->unlock();
128 129
129 130 // notify the request aborting to the provider
130 131 request.m_Provider->requestDataAborting(currentAcqId);
131 132 }
132 133 else {
133 134 impl->unlock();
134 135 qCWarning(LOG_VariableAcquisitionWorker())
135 136 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
136 137 }
137 138 }
138 139 else {
139 140 impl->unlock();
140 141 }
141 142 }
142 143
143 144 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
144 145 double progress)
145 146 {
146 147 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress ")
147 148 << acqIdentifier << progress;
148 149 impl->lockRead();
149 150 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
150 151 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
151 152 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
152 153
153 154 auto currentPartProgress
154 155 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
155 156 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
156 157
157 158 auto finalProgression = currentAlreadyProgress + currentPartProgress;
158 159 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
159 160 qCDebug(LOG_VariableAcquisitionWorker())
160 << tr("TORM: onVariableRetrieveDataInProgress ") << aIdToARit->second.m_vIdentifier
161 << tr("TORM: onVariableRetrieveDataInProgress ")
162 << QThread::currentThread()->objectName() << aIdToARit->second.m_vIdentifier
161 163 << currentPartSize << currentAlreadyProgress << currentPartProgress << finalProgression;
162 164 if (finalProgression == 100.0) {
163 165 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
164 166 }
165 167 }
166 168 impl->unlock();
167 169 }
168 170
171 void VariableAcquisitionWorker::onVariableAcquisitionFailed(QUuid acqIdentifier)
172 {
173 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
174 << QThread::currentThread();
175 impl->lockRead();
176 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
177 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
178 auto request = it->second;
179 impl->unlock();
180 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onVariableAcquisitionFailed")
181 << acqIdentifier << request.m_vIdentifier
182 << QThread::currentThread();
183 emit variableCanceledRequested(request.m_vIdentifier);
184 }
185 else {
186 impl->unlock();
187 // TODO log no acqIdentifier recognized
188 }
189 }
190
169 191 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
170 192 std::shared_ptr<IDataSeries> dataSeries,
171 193 SqpRange dataRangeAcquired)
172 194 {
173 195 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
174 196 << acqIdentifier << dataRangeAcquired;
175 197 impl->lockWrite();
176 198 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
177 199 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
178 200 // Store the result
179 201 auto dataPacket = AcquisitionDataPacket{};
180 202 dataPacket.m_Range = dataRangeAcquired;
181 203 dataPacket.m_DateSeries = dataSeries;
182 204
183 205 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
184 206 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
185 207 // A current request result already exists, we can update it
186 208 aIdToADPVit->second.push_back(dataPacket);
187 209 }
188 210 else {
189 211 // First request result for the variable, it must be stored
190 212 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
191 213 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
192 214 }
193 215
194 216
195 217 // Decrement the counter of the request
196 218 auto &acqRequest = aIdToARit->second;
197 219 acqRequest.m_Progression = acqRequest.m_Progression + 1;
198 220
199 221 // if the counter is 0, we can return data then run the next request if it exists and
200 222 // removed the finished request
201 223 if (acqRequest.m_Size == acqRequest.m_Progression) {
202 224 // Return the data
203 225 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
204 226 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
205 227 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
206 228 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
207 229 }
208 230
209 231 // Update to the next request
210 232 impl->updateToNextRequest(acqRequest.m_vIdentifier);
211 233 }
212 234 }
213 235 else {
214 236 qCWarning(LOG_VariableAcquisitionWorker())
215 237 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
216 238 }
217 239 impl->unlock();
218 240 }
219 241
220 242 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
221 243 {
222 244 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
223 245 impl->lockRead();
224 246 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
225 247 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
226 248 auto request = it->second;
227 249 impl->unlock();
228 250 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
229 251 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
230 252 }
231 253 else {
232 254 impl->unlock();
233 255 // TODO log no acqIdentifier recognized
234 256 }
235 257 }
236 258
237 259 void VariableAcquisitionWorker::initialize()
238 260 {
239 261 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
240 262 << QThread::currentThread();
241 263 impl->m_WorkingMutex.lock();
242 264 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
243 265 }
244 266
245 267 void VariableAcquisitionWorker::finalize()
246 268 {
247 269 impl->m_WorkingMutex.unlock();
248 270 }
249 271
250 272 void VariableAcquisitionWorker::waitForFinish()
251 273 {
252 274 QMutexLocker locker{&impl->m_WorkingMutex};
253 275 }
254 276
255 277 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
256 278 QUuid vIdentifier)
257 279 {
258 280 lockWrite();
259 281 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
260 282
261 283 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
262 284 // A current request already exists, we can replace the next one
263 285
264 286 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
265 287 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
266 288
267 289 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
268 290 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
269 291 }
270 292 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
271 293 unlock();
272 294 }
273 295
274 296 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
275 297 QUuid vIdentifier)
276 298 {
277 299 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
278 300 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
279 301 if (it->second.second.isNull()) {
280 302 // There is no next request, we can remove the variable request
281 303 removeVariableRequest(vIdentifier);
282 304 }
283 305 else {
284 306 auto acqIdentifierToRemove = it->second.first;
285 307 // Move the next request to the current request
286 308 it->second.first = it->second.second;
287 309 it->second.second = QUuid();
288 310 // Remove AcquisitionRequest and results;
289 311 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
290 312 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
291 313 // Execute the current request
292 314 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
293 315 Q_ARG(QUuid, it->second.first));
294 316 }
295 317 }
296 318 else {
297 319 qCCritical(LOG_VariableAcquisitionWorker())
298 320 << tr("Impossible to execute the acquisition on an unfound variable ");
299 321 }
300 322 }
@@ -1,813 +1,834
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 6 #include <Variable/VariableSynchronizationGroup.h>
7 7
8 8 #include <Data/DataProviderParameters.h>
9 9 #include <Data/IDataProvider.h>
10 10 #include <Data/IDataSeries.h>
11 11 #include <Data/VariableRequest.h>
12 12 #include <Time/TimeController.h>
13 13
14 14 #include <QMutex>
15 15 #include <QThread>
16 16 #include <QUuid>
17 17 #include <QtCore/QItemSelectionModel>
18 18
19 19 #include <deque>
20 20 #include <set>
21 21 #include <unordered_map>
22 22
23 23 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
24 24
25 25 namespace {
26 26
27 27 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
28 28 const SqpRange &oldGraphRange)
29 29 {
30 30 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
31 31
32 32 auto varRangeRequested = varRange;
33 33 switch (zoomType) {
34 34 case AcquisitionZoomType::ZoomIn: {
35 35 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
36 36 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
37 37 varRangeRequested.m_TStart += deltaLeft;
38 38 varRangeRequested.m_TEnd -= deltaRight;
39 39 break;
40 40 }
41 41
42 42 case AcquisitionZoomType::ZoomOut: {
43 43 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
44 44 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
45 45 varRangeRequested.m_TStart -= deltaLeft;
46 46 varRangeRequested.m_TEnd += deltaRight;
47 47 break;
48 48 }
49 49 case AcquisitionZoomType::PanRight: {
50 50 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
51 51 varRangeRequested.m_TStart += deltaRight;
52 52 varRangeRequested.m_TEnd += deltaRight;
53 53 break;
54 54 }
55 55 case AcquisitionZoomType::PanLeft: {
56 56 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
57 57 varRangeRequested.m_TStart -= deltaLeft;
58 58 varRangeRequested.m_TEnd -= deltaLeft;
59 59 break;
60 60 }
61 61 case AcquisitionZoomType::Unknown: {
62 62 qCCritical(LOG_VariableController())
63 63 << VariableController::tr("Impossible to synchronize: zoom type unknown");
64 64 break;
65 65 }
66 66 default:
67 67 qCCritical(LOG_VariableController()) << VariableController::tr(
68 68 "Impossible to synchronize: zoom type not take into account");
69 69 // No action
70 70 break;
71 71 }
72 72
73 73 return varRangeRequested;
74 74 }
75 75 }
76 76
77 77 struct VariableController::VariableControllerPrivate {
78 78 explicit VariableControllerPrivate(VariableController *parent)
79 79 : m_WorkingMutex{},
80 80 m_VariableModel{new VariableModel{parent}},
81 81 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
82 82 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 83 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 84 q{parent}
85 85 {
86 86
87 87 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 88 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 89 }
90 90
91 91
92 92 virtual ~VariableControllerPrivate()
93 93 {
94 94 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 95 m_VariableAcquisitionWorkerThread.quit();
96 96 m_VariableAcquisitionWorkerThread.wait();
97 97 }
98 98
99 99
100 100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
101 101 QUuid varRequestId);
102 102
103 103 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
104 104 const SqpRange &dateTime);
105 105
106 106 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
107 107 std::shared_ptr<IDataSeries>
108 108 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
109 109
110 110 void registerProvider(std::shared_ptr<IDataProvider> provider);
111 111
112 112 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
113 113 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
114 114 void updateVariableRequest(QUuid varRequestId);
115 115 void cancelVariableRequest(QUuid varRequestId);
116 116
117 117 QMutex m_WorkingMutex;
118 118 /// Variable model. The VariableController has the ownership
119 119 VariableModel *m_VariableModel;
120 120 QItemSelectionModel *m_VariableSelectionModel;
121 121
122 122
123 123 TimeController *m_TimeController{nullptr};
124 124 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
125 125 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
126 126 QThread m_VariableAcquisitionWorkerThread;
127 127
128 128 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
129 129 m_VariableToProviderMap;
130 130 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
131 131 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
132 132 m_GroupIdToVariableSynchronizationGroupMap;
133 133 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
134 134 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
135 135
136 136 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
137 137
138 138 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
139 139
140 140
141 141 VariableController *q;
142 142 };
143 143
144 144
145 145 VariableController::VariableController(QObject *parent)
146 146 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
147 147 {
148 148 qCDebug(LOG_VariableController()) << tr("VariableController construction")
149 149 << QThread::currentThread();
150 150
151 151 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
152 152 &VariableController::onAbortProgressRequested);
153 153
154 connect(impl->m_VariableAcquisitionWorker.get(),
155 &VariableAcquisitionWorker::variableCanceledRequested, this,
156 &VariableController::onAbortAcquisitionRequested);
157
154 158 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
155 159 &VariableController::onDataProvided);
156 160 connect(impl->m_VariableAcquisitionWorker.get(),
157 161 &VariableAcquisitionWorker::variableRequestInProgress, this,
158 162 &VariableController::onVariableRetrieveDataInProgress);
159 163
164
160 165 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
161 166 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
162 167 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
163 168 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
164 169
165 170
166 171 impl->m_VariableAcquisitionWorkerThread.start();
167 172 }
168 173
169 174 VariableController::~VariableController()
170 175 {
171 176 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
172 177 << QThread::currentThread();
173 178 this->waitForFinish();
174 179 }
175 180
176 181 VariableModel *VariableController::variableModel() noexcept
177 182 {
178 183 return impl->m_VariableModel;
179 184 }
180 185
181 186 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
182 187 {
183 188 return impl->m_VariableSelectionModel;
184 189 }
185 190
186 191 void VariableController::setTimeController(TimeController *timeController) noexcept
187 192 {
188 193 impl->m_TimeController = timeController;
189 194 }
190 195
191 196 std::shared_ptr<Variable>
192 197 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
193 198 {
194 199 if (impl->m_VariableModel->containsVariable(variable)) {
195 200 // Clones variable
196 201 auto duplicate = variable->clone();
197 202
198 203 // Adds clone to model
199 204 impl->m_VariableModel->addVariable(duplicate);
200 205
201 206 // Generates clone identifier
202 207 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
203 208
204 209 // Registers provider
205 210 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
206 211 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
207 212
208 213 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
209 214 if (duplicateProvider) {
210 215 impl->registerProvider(duplicateProvider);
211 216 }
212 217
213 218 return duplicate;
214 219 }
215 220 else {
216 221 qCCritical(LOG_VariableController())
217 222 << tr("Can't create duplicate of variable %1: variable not registered in the model")
218 223 .arg(variable->name());
219 224 return nullptr;
220 225 }
221 226 }
222 227
223 228 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
224 229 {
225 230 if (!variable) {
226 231 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
227 232 return;
228 233 }
229 234
230 235 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
231 236 // make some treatments before the deletion
232 237 emit variableAboutToBeDeleted(variable);
233 238
234 239 // Deletes identifier
235 240 impl->m_VariableToIdentifierMap.erase(variable);
236 241
237 242 // Deletes provider
238 243 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
239 244 qCDebug(LOG_VariableController())
240 245 << tr("Number of providers deleted for variable %1: %2")
241 246 .arg(variable->name(), QString::number(nbProvidersDeleted));
242 247
243 248
244 249 // Deletes from model
245 250 impl->m_VariableModel->deleteVariable(variable);
246 251 }
247 252
248 253 void VariableController::deleteVariables(
249 254 const QVector<std::shared_ptr<Variable> > &variables) noexcept
250 255 {
251 256 for (auto variable : qAsConst(variables)) {
252 257 deleteVariable(variable);
253 258 }
254 259 }
255 260
256 261 std::shared_ptr<Variable>
257 262 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
258 263 std::shared_ptr<IDataProvider> provider) noexcept
259 264 {
260 265 if (!impl->m_TimeController) {
261 266 qCCritical(LOG_VariableController())
262 267 << tr("Impossible to create variable: The time controller is null");
263 268 return nullptr;
264 269 }
265 270
266 271 auto range = impl->m_TimeController->dateTime();
267 272
268 273 if (auto newVariable = impl->m_VariableModel->createVariable(name, metadata)) {
269 274 auto identifier = QUuid::createUuid();
270 275
271 276 // store the provider
272 277 impl->registerProvider(provider);
273 278
274 279 // Associate the provider
275 280 impl->m_VariableToProviderMap[newVariable] = provider;
281 qCInfo(LOG_VariableController()) << "createVariable: " << identifier;
276 282 impl->m_VariableToIdentifierMap[newVariable] = identifier;
277 283
278 284
279 285 auto varRequestId = QUuid::createUuid();
280 286 impl->processRequest(newVariable, range, varRequestId);
281 287 impl->updateVariableRequest(varRequestId);
282 288
283 289 return newVariable;
284 290 }
285 291 }
286 292
287 293 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
288 294 {
289 295 // TODO check synchronisation and Rescale
290 296 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
291 297 << QThread::currentThread()->objectName();
292 298 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
293 299 auto varRequestId = QUuid::createUuid();
294 300
295 301 for (const auto &selectedRow : qAsConst(selectedRows)) {
296 302 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
297 303 selectedVariable->setRange(dateTime);
298 304 impl->processRequest(selectedVariable, dateTime, varRequestId);
299 305
300 306 // notify that rescale operation has to be done
301 307 emit rangeChanged(selectedVariable, dateTime);
302 308 }
303 309 }
304 310 impl->updateVariableRequest(varRequestId);
305 311 }
306 312
307 313 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
308 314 const SqpRange &cacheRangeRequested,
309 315 QVector<AcquisitionDataPacket> dataAcquired)
310 316 {
311 317 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
312 318 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
313 319 if (!varRequestId.isNull()) {
314 320 impl->updateVariableRequest(varRequestId);
315 321 }
316 322 }
317 323
318 324 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
319 325 {
320 326 qCDebug(LOG_VariableController())
321 327 << "TORM: variableController::onVariableRetrieveDataInProgress"
322 328 << QThread::currentThread()->objectName() << progress;
323 329 if (auto var = impl->findVariable(identifier)) {
324 330 impl->m_VariableModel->setDataProgress(var, progress);
325 331 }
326 332 else {
327 333 qCCritical(LOG_VariableController())
328 334 << tr("Impossible to notify progression of a null variable");
329 335 }
330 336 }
331 337
332 338 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
333 339 {
334 340 auto it = impl->m_VariableToIdentifierMap.find(variable);
335 341 if (it != impl->m_VariableToIdentifierMap.cend()) {
336 342 impl->m_VariableAcquisitionWorker->abortProgressRequested(it->second);
337 343
338 344 QUuid varRequestId;
339 345 auto varIdToVarRequestIdQueueMapIt = impl->m_VarIdToVarRequestIdQueueMap.find(it->second);
340 346 if (varIdToVarRequestIdQueueMapIt != impl->m_VarIdToVarRequestIdQueueMap.cend()) {
341 347 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
342 348 varRequestId = varRequestIdQueue.front();
343 349 impl->cancelVariableRequest(varRequestId);
344 350
345 351 // Finish the progression for the request
346 352 impl->m_VariableModel->setDataProgress(variable, 0.0);
347 353 }
348 354 else {
349 355 qCWarning(LOG_VariableController())
350 356 << tr("Aborting progression of inexistant variable request detected !!!")
351 357 << QThread::currentThread()->objectName();
352 358 }
353 359 }
354 360 else {
355 361 qCWarning(LOG_VariableController())
356 362 << tr("Aborting progression of inexistant variable detected !!!")
357 363 << QThread::currentThread()->objectName();
358 364 }
359 365 }
360 366
367 void VariableController::onAbortAcquisitionRequested(QUuid vIdentifier)
368 {
369 qCDebug(LOG_VariableController()) << "TORM: variableController::onAbortAcquisitionRequested"
370 << QThread::currentThread()->objectName() << vIdentifier;
371
372 if (auto var = impl->findVariable(vIdentifier)) {
373 this->onAbortProgressRequested(var);
374 }
375 else {
376 qCCritical(LOG_VariableController())
377 << tr("Impossible to abort Acquisition Requestof a null variable");
378 }
379 }
380
361 381 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
362 382 {
363 383 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
364 384 << QThread::currentThread()->objectName()
365 385 << synchronizationGroupId;
366 386 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
367 387 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
368 388 std::make_pair(synchronizationGroupId, vSynchroGroup));
369 389 }
370 390
371 391 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
372 392 {
373 393 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
374 394 }
375 395
376 396 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
377 397 QUuid synchronizationGroupId)
378 398
379 399 {
380 400 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
381 401 << synchronizationGroupId;
382 402 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
383 403 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
384 404 auto groupIdToVSGIt
385 405 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
386 406 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
387 407 impl->m_VariableIdGroupIdMap.insert(
388 408 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
389 409 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
390 410 }
391 411 else {
392 412 qCCritical(LOG_VariableController())
393 413 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
394 414 << variable->name();
395 415 }
396 416 }
397 417 else {
398 418 qCCritical(LOG_VariableController())
399 419 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
400 420 }
401 421 }
402 422
403 423 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
404 424 QUuid synchronizationGroupId)
405 425 {
406 426 // Gets variable id
407 427 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
408 428 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
409 429 qCCritical(LOG_VariableController())
410 430 << tr("Can't desynchronize variable %1: variable identifier not found")
411 431 .arg(variable->name());
412 432 return;
413 433 }
414 434
415 435 // Gets synchronization group
416 436 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
417 437 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
418 438 qCCritical(LOG_VariableController())
419 439 << tr("Can't desynchronize variable %1: unknown synchronization group")
420 440 .arg(variable->name());
421 441 return;
422 442 }
423 443
424 444 auto variableId = variableIt->second;
425 445
426 446 // Removes variable from synchronization group
427 447 auto synchronizationGroup = groupIt->second;
428 448 synchronizationGroup->removeVariableId(variableId);
429 449
430 450 // Removes link between variable and synchronization group
431 451 impl->m_VariableIdGroupIdMap.erase(variableId);
432 452 }
433 453
434 454 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
435 455 const SqpRange &range, const SqpRange &oldRange,
436 456 bool synchronise)
437 457 {
438 458 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
439 459
440 460 // we want to load data of the variable for the dateTime.
441 461 // First we check if the cache contains some of them.
442 462 // For the other, we ask the provider to give them.
443 463
444 464 auto varRequestId = QUuid::createUuid();
445 465 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
446 466 << QThread::currentThread()->objectName() << varRequestId;
447 467
448 468 for (const auto &var : variables) {
449 469 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
450 470 impl->processRequest(var, range, varRequestId);
451 471 }
452 472
453 473 if (synchronise) {
454 474 // Get the group ids
455 475 qCDebug(LOG_VariableController())
456 476 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
457 477 auto groupIds = std::set<QUuid>{};
458 478 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
459 479 for (const auto &var : variables) {
460 480 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
461 481 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
462 482 auto vId = varToVarIdIt->second;
463 483 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
464 484 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
465 485 auto gId = varIdToGroupIdIt->second;
466 486 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
467 487 if (groupIds.find(gId) == groupIds.cend()) {
468 488 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
469 489 groupIds.insert(gId);
470 490 }
471 491 }
472 492 }
473 493 }
474 494
475 495 // We assume here all group ids exist
476 496 for (const auto &gId : groupIds) {
477 497 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
478 498 auto vSyncIds = vSynchronizationGroup->getIds();
479 499 qCDebug(LOG_VariableController()) << "Var in synchro group ";
480 500 for (auto vId : vSyncIds) {
481 501 auto var = impl->findVariable(vId);
482 502
483 503 // Don't process already processed var
484 504 if (!variables.contains(var)) {
485 505 if (var != nullptr) {
486 506 qCDebug(LOG_VariableController()) << "processRequest synchro for"
487 507 << var->name();
488 508 auto vSyncRangeRequested = computeSynchroRangeRequested(
489 509 var->range(), range, groupIdToOldRangeMap.at(gId));
490 510 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
491 511 impl->processRequest(var, vSyncRangeRequested, varRequestId);
492 512 }
493 513 else {
494 514 qCCritical(LOG_VariableController())
495 515
496 516 << tr("Impossible to synchronize a null variable");
497 517 }
498 518 }
499 519 }
500 520 }
501 521 }
502 522
503 523 impl->updateVariableRequest(varRequestId);
504 524 }
505 525
506 526
507 527 void VariableController::initialize()
508 528 {
509 529 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
510 530 impl->m_WorkingMutex.lock();
511 531 qCDebug(LOG_VariableController()) << tr("VariableController init END");
512 532 }
513 533
514 534 void VariableController::finalize()
515 535 {
516 536 impl->m_WorkingMutex.unlock();
517 537 }
518 538
519 539 void VariableController::waitForFinish()
520 540 {
521 541 QMutexLocker locker{&impl->m_WorkingMutex};
522 542 }
523 543
524 544 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
525 545 {
526 546 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
527 547 auto zoomType = AcquisitionZoomType::Unknown;
528 548 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
529 549 zoomType = AcquisitionZoomType::ZoomOut;
530 550 }
531 551 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
532 552 zoomType = AcquisitionZoomType::PanRight;
533 553 }
534 554 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
535 555 zoomType = AcquisitionZoomType::PanLeft;
536 556 }
537 557 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
538 558 zoomType = AcquisitionZoomType::ZoomIn;
539 559 }
540 560 else {
541 561 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
542 562 }
543 563 return zoomType;
544 564 }
545 565
546 566 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
547 567 const SqpRange &rangeRequested,
548 568 QUuid varRequestId)
549 569 {
550 570
551 571 // TODO: protect at
552 572 auto varRequest = VariableRequest{};
553 573 auto varId = m_VariableToIdentifierMap.at(var);
554 574
555 575 auto varStrategyRangesRequested
556 576 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
557 577 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
558 578 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
559 579
560 580 if (!notInCacheRangeList.empty()) {
561 581 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
562 582 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
563 583
564 584 // store VarRequest
565 585 storeVariableRequest(varId, varRequestId, varRequest);
566 586
567 587 auto varProvider = m_VariableToProviderMap.at(var);
568 588 if (varProvider != nullptr) {
569 589 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
570 590 varRequestId, varId, varStrategyRangesRequested.first,
571 591 varStrategyRangesRequested.second,
572 592 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
573 593 varProvider);
574 594
575 595 if (!varRequestIdCanceled.isNull()) {
576 596 qCDebug(LOG_VariableAcquisitionWorker()) << tr("vsarRequestIdCanceled: ")
577 597 << varRequestIdCanceled;
578 598 cancelVariableRequest(varRequestIdCanceled);
579 599 }
580 600 }
581 601 else {
582 602 qCCritical(LOG_VariableController())
583 603 << "Impossible to provide data with a null provider";
584 604 }
585 605
586 606 if (!inCacheRangeList.empty()) {
587 607 emit q->updateVarDisplaying(var, inCacheRangeList.first());
588 608 }
589 609 }
590 610 else {
591 611 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
592 612 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
593 613 // store VarRequest
594 614 storeVariableRequest(varId, varRequestId, varRequest);
595 615 acceptVariableRequest(varId,
596 616 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
597 617 }
598 618 }
599 619
600 620 std::shared_ptr<Variable>
601 621 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
602 622 {
603 623 std::shared_ptr<Variable> var;
604 624 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
605 625
606 626 auto end = m_VariableToIdentifierMap.cend();
607 627 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
608 628 if (it != end) {
609 629 var = it->first;
610 630 }
611 631 else {
612 632 qCCritical(LOG_VariableController())
613 633 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
614 634 }
615 635
616 636 return var;
617 637 }
618 638
619 639 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
620 640 const QVector<AcquisitionDataPacket> acqDataPacketVector)
621 641 {
622 642 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
623 643 << acqDataPacketVector.size();
624 644 std::shared_ptr<IDataSeries> dataSeries;
625 645 if (!acqDataPacketVector.isEmpty()) {
626 646 dataSeries = acqDataPacketVector[0].m_DateSeries;
627 647 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
628 648 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
629 649 }
630 650 }
631 651 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
632 652 << acqDataPacketVector.size();
633 653 return dataSeries;
634 654 }
635 655
636 656 void VariableController::VariableControllerPrivate::registerProvider(
637 657 std::shared_ptr<IDataProvider> provider)
638 658 {
639 659 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
640 660 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
641 661 << provider->objectName();
642 662 m_ProviderSet.insert(provider);
643 663 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
644 664 &VariableAcquisitionWorker::onVariableDataAcquired);
645 665 connect(provider.get(), &IDataProvider::dataProvidedProgress,
646 666 m_VariableAcquisitionWorker.get(),
647 667 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
668 connect(provider.get(), &IDataProvider::dataProvidedFailed,
669 m_VariableAcquisitionWorker.get(),
670 &VariableAcquisitionWorker::onVariableAcquisitionFailed);
648 671 }
649 672 else {
650 673 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
651 674 }
652 675 }
653 676
654 677 void VariableController::VariableControllerPrivate::storeVariableRequest(
655 678 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
656 679 {
657 680 // First request for the variable. we can create an entry for it
658 681 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
659 682 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
660 683 auto varRequestIdQueue = std::deque<QUuid>{};
661 684 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
662 685 varRequestIdQueue.push_back(varRequestId);
663 686 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
664 687 }
665 688 else {
666 689 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
667 690 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
668 691 varRequestIdQueue.push_back(varRequestId);
669 692 }
670 693
671 694 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
672 695 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
673 696 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
674 697 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
675 698 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
676 699 m_VarRequestIdToVarIdVarRequestMap.insert(
677 700 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
678 701 }
679 702 else {
680 703 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
681 704 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
682 705 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
683 706 }
684 707 }
685 708
686 709 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
687 710 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
688 711 {
689 712 QUuid varRequestId;
690 713 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
691 714 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
692 715 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
693 716 varRequestId = varRequestIdQueue.front();
694 717 auto varRequestIdToVarIdVarRequestMapIt
695 718 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
696 719 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
697 720 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
698 721 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
699 722 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
700 723 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
701 724 auto &varRequest = varIdToVarRequestMapIt->second;
702 725 varRequest.m_DataSeries = dataSeries;
703 726 varRequest.m_CanUpdate = true;
704 727 }
705 728 else {
706 729 qCDebug(LOG_VariableController())
707 730 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
708 731 "to a variableRequestId")
709 732 << varRequestId << varId;
710 733 }
711 734 }
712 735 else {
713 736 qCCritical(LOG_VariableController())
714 737 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
715 738 << varRequestId;
716 739 }
717 740
718 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
719 << varRequestIdQueue.size();
720 741 varRequestIdQueue.pop_front();
721 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
722 << varRequestIdQueue.size();
723 742 if (varRequestIdQueue.empty()) {
743 qCDebug(LOG_VariableController())
744 << tr("TORM Erase REQUEST because it has been accepted") << varId;
724 745 m_VarIdToVarRequestIdQueueMap.erase(varId);
725 746 }
726 747 }
727 748 else {
728 749 qCCritical(LOG_VariableController())
729 750 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
730 751 }
731 752
732 753 return varRequestId;
733 754 }
734 755
735 756 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
736 757 {
737 758
738 759 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
739 760 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
740 761 bool processVariableUpdate = true;
741 762 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
742 763 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
743 764 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
744 765 ++varIdToVarRequestMapIt) {
745 766 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
746 767 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
747 768 << processVariableUpdate;
748 769 }
749 770
750 771 if (processVariableUpdate) {
751 772 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
752 773 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
753 774 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
754 775 auto &varRequest = varIdToVarRequestMapIt->second;
755 776 var->setRange(varRequest.m_RangeRequested);
756 777 var->setCacheRange(varRequest.m_CacheRangeRequested);
757 778 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
758 779 << varRequest.m_RangeRequested;
759 780 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
760 781 << varRequest.m_CacheRangeRequested;
761 782 var->mergeDataSeries(varRequest.m_DataSeries);
762 783 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
763 784 << varRequest.m_DataSeries->range();
764 785 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
765 786
766 787 /// @todo MPL: confirm
767 788 // Variable update is notified only if there is no pending request for it
768 789 // if
769 790 // (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first)
770 791 // == 0) {
771 792 emit var->updated();
772 793 // }
773 794 }
774 795 else {
775 796 qCCritical(LOG_VariableController())
776 797 << tr("Impossible to update data to a null variable");
777 798 }
778 799 }
779 800
780 801 // cleaning varRequestId
781 802 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
782 803 << m_VarRequestIdToVarIdVarRequestMap.size();
783 804 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
784 805 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
785 806 << m_VarRequestIdToVarIdVarRequestMap.size();
786 807 }
787 808 }
788 809 else {
789 810 qCCritical(LOG_VariableController())
790 811 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
791 812 }
792 813 }
793 814
794 815 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
795 816 {
796 817 // cleaning varRequestId
797 818 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
798 819
799 820 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
800 821 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
801 822 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
802 823 varRequestIdQueue.erase(
803 824 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
804 825 varRequestIdQueue.end());
805 826 if (varRequestIdQueue.empty()) {
806 827 varIdToVarRequestIdQueueMapIt
807 828 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
808 829 }
809 830 else {
810 831 ++varIdToVarRequestIdQueueMapIt;
811 832 }
812 833 }
813 834 }
@@ -1,277 +1,283
1 1 #include "AmdaProvider.h"
2 2 #include "AmdaDefs.h"
3 3 #include "AmdaResultParser.h"
4 4
5 5 #include <Common/DateUtils.h>
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <Network/NetworkController.h>
8 8 #include <SqpApplication.h>
9 9 #include <Variable/Variable.h>
10 10
11 11 #include <QNetworkAccessManager>
12 12 #include <QNetworkReply>
13 13 #include <QTemporaryFile>
14 14 #include <QThread>
15 15
16 16 Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider")
17 17
18 18 namespace {
19 19
20 20 /// URL format for a request on AMDA server. The parameters are as follows:
21 21 /// - %1: start date
22 22 /// - %2: end date
23 23 /// - %3: parameter id
24 24 const auto AMDA_URL_FORMAT = QStringLiteral(
25 25 "http://amda.irap.omp.eu/php/rest/"
26 26 "getParameter.php?startTime=%1&stopTime=%2&parameterID=%3&outputFormat=ASCII&"
27 27 "timeFormat=ISO8601&gzip=0");
28 28
29 29 /// Dates format passed in the URL (e.g 2013-09-23T09:00)
30 30 const auto AMDA_TIME_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss");
31 31
32 32 /// Formats a time to a date that can be passed in URL
33 33 QString dateFormat(double sqpRange) noexcept
34 34 {
35 35 auto dateTime = DateUtils::dateTime(sqpRange);
36 36 return dateTime.toString(AMDA_TIME_FORMAT);
37 37 }
38 38
39 39 AmdaResultParser::ValueType valueType(const QString &valueType)
40 40 {
41 41 if (valueType == QStringLiteral("scalar")) {
42 42 return AmdaResultParser::ValueType::SCALAR;
43 43 }
44 44 else if (valueType == QStringLiteral("vector")) {
45 45 return AmdaResultParser::ValueType::VECTOR;
46 46 }
47 47 else {
48 48 return AmdaResultParser::ValueType::UNKNOWN;
49 49 }
50 50 }
51 51
52 52 } // namespace
53 53
54 54 AmdaProvider::AmdaProvider()
55 55 {
56 56 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::AmdaProvider") << QThread::currentThread();
57 57 if (auto app = sqpApp) {
58 58 auto &networkController = app->networkController();
59 59 connect(this, SIGNAL(requestConstructed(std::shared_ptr<QNetworkRequest>, QUuid,
60 60 std::function<void(QNetworkReply *, QUuid)>)),
61 61 &networkController,
62 62 SLOT(onProcessRequested(std::shared_ptr<QNetworkRequest>, QUuid,
63 63 std::function<void(QNetworkReply *, QUuid)>)));
64 64
65 65
66 66 connect(&sqpApp->networkController(),
67 67 SIGNAL(replyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)),
68 68 this,
69 69 SLOT(onReplyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)));
70 70 }
71 71 }
72 72
73 73 std::shared_ptr<IDataProvider> AmdaProvider::clone() const
74 74 {
75 75 // No copy is made in the clone
76 76 return std::make_shared<AmdaProvider>();
77 77 }
78 78
79 79 void AmdaProvider::requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
80 80 {
81 81 // NOTE: Try to use multithread if possible
82 82 const auto times = parameters.m_Times;
83 83 const auto data = parameters.m_Data;
84 84 for (const auto &dateTime : qAsConst(times)) {
85 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::requestDataLoading ") << acqIdentifier
86 << dateTime;
85 qCDebug(LOG_AmdaProvider()) << tr("TORM AmdaProvider::requestDataLoading ") << acqIdentifier
86 << dateTime;
87 87 this->retrieveData(acqIdentifier, dateTime, data);
88 88
89 89
90 90 // TORM when AMDA will support quick asynchrone request
91 91 QThread::msleep(1000);
92 92 }
93 93 }
94 94
95 95 void AmdaProvider::requestDataAborting(QUuid acqIdentifier)
96 96 {
97 97 if (auto app = sqpApp) {
98 98 auto &networkController = app->networkController();
99 99 networkController.onReplyCanceled(acqIdentifier);
100 100 }
101 101 }
102 102
103 103 void AmdaProvider::onReplyDownloadProgress(QUuid acqIdentifier,
104 104 std::shared_ptr<QNetworkRequest> networkRequest,
105 105 double progress)
106 106 {
107 107 qCDebug(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << acqIdentifier
108 108 << networkRequest.get() << progress;
109 109 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
110 110 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
111 111
112 112 // Update the progression for the current request
113 113 auto requestPtr = networkRequest;
114 114 auto findRequest = [requestPtr](const auto &entry) { return requestPtr == entry.first; };
115 115
116 116 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
117 117 auto requestProgressMapEnd = requestProgressMap.end();
118 118 auto requestProgressMapIt
119 119 = std::find_if(requestProgressMap.begin(), requestProgressMapEnd, findRequest);
120 120
121 121 if (requestProgressMapIt != requestProgressMapEnd) {
122 122 requestProgressMapIt->second = progress;
123 123 }
124 124 else {
125 125 // This case can happened when a progression is send after the request has been
126 126 // finished.
127 127 // Generaly the case when aborting a request
128 qCWarning(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress")
129 << acqIdentifier << networkRequest.get() << progress;
128 qCDebug(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress") << acqIdentifier
129 << networkRequest.get() << progress;
130 130 }
131 131
132 132 // Compute the current final progress and notify it
133 133 double finalProgress = 0.0;
134 134
135 135 auto fraq = requestProgressMap.size();
136 136
137 137 for (auto requestProgress : requestProgressMap) {
138 138 finalProgress += requestProgress.second;
139 139 qCDebug(LOG_AmdaProvider()) << tr("Current final progress without fraq:")
140 140 << finalProgress << requestProgress.second;
141 141 }
142 142
143 143 if (fraq > 0) {
144 144 finalProgress = finalProgress / fraq;
145 145 }
146 146
147 147 qCDebug(LOG_AmdaProvider()) << tr("Current final progress: ") << fraq << finalProgress;
148 148 emit dataProvidedProgress(acqIdentifier, finalProgress);
149 149 }
150 150 else {
151 151 // This case can happened when a progression is send after the request has been finished.
152 152 // Generaly the case when aborting a request
153 153 emit dataProvidedProgress(acqIdentifier, 100.0);
154 154 }
155 155 }
156 156
157 157 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data)
158 158 {
159 159 // Retrieves product ID from data: if the value is invalid, no request is made
160 160 auto productId = data.value(AMDA_XML_ID_KEY).toString();
161 161 if (productId.isNull()) {
162 162 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve data: unknown product id");
163 163 return;
164 164 }
165 165
166 166 // Retrieves the data type that determines whether the expected format for the result file is
167 167 // scalar, vector...
168 168 auto productValueType = valueType(data.value(AMDA_DATA_TYPE_KEY).toString());
169 169
170 170 // /////////// //
171 171 // Creates URL //
172 172 // /////////// //
173 173
174 174 auto startDate = dateFormat(dateTime.m_TStart);
175 175 auto endDate = dateFormat(dateTime.m_TEnd);
176 176
177 177 auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)};
178 178 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
179 179 auto tempFile = std::make_shared<QTemporaryFile>();
180 180
181 181 // LAMBDA
182 182 auto httpDownloadFinished = [this, dateTime, tempFile,
183 183 productValueType](QNetworkReply *reply, QUuid dataId) noexcept {
184 184
185 185 // Don't do anything if the reply was abort
186 186 if (reply->error() == QNetworkReply::NoError) {
187 187
188 188 if (tempFile) {
189 189 auto replyReadAll = reply->readAll();
190 190 if (!replyReadAll.isEmpty()) {
191 191 tempFile->write(replyReadAll);
192 192 }
193 193 tempFile->close();
194 194
195 195 // Parse results file
196 196 if (auto dataSeries
197 197 = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) {
198 198 emit dataProvided(dataId, dataSeries, dateTime);
199 199 }
200 200 else {
201 201 /// @todo ALX : debug
202 emit dataProvidedFailed(dataId);
202 203 }
203 204 }
204 205 qCDebug(LOG_AmdaProvider()) << tr("acquisition requests erase because of finishing")
205 206 << dataId;
206 207 m_AcqIdToRequestProgressMap.erase(dataId);
207 208 }
208 209 else {
209 210 qCCritical(LOG_AmdaProvider()) << tr("httpDownloadFinished ERROR");
211 emit dataProvidedFailed(dataId);
210 212 }
211 213
212 214 };
213 215 auto httpFinishedLambda
214 216 = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept {
215 217
216 218 // Don't do anything if the reply was abort
217 219 if (reply->error() == QNetworkReply::NoError) {
218 220 auto downloadFileUrl = QUrl{QString{reply->readAll()}};
219 221
220 222 qCInfo(LOG_AmdaProvider())
221 223 << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl;
222 224 // Executes request for downloading file //
223 225
224 226 // Creates destination file
225 227 if (tempFile->open()) {
226 228 // Executes request and store the request for progression
227 229 auto request = std::make_shared<QNetworkRequest>(downloadFileUrl);
228 230 updateRequestProgress(dataId, request, 0.0);
229 231 emit requestConstructed(request, dataId, httpDownloadFinished);
230 232 }
233 else {
234 emit dataProvidedFailed(dataId);
235 }
231 236 }
232 237 else {
233 238 qCDebug(LOG_AmdaProvider())
234 239 << tr("acquisition requests erase because of aborting") << dataId;
235 240 qCCritical(LOG_AmdaProvider()) << tr("httpFinishedLambda ERROR");
236 241 m_AcqIdToRequestProgressMap.erase(dataId);
242 emit dataProvidedFailed(dataId);
237 243 }
238 244 };
239 245
240 246 // //////////////// //
241 247 // Executes request //
242 248 // //////////////// //
243 249
244 250 auto request = std::make_shared<QNetworkRequest>(url);
245 251 qCDebug(LOG_AmdaProvider()) << tr("First Request creation") << request.get();
246 252 updateRequestProgress(token, request, 0.0);
247 253
248 254 emit requestConstructed(request, token, httpFinishedLambda);
249 255 }
250 256
251 257 void AmdaProvider::updateRequestProgress(QUuid acqIdentifier,
252 258 std::shared_ptr<QNetworkRequest> request, double progress)
253 259 {
254 260 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
255 261 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
256 262 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
257 263 auto requestProgressMapIt = requestProgressMap.find(request);
258 264 if (requestProgressMapIt != requestProgressMap.end()) {
259 265 requestProgressMapIt->second = progress;
260 266 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new progress for request")
261 267 << acqIdentifier << request.get() << progress;
262 268 }
263 269 else {
264 270 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new request") << acqIdentifier
265 271 << request.get() << progress;
266 272 acqIdToRequestProgressMapIt->second.insert(std::make_pair(request, progress));
267 273 }
268 274 }
269 275 else {
270 276 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new acqIdentifier")
271 277 << acqIdentifier << request.get() << progress;
272 278 auto requestProgressMap = std::map<std::shared_ptr<QNetworkRequest>, double>{};
273 279 requestProgressMap.insert(std::make_pair(request, progress));
274 280 m_AcqIdToRequestProgressMap.insert(
275 281 std::make_pair(acqIdentifier, std::move(requestProgressMap)));
276 282 }
277 283 }
General Comments 2
You need to be logged in to leave comments. Login now