##// END OF EJS Templates
Implementation of abort mechanism
perrinel -
r697:2015ea331dc4
parent child
Show More
@@ -1,133 +1,129
1 1 #ifndef SCIQLOP_VARIABLECONTROLLER_H
2 2 #define SCIQLOP_VARIABLECONTROLLER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <Data/AcquisitionDataPacket.h>
7 7 #include <Data/SqpRange.h>
8 8
9 9 #include <QLoggingCategory>
10 10 #include <QObject>
11 11 #include <QUuid>
12 12
13 13 #include <Common/spimpl.h>
14 14
15 15 class IDataProvider;
16 16 class QItemSelectionModel;
17 17 class TimeController;
18 18 class Variable;
19 19 class VariableModel;
20 20
21 21 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableController)
22 22
23 23
24 24 /**
25 25 * Possible types of zoom operation
26 26 */
27 27 enum class AcquisitionZoomType { ZoomOut, ZoomIn, PanRight, PanLeft, Unknown };
28 28
29 29
30 30 /**
31 31 * @brief The VariableController class aims to handle the variables in SciQlop.
32 32 */
33 33 class SCIQLOP_CORE_EXPORT VariableController : public QObject {
34 34 Q_OBJECT
35 35 public:
36 36 explicit VariableController(QObject *parent = 0);
37 37 virtual ~VariableController();
38 38
39 39 VariableModel *variableModel() noexcept;
40 40 QItemSelectionModel *variableSelectionModel() noexcept;
41 41
42 42 void setTimeController(TimeController *timeController) noexcept;
43 43
44 44 /**
45 45 * Clones the variable passed in parameter and adds the duplicate to the controller
46 46 * @param variable the variable to duplicate
47 47 * @return the duplicate created, nullptr if the variable couldn't be created
48 48 */
49 49 std::shared_ptr<Variable> cloneVariable(std::shared_ptr<Variable> variable) noexcept;
50 50
51 51 /**
52 52 * Deletes from the controller the variable passed in parameter.
53 53 *
54 54 * Delete a variable includes:
55 55 * - the deletion of the various references to the variable in SciQlop
56 56 * - the deletion of the model variable
57 57 * - the deletion of the provider associated with the variable
58 58 * - removing the cache associated with the variable
59 59 *
60 60 * @param variable the variable to delete from the controller.
61 61 */
62 62 void deleteVariable(std::shared_ptr<Variable> variable) noexcept;
63 63
64 64 /**
65 65 * Deletes from the controller the variables passed in parameter.
66 66 * @param variables the variables to delete from the controller.
67 67 * @sa deleteVariable()
68 68 */
69 69 void deleteVariables(const QVector<std::shared_ptr<Variable> > &variables) noexcept;
70 70
71 /**
72 * @brief abort the variable retrieve data progression
73 */
74 void abortProgress(std::shared_ptr<Variable> variable);
75 71
76 72 static AcquisitionZoomType getZoomType(const SqpRange &range, const SqpRange &oldRange);
77 73 signals:
78 74 /// Signal emitted when a variable is about to be deleted from the controller
79 75 void variableAboutToBeDeleted(std::shared_ptr<Variable> variable);
80 76
81 77 /// Signal emitted when a data acquisition is requested on a range for a variable
82 78 void rangeChanged(std::shared_ptr<Variable> variable, const SqpRange &range);
83 79
84 80 /// Signal emitted when a sub range of the cacheRange of the variable can be displayed
85 81 void updateVarDisplaying(std::shared_ptr<Variable> variable, const SqpRange &range);
86 82
87 83 public slots:
88 84 /// Request the data loading of the variable whithin range
89 85 void onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables, const SqpRange &range,
90 86 const SqpRange &oldRange, bool synchronise);
91 87 /**
92 88 * Creates a new variable and adds it to the model
93 89 * @param name the name of the new variable
94 90 * @param metadata the metadata of the new variable
95 91 * @param provider the data provider for the new variable
96 92 * @return the pointer to the new variable or nullptr if the creation failed
97 93 */
98 94 std::shared_ptr<Variable> createVariable(const QString &name, const QVariantHash &metadata,
99 95 std::shared_ptr<IDataProvider> provider) noexcept;
100 96
101 97 /// Update the temporal parameters of every selected variable to dateTime
102 98 void onDateTimeOnSelection(const SqpRange &dateTime);
103 99
104 100
105 101 void onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
106 102 const SqpRange &cacheRangeRequested,
107 103 QVector<AcquisitionDataPacket> dataAcquired);
108 104
109 105 void onVariableRetrieveDataInProgress(QUuid identifier, double progress);
110 106
111 107 /// Cancel the current request for the variable
112 108 void onAbortProgressRequested(std::shared_ptr<Variable> variable);
113 109
114 110 // synchronization group methods
115 111 void onAddSynchronizationGroupId(QUuid synchronizationGroupId);
116 112 void onRemoveSynchronizationGroupId(QUuid synchronizationGroupId);
117 113 void onAddSynchronized(std::shared_ptr<Variable> variable, QUuid synchronizationGroupId);
118 114
119 115 /// Desynchronizes the variable of the group whose identifier is passed in parameter
120 116 /// @remarks the method does nothing if the variable is not part of the group
121 117 void desynchronize(std::shared_ptr<Variable> variable, QUuid synchronizationGroupId);
122 118
123 119 void initialize();
124 120 void finalize();
125 121
126 122 private:
127 123 void waitForFinish();
128 124
129 125 class VariableControllerPrivate;
130 126 spimpl::unique_impl_ptr<VariableControllerPrivate> impl;
131 127 };
132 128
133 129 #endif // SCIQLOP_VARIABLECONTROLLER_H
@@ -1,257 +1,321
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
21 explicit VariableAcquisitionWorkerPrivate(VariableAcquisitionWorker *parent)
22 : m_Lock{QReadWriteLock::Recursive}, q{parent}
23 {
24 }
22 25
23 26 void lockRead() { m_Lock.lockForRead(); }
24 27 void lockWrite() { m_Lock.lockForWrite(); }
25 28 void unlock() { m_Lock.unlock(); }
26 29
27 30 void removeVariableRequest(QUuid vIdentifier);
28 31
32 /// Remove the current request and execute the next one if exist
33 void updateToNextRequest(QUuid vIdentifier);
34
29 35 QMutex m_WorkingMutex;
30 36 QReadWriteLock m_Lock;
31 37
32 38 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
33 39 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
34 40 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
41 VariableAcquisitionWorker *q;
35 42 };
36 43
37 44
38 45 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
39 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
46 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>(this)}
40 47 {
41 48 }
42 49
43 50 VariableAcquisitionWorker::~VariableAcquisitionWorker()
44 51 {
45 52 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
46 53 << QThread::currentThread();
47 54 this->waitForFinish();
48 55 }
49 56
50 57
51 58 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
52 59 SqpRange rangeRequested,
53 60 SqpRange cacheRangeRequested,
54 61 DataProviderParameters parameters,
55 62 std::shared_ptr<IDataProvider> provider)
56 63 {
57 64 qCDebug(LOG_VariableAcquisitionWorker())
58 65 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
59 66 auto varRequestIdCanceled = QUuid();
60 67
61 68 // Request creation
62 69 auto acqRequest = AcquisitionRequest{};
63 70 acqRequest.m_VarRequestId = varRequestId;
64 71 acqRequest.m_vIdentifier = vIdentifier;
65 72 acqRequest.m_DataProviderParameters = parameters;
66 73 acqRequest.m_RangeRequested = rangeRequested;
67 74 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
68 75 acqRequest.m_Size = parameters.m_Times.size();
69 76 acqRequest.m_Provider = provider;
70 77
71 78
72 79 // Register request
73 80 impl->lockWrite();
74 81 impl->m_AcqIdentifierToAcqRequestMap.insert(
75 82 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
76 83
77 84 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
78 85 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
79 86 // A current request already exists, we can replace the next one
80 87 auto nextAcqId = it->second.second;
81 88 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
82 89 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
83 90 auto request = acqIdentifierToAcqRequestMapIt->second;
84 91 varRequestIdCanceled = request.m_VarRequestId;
85 92 }
86 93
87 94 it->second.second = acqRequest.m_AcqIdentifier;
88 95 impl->unlock();
89 96 }
90 97 else {
91 98 // First request for the variable, it must be stored and executed
92 99 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
93 100 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
94 101 impl->unlock();
95 102
96 103 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
97 104 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
98 105 }
99 106
100 107 return varRequestIdCanceled;
101 108 }
102 109
103 110 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
104 111 {
105 112 // TODO
113 impl->lockRead();
114
115 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
116 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
117 auto currentAcqId = it->second.first;
118
119 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(currentAcqId);
120 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
121 auto request = it->second;
122 impl->unlock();
123
124 // Remove the current request from the worker
125
126 impl->lockWrite();
127 impl->updateToNextRequest(vIdentifier);
note

I can't because I unlock a lockRead and not a lockWrite

128 impl->unlock();
129
130 // notify the request aborting to the provider
131 request.m_Provider->requestDataAborting(currentAcqId);
132 }
133 else {
134 impl->unlock();
135 qCWarning(LOG_VariableAcquisitionWorker())
136 << tr("Impossible to abort an unknown acquisition request") << currentAcqId;
137 }
138 }
139 else {
140 impl->unlock();
141 }
106 142 }
107 143
108 144 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
109 145 double progress)
110 146 {
111 147 impl->lockRead();
112 148 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
113 149 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
114 150 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
115 151
116 152 auto currentPartProgress
117 153 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
118 154 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
119 155
120 156 auto finalProgression = currentAlreadyProgress + currentPartProgress;
121 157 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
122 158
123 159 if (finalProgression == 100.0) {
124 160 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
125 161 }
126 162 }
127 163 impl->unlock();
128 164 }
129 165
130 166 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
131 167 std::shared_ptr<IDataSeries> dataSeries,
132 168 SqpRange dataRangeAcquired)
133 169 {
134 170 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
135 171 << acqIdentifier << dataRangeAcquired;
136 172 impl->lockWrite();
137 173 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
138 174 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
139 175 // Store the result
140 176 auto dataPacket = AcquisitionDataPacket{};
141 177 dataPacket.m_Range = dataRangeAcquired;
142 178 dataPacket.m_DateSeries = dataSeries;
143 179
144 180 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
145 181 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
146 182 // A current request result already exists, we can update it
147 183 aIdToADPVit->second.push_back(dataPacket);
148 184 }
149 185 else {
150 186 // First request result for the variable, it must be stored
151 187 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
152 188 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
153 189 }
154 190
155 191
156 192 // Decrement the counter of the request
157 193 auto &acqRequest = aIdToARit->second;
158 194 acqRequest.m_Progression = acqRequest.m_Progression + 1;
159 195
160 196 // if the counter is 0, we can return data then run the next request if it exists and
161 197 // removed the finished request
162 198 if (acqRequest.m_Size == acqRequest.m_Progression) {
163 199 // Return the data
164 200 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
165 201 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
166 202 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
167 203 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
168 204 }
169 205
170 206 // Execute the next one
171 207 auto it
172 208 = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
173 209
174 210 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
175 211 if (it->second.second.isNull()) {
176 212 // There is no next request, we can remove the variable request
177 213 impl->removeVariableRequest(acqRequest.m_vIdentifier);
178 214 }
179 215 else {
180 216 auto acqIdentifierToRemove = it->second.first;
181 217 // Move the next request to the current request
182 218 it->second.first = it->second.second;
183 219 it->second.second = QUuid();
184 220 // Remove AcquisitionRequest and results;
185 221 impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
186 222 impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
187 223 // Execute the current request
188 224 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
189 225 Q_ARG(QUuid, it->second.first));
190 226 }
191 227 }
192 228 else {
193 229 qCCritical(LOG_VariableAcquisitionWorker())
194 230 << tr("Impossible to execute the acquisition on an unfound variable ");
195 231 }
196 232 }
197 233 }
198 234 else {
199 qCCritical(LOG_VariableAcquisitionWorker())
200 << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
235 qCWarning(LOG_VariableAcquisitionWorker())
236 << tr("Impossible to retrieve AcquisitionRequest for the incoming data.");
201 237 }
202 238 impl->unlock();
203 239 }
204 240
205 241 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
206 242 {
207 243 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
208 244 impl->lockRead();
209 245 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
210 246 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
211 247 auto request = it->second;
212 248 impl->unlock();
213 249 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
214 250 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
215 251 }
216 252 else {
217 253 impl->unlock();
218 254 // TODO log no acqIdentifier recognized
219 255 }
220 256 }
221 257
222 258 void VariableAcquisitionWorker::initialize()
223 259 {
224 260 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
225 261 << QThread::currentThread();
226 262 impl->m_WorkingMutex.lock();
227 263 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
228 264 }
229 265
230 266 void VariableAcquisitionWorker::finalize()
231 267 {
232 268 impl->m_WorkingMutex.unlock();
233 269 }
234 270
235 271 void VariableAcquisitionWorker::waitForFinish()
236 272 {
237 273 QMutexLocker locker{&impl->m_WorkingMutex};
238 274 }
239 275
240 276 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
241 277 QUuid vIdentifier)
242 278 {
243 279 lockWrite();
244 280 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
245 281
246 282 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
247 283 // A current request already exists, we can replace the next one
248 284
249 285 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
250 286 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
251 287
252 288 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
253 289 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
254 290 }
255 291 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
256 292 unlock();
257 293 }
294
295 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::updateToNextRequest(
296 QUuid vIdentifier)
297 {
298 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
299 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
300 if (it->second.second.isNull()) {
301 // There is no next request, we can remove the variable request
302 removeVariableRequest(vIdentifier);
303 }
304 else {
305 auto acqIdentifierToRemove = it->second.first;
306 // Move the next request to the current request
307 it->second.first = it->second.second;
308 it->second.second = QUuid();
309 // Remove AcquisitionRequest and results;
310 m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
311 m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
312 // Execute the current request
313 QMetaObject::invokeMethod(q, "onExecuteRequest", Qt::QueuedConnection,
314 Q_ARG(QUuid, it->second.first));
315 }
316 }
317 else {
318 qCCritical(LOG_VariableAcquisitionWorker())
319 << tr("Impossible to execute the acquisition on an unfound variable ");
320 }
321 }
@@ -1,808 +1,814
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 6 #include <Variable/VariableSynchronizationGroup.h>
7 7
8 8 #include <Data/DataProviderParameters.h>
9 9 #include <Data/IDataProvider.h>
10 10 #include <Data/IDataSeries.h>
11 11 #include <Data/VariableRequest.h>
12 12 #include <Time/TimeController.h>
13 13
14 14 #include <QMutex>
15 15 #include <QThread>
16 16 #include <QUuid>
17 17 #include <QtCore/QItemSelectionModel>
18 18
19 19 #include <deque>
20 20 #include <set>
21 21 #include <unordered_map>
22 22
23 23 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
24 24
25 25 namespace {
26 26
27 27 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
28 28 const SqpRange &oldGraphRange)
29 29 {
30 30 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
31 31
32 32 auto varRangeRequested = varRange;
33 33 switch (zoomType) {
34 34 case AcquisitionZoomType::ZoomIn: {
35 35 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
36 36 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
37 37 varRangeRequested.m_TStart += deltaLeft;
38 38 varRangeRequested.m_TEnd -= deltaRight;
39 39 break;
40 40 }
41 41
42 42 case AcquisitionZoomType::ZoomOut: {
43 43 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
44 44 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
45 45 varRangeRequested.m_TStart -= deltaLeft;
46 46 varRangeRequested.m_TEnd += deltaRight;
47 47 break;
48 48 }
49 49 case AcquisitionZoomType::PanRight: {
50 50 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
51 51 varRangeRequested.m_TStart += deltaRight;
52 52 varRangeRequested.m_TEnd += deltaRight;
53 53 break;
54 54 }
55 55 case AcquisitionZoomType::PanLeft: {
56 56 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
57 57 varRangeRequested.m_TStart -= deltaLeft;
58 58 varRangeRequested.m_TEnd -= deltaLeft;
59 59 break;
60 60 }
61 61 case AcquisitionZoomType::Unknown: {
62 62 qCCritical(LOG_VariableController())
63 63 << VariableController::tr("Impossible to synchronize: zoom type unknown");
64 64 break;
65 65 }
66 66 default:
67 67 qCCritical(LOG_VariableController()) << VariableController::tr(
68 68 "Impossible to synchronize: zoom type not take into account");
69 69 // No action
70 70 break;
71 71 }
72 72
73 73 return varRangeRequested;
74 74 }
75 75 }
76 76
77 77 struct VariableController::VariableControllerPrivate {
78 78 explicit VariableControllerPrivate(VariableController *parent)
79 79 : m_WorkingMutex{},
80 80 m_VariableModel{new VariableModel{parent}},
81 81 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
82 82 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 83 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 84 q{parent}
85 85 {
86 86
87 87 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 88 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 89 }
90 90
91 91
92 92 virtual ~VariableControllerPrivate()
93 93 {
94 94 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 95 m_VariableAcquisitionWorkerThread.quit();
96 96 m_VariableAcquisitionWorkerThread.wait();
97 97 }
98 98
99 99
100 100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
101 101 QUuid varRequestId);
102 102
103 103 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
104 104 const SqpRange &dateTime);
105 105
106 106 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
107 107 std::shared_ptr<IDataSeries>
108 108 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
109 109
110 110 void registerProvider(std::shared_ptr<IDataProvider> provider);
111 111
112 112 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
113 113 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
114 114 void updateVariableRequest(QUuid varRequestId);
115 115 void cancelVariableRequest(QUuid varRequestId);
116 116
117 117 QMutex m_WorkingMutex;
118 118 /// Variable model. The VariableController has the ownership
119 119 VariableModel *m_VariableModel;
120 120 QItemSelectionModel *m_VariableSelectionModel;
121 121
122 122
123 123 TimeController *m_TimeController{nullptr};
124 124 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
125 125 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
126 126 QThread m_VariableAcquisitionWorkerThread;
127 127
128 128 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
129 129 m_VariableToProviderMap;
130 130 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
131 131 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
132 132 m_GroupIdToVariableSynchronizationGroupMap;
133 133 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
134 134 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
135 135
136 136 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
137 137
138 138 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
139 139
140 140
141 141 VariableController *q;
142 142 };
143 143
144 144
145 145 VariableController::VariableController(QObject *parent)
146 146 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
147 147 {
148 148 qCDebug(LOG_VariableController()) << tr("VariableController construction")
149 149 << QThread::currentThread();
150 150
151 151 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
152 152 &VariableController::onAbortProgressRequested);
153 153
154 154 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
155 155 &VariableController::onDataProvided);
156 156 connect(impl->m_VariableAcquisitionWorker.get(),
157 157 &VariableAcquisitionWorker::variableRequestInProgress, this,
158 158 &VariableController::onVariableRetrieveDataInProgress);
159 159
160 160 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
161 161 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
162 162 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
163 163 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
164 164
165 165
166 166 impl->m_VariableAcquisitionWorkerThread.start();
167 167 }
168 168
169 169 VariableController::~VariableController()
170 170 {
171 171 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
172 172 << QThread::currentThread();
173 173 this->waitForFinish();
174 174 }
175 175
176 176 VariableModel *VariableController::variableModel() noexcept
177 177 {
178 178 return impl->m_VariableModel;
179 179 }
180 180
181 181 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
182 182 {
183 183 return impl->m_VariableSelectionModel;
184 184 }
185 185
186 186 void VariableController::setTimeController(TimeController *timeController) noexcept
187 187 {
188 188 impl->m_TimeController = timeController;
189 189 }
190 190
191 191 std::shared_ptr<Variable>
192 192 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
193 193 {
194 194 if (impl->m_VariableModel->containsVariable(variable)) {
195 195 // Clones variable
196 196 auto duplicate = variable->clone();
197 197
198 198 // Adds clone to model
199 199 impl->m_VariableModel->addVariable(duplicate);
200 200
201 201 // Generates clone identifier
202 202 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
203 203
204 204 // Registers provider
205 205 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
206 206 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
207 207
208 208 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
209 209 if (duplicateProvider) {
210 210 impl->registerProvider(duplicateProvider);
211 211 }
212 212
213 213 return duplicate;
214 214 }
215 215 else {
216 216 qCCritical(LOG_VariableController())
217 217 << tr("Can't create duplicate of variable %1: variable not registered in the model")
218 218 .arg(variable->name());
219 219 return nullptr;
220 220 }
221 221 }
222 222
223 223 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
224 224 {
225 225 if (!variable) {
226 226 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
227 227 return;
228 228 }
229 229
230 230 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
231 231 // make some treatments before the deletion
232 232 emit variableAboutToBeDeleted(variable);
233 233
234 234 // Deletes identifier
235 235 impl->m_VariableToIdentifierMap.erase(variable);
236 236
237 237 // Deletes provider
238 238 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
239 239 qCDebug(LOG_VariableController())
240 240 << tr("Number of providers deleted for variable %1: %2")
241 241 .arg(variable->name(), QString::number(nbProvidersDeleted));
242 242
243 243
244 244 // Deletes from model
245 245 impl->m_VariableModel->deleteVariable(variable);
246 246 }
247 247
248 248 void VariableController::deleteVariables(
249 249 const QVector<std::shared_ptr<Variable> > &variables) noexcept
250 250 {
251 251 for (auto variable : qAsConst(variables)) {
252 252 deleteVariable(variable);
253 253 }
254 254 }
255 255
256 void VariableController::abortProgress(std::shared_ptr<Variable> variable)
257 {
258 }
259
260 256 std::shared_ptr<Variable>
261 257 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
262 258 std::shared_ptr<IDataProvider> provider) noexcept
263 259 {
264 260 if (!impl->m_TimeController) {
265 261 qCCritical(LOG_VariableController())
266 262 << tr("Impossible to create variable: The time controller is null");
267 263 return nullptr;
268 264 }
269 265
270 266 auto range = impl->m_TimeController->dateTime();
271 267
272 268 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
273 269 auto identifier = QUuid::createUuid();
274 270
275 271 // store the provider
276 272 impl->registerProvider(provider);
277 273
278 274 // Associate the provider
279 275 impl->m_VariableToProviderMap[newVariable] = provider;
280 276 impl->m_VariableToIdentifierMap[newVariable] = identifier;
281 277
282 278
283 279 auto varRequestId = QUuid::createUuid();
284 qCInfo(LOG_VariableController()) << "processRequest for" << name << varRequestId;
285 280 impl->processRequest(newVariable, range, varRequestId);
286 281 impl->updateVariableRequest(varRequestId);
287 282
288 283 return newVariable;
289 284 }
290 285 }
291 286
292 287 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
293 288 {
294 289 // TODO check synchronisation and Rescale
295 290 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
296 291 << QThread::currentThread()->objectName();
297 292 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
298 293 auto varRequestId = QUuid::createUuid();
299 294
300 295 for (const auto &selectedRow : qAsConst(selectedRows)) {
301 296 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
302 297 selectedVariable->setRange(dateTime);
303 298 impl->processRequest(selectedVariable, dateTime, varRequestId);
304 299
305 300 // notify that rescale operation has to be done
306 301 emit rangeChanged(selectedVariable, dateTime);
307 302 }
308 303 }
309 304 impl->updateVariableRequest(varRequestId);
310 305 }
311 306
312 307 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
313 308 const SqpRange &cacheRangeRequested,
314 309 QVector<AcquisitionDataPacket> dataAcquired)
315 310 {
316 311 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
317 312 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
318 313 if (!varRequestId.isNull()) {
319 314 impl->updateVariableRequest(varRequestId);
320 315 }
321 316 }
322 317
323 318 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
324 319 {
325 320 qCDebug(LOG_VariableController())
326 321 << "TORM: variableController::onVariableRetrieveDataInProgress"
327 322 << QThread::currentThread()->objectName() << progress;
328 323 if (auto var = impl->findVariable(identifier)) {
329 324 impl->m_VariableModel->setDataProgress(var, progress);
330 325 }
331 326 else {
332 327 qCCritical(LOG_VariableController())
333 328 << tr("Impossible to notify progression of a null variable");
334 329 }
335 330 }
336 331
337 332 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
338 333 {
339 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAbortProgressRequested"
340 << QThread::currentThread()->objectName();
341
342 334 auto it = impl->m_VariableToIdentifierMap.find(variable);
343 335 if (it != impl->m_VariableToIdentifierMap.cend()) {
344 impl->m_VariableToProviderMap.at(variable)->requestDataAborting(it->second);
336 impl->m_VariableAcquisitionWorker->abortProgressRequested(it->second);
337
338 QUuid varRequestId;
339 auto varIdToVarRequestIdQueueMapIt = impl->m_VarIdToVarRequestIdQueueMap.find(it->second);
340 if (varIdToVarRequestIdQueueMapIt != impl->m_VarIdToVarRequestIdQueueMap.cend()) {
341 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
342 varRequestId = varRequestIdQueue.front();
343 impl->cancelVariableRequest(varRequestId);
344
345 // Finish the progression for the request
346 impl->m_VariableModel->setDataProgress(variable, 0.0);
347 }
348 else {
349 qCWarning(LOG_VariableController())
350 << tr("Aborting progression of inexistant variable request detected !!!")
351 << QThread::currentThread()->objectName();
352 }
345 353 }
346 354 else {
347 355 qCWarning(LOG_VariableController())
348 356 << tr("Aborting progression of inexistant variable detected !!!")
349 357 << QThread::currentThread()->objectName();
350 358 }
351 359 }
352 360
353 361 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
354 362 {
355 363 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
356 364 << QThread::currentThread()->objectName()
357 365 << synchronizationGroupId;
358 366 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
359 367 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
360 368 std::make_pair(synchronizationGroupId, vSynchroGroup));
361 369 }
362 370
363 371 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
364 372 {
365 373 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
366 374 }
367 375
368 376 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
369 377 QUuid synchronizationGroupId)
370 378
371 379 {
372 380 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
373 381 << synchronizationGroupId;
374 382 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
375 383 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
376 384 auto groupIdToVSGIt
377 385 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
378 386 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
379 387 impl->m_VariableIdGroupIdMap.insert(
380 388 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
381 389 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
382 390 }
383 391 else {
384 392 qCCritical(LOG_VariableController())
385 393 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
386 394 << variable->name();
387 395 }
388 396 }
389 397 else {
390 398 qCCritical(LOG_VariableController())
391 399 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
392 400 }
393 401 }
394 402
395 403 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
396 404 QUuid synchronizationGroupId)
397 405 {
398 406 // Gets variable id
399 407 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
400 408 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
401 409 qCCritical(LOG_VariableController())
402 410 << tr("Can't desynchronize variable %1: variable identifier not found")
403 411 .arg(variable->name());
404 412 return;
405 413 }
406 414
407 415 // Gets synchronization group
408 416 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
409 417 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
410 418 qCCritical(LOG_VariableController())
411 419 << tr("Can't desynchronize variable %1: unknown synchronization group")
412 420 .arg(variable->name());
413 421 return;
414 422 }
415 423
416 424 auto variableId = variableIt->second;
417 425
418 426 // Removes variable from synchronization group
419 427 auto synchronizationGroup = groupIt->second;
420 428 synchronizationGroup->removeVariableId(variableId);
421 429
422 430 // Removes link between variable and synchronization group
423 431 impl->m_VariableIdGroupIdMap.erase(variableId);
424 432 }
425 433
426 434 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
427 435 const SqpRange &range, const SqpRange &oldRange,
428 436 bool synchronise)
429 437 {
430 438 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
431 439
432 440 // we want to load data of the variable for the dateTime.
433 441 // First we check if the cache contains some of them.
434 442 // For the other, we ask the provider to give them.
435 443
436 444 auto varRequestId = QUuid::createUuid();
437 qCInfo(LOG_VariableController()) << "VariableController::onRequestDataLoading"
438 << QThread::currentThread()->objectName() << varRequestId;
445 qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading"
446 << QThread::currentThread()->objectName() << varRequestId;
439 447
440 448 for (const auto &var : variables) {
441 449 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
442 450 impl->processRequest(var, range, varRequestId);
443 451 }
444 452
445 453 if (synchronise) {
446 454 // Get the group ids
447 455 qCDebug(LOG_VariableController())
448 456 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
449 457 auto groupIds = std::set<QUuid>{};
450 458 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
451 459 for (const auto &var : variables) {
452 460 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
453 461 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
454 462 auto vId = varToVarIdIt->second;
455 463 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
456 464 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
457 465 auto gId = varIdToGroupIdIt->second;
458 466 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
459 467 if (groupIds.find(gId) == groupIds.cend()) {
460 468 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
461 469 groupIds.insert(gId);
462 470 }
463 471 }
464 472 }
465 473 }
466 474
467 475 // We assume here all group ids exist
468 476 for (const auto &gId : groupIds) {
469 477 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
470 478 auto vSyncIds = vSynchronizationGroup->getIds();
471 479 qCDebug(LOG_VariableController()) << "Var in synchro group ";
472 480 for (auto vId : vSyncIds) {
473 481 auto var = impl->findVariable(vId);
474 482
475 483 // Don't process already processed var
476 484 if (!variables.contains(var)) {
477 485 if (var != nullptr) {
478 486 qCDebug(LOG_VariableController()) << "processRequest synchro for"
479 487 << var->name();
480 488 auto vSyncRangeRequested = computeSynchroRangeRequested(
481 489 var->range(), range, groupIdToOldRangeMap.at(gId));
482 490 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
483 491 impl->processRequest(var, vSyncRangeRequested, varRequestId);
484 492 }
485 493 else {
486 494 qCCritical(LOG_VariableController())
487 495
488 496 << tr("Impossible to synchronize a null variable");
489 497 }
490 498 }
491 499 }
492 500 }
493 501 }
494 502
495 503 impl->updateVariableRequest(varRequestId);
496 504 }
497 505
498 506
499 507 void VariableController::initialize()
500 508 {
501 509 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
502 510 impl->m_WorkingMutex.lock();
503 511 qCDebug(LOG_VariableController()) << tr("VariableController init END");
504 512 }
505 513
506 514 void VariableController::finalize()
507 515 {
508 516 impl->m_WorkingMutex.unlock();
509 517 }
510 518
511 519 void VariableController::waitForFinish()
512 520 {
513 521 QMutexLocker locker{&impl->m_WorkingMutex};
514 522 }
515 523
516 524 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
517 525 {
518 526 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
519 527 auto zoomType = AcquisitionZoomType::Unknown;
520 528 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
521 529 zoomType = AcquisitionZoomType::ZoomOut;
522 530 }
523 531 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
524 532 zoomType = AcquisitionZoomType::PanRight;
525 533 }
526 534 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
527 535 zoomType = AcquisitionZoomType::PanLeft;
528 536 }
529 537 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
530 538 zoomType = AcquisitionZoomType::ZoomIn;
531 539 }
532 540 else {
533 541 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
534 542 }
535 543 return zoomType;
536 544 }
537 545
538 546 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
539 547 const SqpRange &rangeRequested,
540 548 QUuid varRequestId)
541 549 {
542 550
543 551 // TODO: protect at
544 552 auto varRequest = VariableRequest{};
545 553 auto varId = m_VariableToIdentifierMap.at(var);
546 554
547 555 auto varStrategyRangesRequested
548 556 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
549 557 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
550 558 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
551 559
552 560 if (!notInCacheRangeList.empty()) {
553 561 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
554 562 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
555 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest RR ") << rangeRequested;
556 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest R ")
557 << varStrategyRangesRequested.first;
558 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest CR ")
559 << varStrategyRangesRequested.second;
563
560 564 // store VarRequest
561 565 storeVariableRequest(varId, varRequestId, varRequest);
562 566
563 567 auto varProvider = m_VariableToProviderMap.at(var);
564 568 if (varProvider != nullptr) {
565 569 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
566 570 varRequestId, varId, varStrategyRangesRequested.first,
567 571 varStrategyRangesRequested.second,
568 572 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
569 573 varProvider);
570 574
571 575 if (!varRequestIdCanceled.isNull()) {
572 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
573 << varRequestIdCanceled;
576 qCDebug(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
577 << varRequestIdCanceled;
574 578 cancelVariableRequest(varRequestIdCanceled);
575 579 }
576 580 }
577 581 else {
578 582 qCCritical(LOG_VariableController())
579 583 << "Impossible to provide data with a null provider";
580 584 }
581 585
582 586 if (!inCacheRangeList.empty()) {
583 587 emit q->updateVarDisplaying(var, inCacheRangeList.first());
584 588 }
585 589 }
586 590 else {
587 591
588 592 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
589 593 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
590 594 // store VarRequest
591 595 storeVariableRequest(varId, varRequestId, varRequest);
592 596 acceptVariableRequest(varId,
593 597 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
594 598 }
595 599 }
596 600
597 601 std::shared_ptr<Variable>
598 602 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
599 603 {
600 604 std::shared_ptr<Variable> var;
601 605 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
602 606
603 607 auto end = m_VariableToIdentifierMap.cend();
604 608 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
605 609 if (it != end) {
606 610 var = it->first;
607 611 }
608 612 else {
609 613 qCCritical(LOG_VariableController())
610 614 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
611 615 }
612 616
613 617 return var;
614 618 }
615 619
616 620 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
617 621 const QVector<AcquisitionDataPacket> acqDataPacketVector)
618 622 {
619 623 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
620 624 << acqDataPacketVector.size();
621 625 std::shared_ptr<IDataSeries> dataSeries;
622 626 if (!acqDataPacketVector.isEmpty()) {
623 627 dataSeries = acqDataPacketVector[0].m_DateSeries;
624 628 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
625 629 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
626 630 }
627 631 }
628 632 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
629 633 << acqDataPacketVector.size();
630 634 return dataSeries;
631 635 }
632 636
633 637 void VariableController::VariableControllerPrivate::registerProvider(
634 638 std::shared_ptr<IDataProvider> provider)
635 639 {
636 640 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
637 641 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
638 642 << provider->objectName();
639 643 m_ProviderSet.insert(provider);
640 644 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
641 645 &VariableAcquisitionWorker::onVariableDataAcquired);
642 646 connect(provider.get(), &IDataProvider::dataProvidedProgress,
643 647 m_VariableAcquisitionWorker.get(),
644 648 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
645 649 }
646 650 else {
647 651 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
648 652 }
649 653 }
650 654
651 655 void VariableController::VariableControllerPrivate::storeVariableRequest(
652 656 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
653 657 {
654 658 // First request for the variable. we can create an entry for it
655 659 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
656 660 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
657 661 auto varRequestIdQueue = std::deque<QUuid>{};
658 662 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
659 663 varRequestIdQueue.push_back(varRequestId);
660 664 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
661 665 }
662 666 else {
663 667 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
664 668 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
665 669 varRequestIdQueue.push_back(varRequestId);
666 670 }
667 671
668 672 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
669 673 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
670 674 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
671 675 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
672 676 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
673 677 m_VarRequestIdToVarIdVarRequestMap.insert(
674 678 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
675 679 }
676 680 else {
677 681 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
678 682 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
679 683 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
680 684 }
681 685 }
682 686
683 687 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
684 688 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
685 689 {
686 690 QUuid varRequestId;
687 691 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
688 692 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
689 693 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
690 694 varRequestId = varRequestIdQueue.front();
691 695 auto varRequestIdToVarIdVarRequestMapIt
692 696 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
693 697 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
694 698 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
695 699 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
696 700 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
697 701 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
698 702 auto &varRequest = varIdToVarRequestMapIt->second;
699 703 varRequest.m_DataSeries = dataSeries;
700 704 varRequest.m_CanUpdate = true;
701 705 }
702 706 else {
703 707 qCDebug(LOG_VariableController())
704 708 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
705 709 "to a variableRequestId")
706 710 << varRequestId << varId;
707 711 }
708 712 }
709 713 else {
710 714 qCCritical(LOG_VariableController())
711 715 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
712 716 << varRequestId;
713 717 }
714 718
715 719 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
716 720 << varRequestIdQueue.size();
717 721 varRequestIdQueue.pop_front();
718 722 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
719 723 << varRequestIdQueue.size();
720 724 if (varRequestIdQueue.empty()) {
721 725 m_VarIdToVarRequestIdQueueMap.erase(varId);
722 726 }
723 727 }
724 728 else {
725 729 qCCritical(LOG_VariableController())
726 730 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
727 731 }
728 732
729 733 return varRequestId;
730 734 }
731 735
732 736 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
733 737 {
734 738
735 739 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
736 740 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
737 741 bool processVariableUpdate = true;
738 742 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
739 743 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
740 744 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
741 745 ++varIdToVarRequestMapIt) {
742 746 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
743 747 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
744 748 << processVariableUpdate;
745 749 }
746 750
747 751 if (processVariableUpdate) {
748 752 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
749 753 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
750 754 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
751 755 auto &varRequest = varIdToVarRequestMapIt->second;
752 756 var->setRange(varRequest.m_RangeRequested);
753 757 var->setCacheRange(varRequest.m_CacheRangeRequested);
754 758 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
755 759 << varRequest.m_RangeRequested;
756 760 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
757 761 << varRequest.m_CacheRangeRequested;
758 762 var->mergeDataSeries(varRequest.m_DataSeries);
759 763 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
760 764 << varRequest.m_DataSeries->range();
761 765 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
762 766
763 767 /// @todo MPL: confirm
764 768 // Variable update is notified only if there is no pending request for it
765 if (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first) == 0) {
766 emit var->updated();
767 }
769 // if
770 // (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first)
771 // == 0) {
772 emit var->updated();
773 // }
768 774 }
769 775 else {
770 776 qCCritical(LOG_VariableController())
771 777 << tr("Impossible to update data to a null variable");
772 778 }
773 779 }
774 780
775 781 // cleaning varRequestId
776 782 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
777 783 << m_VarRequestIdToVarIdVarRequestMap.size();
778 784 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
779 785 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
780 786 << m_VarRequestIdToVarIdVarRequestMap.size();
781 787 }
782 788 }
783 789 else {
784 790 qCCritical(LOG_VariableController())
785 791 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
786 792 }
787 793 }
788 794
789 795 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
790 796 {
791 797 // cleaning varRequestId
792 798 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
793 799
794 800 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
795 801 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
796 802 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
797 803 varRequestIdQueue.erase(
798 804 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
799 805 varRequestIdQueue.end());
800 806 if (varRequestIdQueue.empty()) {
801 807 varIdToVarRequestIdQueueMapIt
802 808 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
803 809 }
804 810 else {
805 811 ++varIdToVarRequestIdQueueMapIt;
806 812 }
807 813 }
808 814 }
General Comments 2
You need to be logged in to leave comments. Login now