##// END OF EJS Templates
request is now passed by shared pointer instead of const &
perrinel -
r751:34234d13df5c
parent child
Show More
@@ -1,77 +1,77
1 1 #ifndef SCIQLOP_IDATAPROVIDER_H
2 2 #define SCIQLOP_IDATAPROVIDER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <memory>
7 7
8 8 #include <QObject>
9 9 #include <QUuid>
10 10
11 11 #include <Common/MetaTypes.h>
12 12
13 13 #include <Data/SqpRange.h>
14 14
15 15 #include <functional>
16 16
17 17 class DataProviderParameters;
18 18 class IDataSeries;
19 19 class QNetworkReply;
20 20 class QNetworkRequest;
21 21
22 22 /**
23 23 * @brief The IDataProvider interface aims to declare a data provider.
24 24 *
25 25 * A data provider is an entity that generates data and returns it according to various parameters
26 26 * (time interval, product to retrieve the data, etc.)
27 27 *
28 28 * @sa IDataSeries
29 29 */
30 30 class SCIQLOP_CORE_EXPORT IDataProvider : public QObject {
31 31
32 32 Q_OBJECT
33 33 public:
34 34 virtual ~IDataProvider() noexcept = default;
35 35 virtual std::shared_ptr<IDataProvider> clone() const = 0;
36 36
37 37 /**
38 38 * @brief requestDataLoading provide datas for the data identified by acqIdentifier and
39 39 * parameters
40 40 */
41 41 virtual void requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
42 42 = 0;
43 43
44 44 /**
45 45 * @brief requestDataAborting stop data loading of the data identified by acqIdentifier
46 46 */
47 47 virtual void requestDataAborting(QUuid acqIdentifier) = 0;
48 48
49 49 signals:
50 50 /**
51 51 * @brief dataProvided send dataSeries under dateTime and that corresponds of the data
52 52 * identified by acqIdentifier
53 53 */
54 54 void dataProvided(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dateSeriesAcquired,
55 55 const SqpRange &dataRangeAcquired);
56 56
57 57 /**
58 58 * @brief dataProvided send dataSeries under dateTime and that corresponds of the data
59 59 * identified by identifier
60 60 */
61 61 void dataProvidedProgress(QUuid acqIdentifier, double progress);
62 62
63 63
64 64 /**
65 65 * @brief requestConstructed send a request for the data identified by acqIdentifier
66 66 * @callback is the methode call by the reply of the request when it is finished.
67 67 */
68 void requestConstructed(const QNetworkRequest &request, QUuid acqIdentifier,
68 void requestConstructed(std::shared_ptr<QNetworkRequest> request, QUuid acqIdentifier,
69 69 std::function<void(QNetworkReply *, QUuid)> callback);
70 70 };
71 71
72 72 // Required for using shared_ptr in signals/slots
73 73 SCIQLOP_REGISTER_META_TYPE(IDATAPROVIDER_PTR_REGISTRY, std::shared_ptr<IDataProvider>)
74 74 SCIQLOP_REGISTER_META_TYPE(IDATAPROVIDER_FUNCTION_REGISTRY,
75 75 std::function<void(QNetworkReply *, QUuid)>)
76 76
77 77 #endif // SCIQLOP_IDATAPROVIDER_H
@@ -1,49 +1,53
1 1 #ifndef SCIQLOP_NETWORKCONTROLLER_H
2 2 #define SCIQLOP_NETWORKCONTROLLER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <QLoggingCategory>
7 7 #include <QObject>
8 8 #include <QUuid>
9 9
10 #include <Common/MetaTypes.h>
10 11 #include <Common/spimpl.h>
11 12 #include <functional>
12 13
13 14 Q_DECLARE_LOGGING_CATEGORY(LOG_NetworkController)
14 15
15 16 class QNetworkReply;
16 17 class QNetworkRequest;
17 18
18 19 /**
19 20 * @brief The NetworkController class aims to handle all network connection of SciQlop.
20 21 */
21 22 class SCIQLOP_CORE_EXPORT NetworkController : public QObject {
22 23 Q_OBJECT
23 24 public:
24 25 explicit NetworkController(QObject *parent = 0);
25 26
26 27 void initialize();
27 28 void finalize();
28 29
29 30 public slots:
30 31 /// Execute request and call callback when the reply is finished. Identifier is attached to the
31 32 /// callback
32 void onProcessRequested(const QNetworkRequest &request, QUuid identifier,
33 void onProcessRequested(std::shared_ptr<QNetworkRequest> request, QUuid identifier,
33 34 std::function<void(QNetworkReply *, QUuid)> callback);
34 35 /// Cancel the request of identifier
35 36 void onReplyCanceled(QUuid identifier);
36 37
37 38 signals:
38 39 void replyFinished(QNetworkReply *reply, QUuid identifier);
39 void replyDownloadProgress(QUuid identifier, const QNetworkRequest &networkRequest,
40 void replyDownloadProgress(QUuid identifier, std::shared_ptr<QNetworkRequest> networkRequest,
40 41 double progress);
41 42
42 43 private:
43 44 void waitForFinish();
44 45
45 46 class NetworkControllerPrivate;
46 47 spimpl::unique_impl_ptr<NetworkControllerPrivate> impl;
47 48 };
48 49
50 SCIQLOP_REGISTER_META_TYPE(NETWORKREQUEST_REGISTRY, std::shared_ptr<QNetworkRequest>)
51
52
49 53 #endif // SCIQLOP_NETWORKCONTROLLER_H
@@ -1,134 +1,133
1 1 #include "Network/NetworkController.h"
2 2
3 3 #include <QMutex>
4 4 #include <QNetworkAccessManager>
5 5 #include <QNetworkReply>
6 6 #include <QNetworkRequest>
7 7 #include <QReadWriteLock>
8 8 #include <QThread>
9 9
10 10 #include <unordered_map>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_NetworkController, "NetworkController")
13 13
14 14 struct NetworkController::NetworkControllerPrivate {
15 15 explicit NetworkControllerPrivate(NetworkController *parent) : m_WorkingMutex{} {}
16 16
17 17 void lockRead() { m_Lock.lockForRead(); }
18 18 void lockWrite() { m_Lock.lockForWrite(); }
19 19 void unlock() { m_Lock.unlock(); }
20 20
21 21 QMutex m_WorkingMutex;
22 22
23 23 QReadWriteLock m_Lock;
24 24 std::unordered_map<QNetworkReply *, QUuid> m_NetworkReplyToVariableId;
25 25 std::unique_ptr<QNetworkAccessManager> m_AccessManager{nullptr};
26 26 };
27 27
28 28 NetworkController::NetworkController(QObject *parent)
29 29 : QObject(parent), impl{spimpl::make_unique_impl<NetworkControllerPrivate>(this)}
30 30 {
31 31 }
32 32
33 void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier,
33 void NetworkController::onProcessRequested(std::shared_ptr<QNetworkRequest> request,
34 QUuid identifier,
34 35 std::function<void(QNetworkReply *, QUuid)> callback)
35 36 {
36 qCDebug(LOG_NetworkController()) << tr("NetworkController registered")
37 << QThread::currentThread()->objectName();
38 auto reply = impl->m_AccessManager->get(request);
37 qCDebug(LOG_NetworkController()) << tr("NetworkController onProcessRequested")
38 << QThread::currentThread()->objectName() << &request;
39 auto reply = impl->m_AccessManager->get(*request);
39 40
40 41 // Store the couple reply id
41 42 impl->lockWrite();
42 43 impl->m_NetworkReplyToVariableId[reply] = identifier;
43 44 impl->unlock();
44 45
45 46 auto onReplyFinished = [request, reply, this, identifier, callback]() {
46 47
47 48 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished")
48 << QThread::currentThread() << reply;
49 << QThread::currentThread() << request.get() << reply;
49 50 impl->lockRead();
50 51 auto it = impl->m_NetworkReplyToVariableId.find(reply);
51 52 impl->unlock();
52 53 if (it != impl->m_NetworkReplyToVariableId.cend()) {
53 54 impl->lockWrite();
54 55 impl->m_NetworkReplyToVariableId.erase(reply);
55 56 impl->unlock();
56 57 // Deletes reply
57 58 callback(reply, identifier);
58 59 reply->deleteLater();
59
60 emit this->replyDownloadProgress(identifier, request, 0);
61 60 }
62 61
63 62 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished END")
64 63 << QThread::currentThread() << reply;
65 64 };
66 65
67 66 auto onReplyProgress = [reply, request, this](qint64 bytesRead, qint64 totalBytes) {
68 67
69 68 double progress = (bytesRead * 100.0) / totalBytes;
70 69 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress") << progress
71 << QThread::currentThread() << reply;
70 << QThread::currentThread() << request.get() << reply;
72 71 impl->lockRead();
73 72 auto it = impl->m_NetworkReplyToVariableId.find(reply);
74 73 impl->unlock();
75 74 if (it != impl->m_NetworkReplyToVariableId.cend()) {
76 75 emit this->replyDownloadProgress(it->second, request, progress);
77 76 }
78 77 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress END")
79 78 << QThread::currentThread() << reply;
80 79 };
81 80
82 81
83 82 connect(reply, &QNetworkReply::finished, this, onReplyFinished);
84 83 connect(reply, &QNetworkReply::downloadProgress, this, onReplyProgress);
85 84 qCDebug(LOG_NetworkController()) << tr("NetworkController registered END")
86 85 << QThread::currentThread()->objectName() << reply;
87 86 }
88 87
89 88 void NetworkController::initialize()
90 89 {
91 90 qCDebug(LOG_NetworkController()) << tr("NetworkController init") << QThread::currentThread();
92 91 impl->m_WorkingMutex.lock();
93 92 impl->m_AccessManager = std::make_unique<QNetworkAccessManager>();
94 93
95 94
96 95 auto onReplyErrors = [this](QNetworkReply *reply, const QList<QSslError> &errors) {
97 96
98 97 qCCritical(LOG_NetworkController()) << tr("NetworkAcessManager errors: ") << errors;
99 98
100 99 };
101 100
102 101
103 102 connect(impl->m_AccessManager.get(), &QNetworkAccessManager::sslErrors, this, onReplyErrors);
104 103
105 104 qCDebug(LOG_NetworkController()) << tr("NetworkController init END");
106 105 }
107 106
108 107 void NetworkController::finalize()
109 108 {
110 109 impl->m_WorkingMutex.unlock();
111 110 }
112 111
113 112 void NetworkController::onReplyCanceled(QUuid identifier)
114 113 {
115 114 auto findReply = [identifier](const auto &entry) { return identifier == entry.second; };
116 115 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled")
117 116 << QThread::currentThread();
118 117
119 118
120 119 impl->lockRead();
121 120 auto end = impl->m_NetworkReplyToVariableId.cend();
122 121 auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply);
123 122 impl->unlock();
124 123 if (it != end) {
125 124 it->first->abort();
126 125 }
127 126 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled END")
128 127 << QThread::currentThread();
129 128 }
130 129
131 130 void NetworkController::waitForFinish()
132 131 {
133 132 QMutexLocker locker{&impl->m_WorkingMutex};
134 133 }
@@ -1,287 +1,257
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 #include <cmath>
16 16
17 17 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
18 18
19 19 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
20 20
21 21 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
22 22
23 23 void lockRead() { m_Lock.lockForRead(); }
24 24 void lockWrite() { m_Lock.lockForWrite(); }
25 25 void unlock() { m_Lock.unlock(); }
26 26
27 27 void removeVariableRequest(QUuid vIdentifier);
28 28
29 29 QMutex m_WorkingMutex;
30 30 QReadWriteLock m_Lock;
31 31
32 32 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
33 33 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
34 34 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
35 35 };
36 36
37 37
38 38 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
39 39 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
40 40 {
41 41 }
42 42
43 43 VariableAcquisitionWorker::~VariableAcquisitionWorker()
44 44 {
45 45 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
46 46 << QThread::currentThread();
47 47 this->waitForFinish();
48 48 }
49 49
50 50
51 51 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
52 52 SqpRange rangeRequested,
53 53 SqpRange cacheRangeRequested,
54 54 DataProviderParameters parameters,
55 55 std::shared_ptr<IDataProvider> provider)
56 56 {
57 57 qCDebug(LOG_VariableAcquisitionWorker())
58 58 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
59 59 auto varRequestIdCanceled = QUuid();
60 60
61 61 // Request creation
62 62 auto acqRequest = AcquisitionRequest{};
63 63 acqRequest.m_VarRequestId = varRequestId;
64 64 acqRequest.m_vIdentifier = vIdentifier;
65 65 acqRequest.m_DataProviderParameters = parameters;
66 66 acqRequest.m_RangeRequested = rangeRequested;
67 67 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
68 68 acqRequest.m_Size = parameters.m_Times.size();
69 69 acqRequest.m_Provider = provider;
70 70
71 71
72 72 // Register request
73 73 impl->lockWrite();
74 74 impl->m_AcqIdentifierToAcqRequestMap.insert(
75 75 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
76 76
77 77 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
78 78 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
79 79 // A current request already exists, we can replace the next one
80 80 auto nextAcqId = it->second.second;
81 81 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
82 82 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
83 83 auto request = acqIdentifierToAcqRequestMapIt->second;
84 84 varRequestIdCanceled = request.m_VarRequestId;
85 85 }
86 86
87 87 it->second.second = acqRequest.m_AcqIdentifier;
88 88 impl->unlock();
89 89 }
90 90 else {
91 91 // First request for the variable, it must be stored and executed
92 92 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
93 93 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
94 94 impl->unlock();
95 95
96 96 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
97 97 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
98 98 }
99 99
100 100 return varRequestIdCanceled;
101 101 }
102 102
103 103 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
104 104 {
105 105 // TODO
106 106 }
107 107
108 108 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
109 109 double progress)
110 110 {
111 111 impl->lockRead();
112 112 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
113 113 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
114 114 auto currentPartSize = (aIdToARit->second.m_Size != 0) ? 100 / aIdToARit->second.m_Size : 0;
115 115
116 116 auto currentPartProgress
117 117 = std::isnan(progress) ? 0.0 : (progress * currentPartSize) / 100.0;
118 118 auto currentAlreadyProgress = aIdToARit->second.m_Progression * currentPartSize;
119 119
120 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: progress :") << progress;
121 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress A:")
122 << aIdToARit->second.m_Progression
123 << aIdToARit->second.m_Size;
124 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress B:")
125 << currentPartSize;
126 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress C:")
127 << currentPartProgress;
128 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress D:")
129 << currentAlreadyProgress;
130 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableRetrieveDataInProgress E:")
131 << currentAlreadyProgress + currentPartProgress
132 << "\n";
133
134 120 auto finalProgression = currentAlreadyProgress + currentPartProgress;
135 121 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, finalProgression);
136 122
137 123 if (finalProgression == 100.0) {
138 124 emit variableRequestInProgress(aIdToARit->second.m_vIdentifier, 0.0);
139 125 }
140 126 }
141 127 impl->unlock();
142 128 }
143 129
144 130 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
145 131 std::shared_ptr<IDataSeries> dataSeries,
146 132 SqpRange dataRangeAcquired)
147 133 {
148 qCInfo(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
149 << acqIdentifier << dataRangeAcquired;
134 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM: onVariableDataAcquired on range ")
135 << acqIdentifier << dataRangeAcquired;
150 136 impl->lockWrite();
151 137 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
152 138 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
153 139 // Store the result
154 140 auto dataPacket = AcquisitionDataPacket{};
155 141 dataPacket.m_Range = dataRangeAcquired;
156 142 dataPacket.m_DateSeries = dataSeries;
157 143
158 144 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
159 145 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
160 146 // A current request result already exists, we can update it
161 147 aIdToADPVit->second.push_back(dataPacket);
162 148 }
163 149 else {
164 150 // First request result for the variable, it must be stored
165 151 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
166 152 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
167 153 }
168 154
169 155
170 156 // Decrement the counter of the request
171 157 auto &acqRequest = aIdToARit->second;
172 158 acqRequest.m_Progression = acqRequest.m_Progression + 1;
173 159
174 160 // if the counter is 0, we can return data then run the next request if it exists and
175 161 // removed the finished request
176 162 if (acqRequest.m_Size == acqRequest.m_Progression) {
177 163 // Return the data
178 164 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
179 165 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
180 166 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
181 167 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
182 168 }
183 169
184 170 // Execute the next one
185 171 auto it
186 172 = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
187 173
188 174 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
189 175 if (it->second.second.isNull()) {
190 176 // There is no next request, we can remove the variable request
191 177 impl->removeVariableRequest(acqRequest.m_vIdentifier);
192 178 }
193 179 else {
194 180 auto acqIdentifierToRemove = it->second.first;
195 181 // Move the next request to the current request
196 182 it->second.first = it->second.second;
197 183 it->second.second = QUuid();
198 184 // Remove AcquisitionRequest and results;
199 185 impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
200 186 impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
201 187 // Execute the current request
202 188 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
203 189 Q_ARG(QUuid, it->second.first));
204 190 }
205 191 }
206 192 else {
207 193 qCCritical(LOG_VariableAcquisitionWorker())
208 194 << tr("Impossible to execute the acquisition on an unfound variable ");
209 195 }
210 196 }
211 197 }
212 198 else {
213 199 qCCritical(LOG_VariableAcquisitionWorker())
214 200 << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
215 201 }
216 202 impl->unlock();
217 203 }
218 204
219 205 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
220 206 {
221 207 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
222 208 impl->lockRead();
223 209 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
224 210 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
225 211 auto request = it->second;
226 212 impl->unlock();
227 213 emit variableRequestInProgress(request.m_vIdentifier, 0.1);
228 214 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
229 215 }
230 216 else {
231 217 impl->unlock();
232 218 // TODO log no acqIdentifier recognized
233 219 }
234 220 }
235 221
236 222 void VariableAcquisitionWorker::initialize()
237 223 {
238 224 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
239 225 << QThread::currentThread();
240 226 impl->m_WorkingMutex.lock();
241 227 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
242 228 }
243 229
244 230 void VariableAcquisitionWorker::finalize()
245 231 {
246 232 impl->m_WorkingMutex.unlock();
247 233 }
248 234
249 235 void VariableAcquisitionWorker::waitForFinish()
250 236 {
251 237 QMutexLocker locker{&impl->m_WorkingMutex};
252 238 }
253 239
254 240 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
255 241 QUuid vIdentifier)
256 242 {
257 243 lockWrite();
258 244 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
259 245
260 246 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
261 247 // A current request already exists, we can replace the next one
262 248
263 249 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
264 250 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
265 251
266 252 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
267 253 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
268 254 }
269 255 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
270 256 unlock();
271 257 }
272
273 //void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
274 //{
275 // qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
276 // impl->lockRead();
277 // auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
278 // if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
279 // auto request = it->second;
280 // impl->unlock();
281 // request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
282 // }
283 // else {
284 // impl->unlock();
285 // // TODO log no acqIdentifier recognized
286 // }
287 //}
@@ -1,807 +1,808
1 1 #include <Variable/Variable.h>
2 2 #include <Variable/VariableAcquisitionWorker.h>
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 6 #include <Variable/VariableSynchronizationGroup.h>
7 7
8 8 #include <Data/DataProviderParameters.h>
9 9 #include <Data/IDataProvider.h>
10 10 #include <Data/IDataSeries.h>
11 11 #include <Data/VariableRequest.h>
12 12 #include <Time/TimeController.h>
13 13
14 14 #include <QMutex>
15 15 #include <QThread>
16 16 #include <QUuid>
17 17 #include <QtCore/QItemSelectionModel>
18 18
19 19 #include <deque>
20 20 #include <set>
21 21 #include <unordered_map>
22 22
23 23 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
24 24
25 25 namespace {
26 26
27 27 SqpRange computeSynchroRangeRequested(const SqpRange &varRange, const SqpRange &graphRange,
28 28 const SqpRange &oldGraphRange)
29 29 {
30 30 auto zoomType = VariableController::getZoomType(graphRange, oldGraphRange);
31 31
32 32 auto varRangeRequested = varRange;
33 33 switch (zoomType) {
34 34 case AcquisitionZoomType::ZoomIn: {
35 35 auto deltaLeft = graphRange.m_TStart - oldGraphRange.m_TStart;
36 36 auto deltaRight = oldGraphRange.m_TEnd - graphRange.m_TEnd;
37 37 varRangeRequested.m_TStart += deltaLeft;
38 38 varRangeRequested.m_TEnd -= deltaRight;
39 39 break;
40 40 }
41 41
42 42 case AcquisitionZoomType::ZoomOut: {
43 43 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
44 44 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
45 45 varRangeRequested.m_TStart -= deltaLeft;
46 46 varRangeRequested.m_TEnd += deltaRight;
47 47 break;
48 48 }
49 49 case AcquisitionZoomType::PanRight: {
50 50 auto deltaRight = graphRange.m_TEnd - oldGraphRange.m_TEnd;
51 51 varRangeRequested.m_TStart += deltaRight;
52 52 varRangeRequested.m_TEnd += deltaRight;
53 53 break;
54 54 }
55 55 case AcquisitionZoomType::PanLeft: {
56 56 auto deltaLeft = oldGraphRange.m_TStart - graphRange.m_TStart;
57 57 varRangeRequested.m_TStart -= deltaLeft;
58 58 varRangeRequested.m_TEnd -= deltaLeft;
59 59 break;
60 60 }
61 61 case AcquisitionZoomType::Unknown: {
62 62 qCCritical(LOG_VariableController())
63 63 << VariableController::tr("Impossible to synchronize: zoom type unknown");
64 64 break;
65 65 }
66 66 default:
67 67 qCCritical(LOG_VariableController()) << VariableController::tr(
68 68 "Impossible to synchronize: zoom type not take into account");
69 69 // No action
70 70 break;
71 71 }
72 72
73 73 return varRangeRequested;
74 74 }
75 75 }
76 76
77 77 struct VariableController::VariableControllerPrivate {
78 78 explicit VariableControllerPrivate(VariableController *parent)
79 79 : m_WorkingMutex{},
80 80 m_VariableModel{new VariableModel{parent}},
81 81 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
82 82 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
83 83 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
84 84 q{parent}
85 85 {
86 86
87 87 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
88 88 m_VariableAcquisitionWorkerThread.setObjectName("VariableAcquisitionWorkerThread");
89 89 }
90 90
91 91
92 92 virtual ~VariableControllerPrivate()
93 93 {
94 94 qCDebug(LOG_VariableController()) << tr("VariableControllerPrivate destruction");
95 95 m_VariableAcquisitionWorkerThread.quit();
96 96 m_VariableAcquisitionWorkerThread.wait();
97 97 }
98 98
99 99
100 100 void processRequest(std::shared_ptr<Variable> var, const SqpRange &rangeRequested,
101 101 QUuid varRequestId);
102 102
103 103 QVector<SqpRange> provideNotInCacheDateTimeList(std::shared_ptr<Variable> variable,
104 104 const SqpRange &dateTime);
105 105
106 106 std::shared_ptr<Variable> findVariable(QUuid vIdentifier);
107 107 std::shared_ptr<IDataSeries>
108 108 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector);
109 109
110 110 void registerProvider(std::shared_ptr<IDataProvider> provider);
111 111
112 112 void storeVariableRequest(QUuid varId, QUuid varRequestId, const VariableRequest &varRequest);
113 113 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries);
114 114 void updateVariableRequest(QUuid varRequestId);
115 115 void cancelVariableRequest(QUuid varRequestId);
116 116
117 117 QMutex m_WorkingMutex;
118 118 /// Variable model. The VariableController has the ownership
119 119 VariableModel *m_VariableModel;
120 120 QItemSelectionModel *m_VariableSelectionModel;
121 121
122 122
123 123 TimeController *m_TimeController{nullptr};
124 124 std::unique_ptr<VariableCacheStrategy> m_VariableCacheStrategy;
125 125 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
126 126 QThread m_VariableAcquisitionWorkerThread;
127 127
128 128 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> >
129 129 m_VariableToProviderMap;
130 130 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
131 131 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
132 132 m_GroupIdToVariableSynchronizationGroupMap;
133 133 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
134 134 std::set<std::shared_ptr<IDataProvider> > m_ProviderSet;
135 135
136 136 std::map<QUuid, std::map<QUuid, VariableRequest> > m_VarRequestIdToVarIdVarRequestMap;
137 137
138 138 std::map<QUuid, std::deque<QUuid> > m_VarIdToVarRequestIdQueueMap;
139 139
140 140
141 141 VariableController *q;
142 142 };
143 143
144 144
145 145 VariableController::VariableController(QObject *parent)
146 146 : QObject{parent}, impl{spimpl::make_unique_impl<VariableControllerPrivate>(this)}
147 147 {
148 148 qCDebug(LOG_VariableController()) << tr("VariableController construction")
149 149 << QThread::currentThread();
150 150
151 151 connect(impl->m_VariableModel, &VariableModel::abortProgessRequested, this,
152 152 &VariableController::onAbortProgressRequested);
153 153
154 154 connect(impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::dataProvided, this,
155 155 &VariableController::onDataProvided);
156 156 connect(impl->m_VariableAcquisitionWorker.get(),
157 157 &VariableAcquisitionWorker::variableRequestInProgress, this,
158 158 &VariableController::onVariableRetrieveDataInProgress);
159 159
160 160 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::started,
161 161 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::initialize);
162 162 connect(&impl->m_VariableAcquisitionWorkerThread, &QThread::finished,
163 163 impl->m_VariableAcquisitionWorker.get(), &VariableAcquisitionWorker::finalize);
164 164
165 165
166 166 impl->m_VariableAcquisitionWorkerThread.start();
167 167 }
168 168
169 169 VariableController::~VariableController()
170 170 {
171 171 qCDebug(LOG_VariableController()) << tr("VariableController destruction")
172 172 << QThread::currentThread();
173 173 this->waitForFinish();
174 174 }
175 175
176 176 VariableModel *VariableController::variableModel() noexcept
177 177 {
178 178 return impl->m_VariableModel;
179 179 }
180 180
181 181 QItemSelectionModel *VariableController::variableSelectionModel() noexcept
182 182 {
183 183 return impl->m_VariableSelectionModel;
184 184 }
185 185
186 186 void VariableController::setTimeController(TimeController *timeController) noexcept
187 187 {
188 188 impl->m_TimeController = timeController;
189 189 }
190 190
191 191 std::shared_ptr<Variable>
192 192 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
193 193 {
194 194 if (impl->m_VariableModel->containsVariable(variable)) {
195 195 // Clones variable
196 196 auto duplicate = variable->clone();
197 197
198 198 // Adds clone to model
199 199 impl->m_VariableModel->addVariable(duplicate);
200 200
201 201 // Generates clone identifier
202 202 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
203 203
204 204 // Registers provider
205 205 auto variableProvider = impl->m_VariableToProviderMap.at(variable);
206 206 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
207 207
208 208 impl->m_VariableToProviderMap[duplicate] = duplicateProvider;
209 209 if (duplicateProvider) {
210 210 impl->registerProvider(duplicateProvider);
211 211 }
212 212
213 213 return duplicate;
214 214 }
215 215 else {
216 216 qCCritical(LOG_VariableController())
217 217 << tr("Can't create duplicate of variable %1: variable not registered in the model")
218 218 .arg(variable->name());
219 219 return nullptr;
220 220 }
221 221 }
222 222
223 223 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noexcept
224 224 {
225 225 if (!variable) {
226 226 qCCritical(LOG_VariableController()) << "Can't delete variable: variable is null";
227 227 return;
228 228 }
229 229
230 230 // Spreads in SciQlop that the variable will be deleted, so that potential receivers can
231 231 // make some treatments before the deletion
232 232 emit variableAboutToBeDeleted(variable);
233 233
234 234 // Deletes identifier
235 235 impl->m_VariableToIdentifierMap.erase(variable);
236 236
237 237 // Deletes provider
238 238 auto nbProvidersDeleted = impl->m_VariableToProviderMap.erase(variable);
239 239 qCDebug(LOG_VariableController())
240 240 << tr("Number of providers deleted for variable %1: %2")
241 241 .arg(variable->name(), QString::number(nbProvidersDeleted));
242 242
243 243
244 244 // Deletes from model
245 245 impl->m_VariableModel->deleteVariable(variable);
246 246 }
247 247
248 248 void VariableController::deleteVariables(
249 249 const QVector<std::shared_ptr<Variable> > &variables) noexcept
250 250 {
251 251 for (auto variable : qAsConst(variables)) {
252 252 deleteVariable(variable);
253 253 }
254 254 }
255 255
256 256 void VariableController::abortProgress(std::shared_ptr<Variable> variable)
257 257 {
258 258 }
259 259
260 260 std::shared_ptr<Variable>
261 261 VariableController::createVariable(const QString &name, const QVariantHash &metadata,
262 262 std::shared_ptr<IDataProvider> provider) noexcept
263 263 {
264 264 if (!impl->m_TimeController) {
265 265 qCCritical(LOG_VariableController())
266 266 << tr("Impossible to create variable: The time controller is null");
267 267 return nullptr;
268 268 }
269 269
270 270 auto range = impl->m_TimeController->dateTime();
271 271
272 272 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
273 273 auto identifier = QUuid::createUuid();
274 274
275 275 // store the provider
276 276 impl->registerProvider(provider);
277 277
278 278 // Associate the provider
279 279 impl->m_VariableToProviderMap[newVariable] = provider;
280 280 impl->m_VariableToIdentifierMap[newVariable] = identifier;
281 281
282 282
283 283 auto varRequestId = QUuid::createUuid();
284 284 qCInfo(LOG_VariableController()) << "processRequest for" << name << varRequestId;
285 285 impl->processRequest(newVariable, range, varRequestId);
286 286 impl->updateVariableRequest(varRequestId);
287 287
288 288 return newVariable;
289 289 }
290 290 }
291 291
292 292 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
293 293 {
294 294 // TODO check synchronisation and Rescale
295 295 qCDebug(LOG_VariableController()) << "VariableController::onDateTimeOnSelection"
296 296 << QThread::currentThread()->objectName();
297 297 auto selectedRows = impl->m_VariableSelectionModel->selectedRows();
298 298 auto varRequestId = QUuid::createUuid();
299 299
300 300 for (const auto &selectedRow : qAsConst(selectedRows)) {
301 301 if (auto selectedVariable = impl->m_VariableModel->variable(selectedRow.row())) {
302 302 selectedVariable->setRange(dateTime);
303 303 impl->processRequest(selectedVariable, dateTime, varRequestId);
304 304
305 305 // notify that rescale operation has to be done
306 306 emit rangeChanged(selectedVariable, dateTime);
307 307 }
308 308 }
309 309 impl->updateVariableRequest(varRequestId);
310 310 }
311 311
312 312 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
313 313 const SqpRange &cacheRangeRequested,
314 314 QVector<AcquisitionDataPacket> dataAcquired)
315 315 {
316 316 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
317 317 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
318 318 if (!varRequestId.isNull()) {
319 319 impl->updateVariableRequest(varRequestId);
320 320 }
321 321 }
322 322
323 323 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
324 324 {
325 qCInfo(LOG_VariableController()) << "TORM: ariableController::onVariableRetrieveDataInProgress"
326 << QThread::currentThread()->objectName() << progress;
325 qCDebug(LOG_VariableController())
326 << "TORM: variableController::onVariableRetrieveDataInProgress"
327 << QThread::currentThread()->objectName() << progress;
327 328 if (auto var = impl->findVariable(identifier)) {
328 329 impl->m_VariableModel->setDataProgress(var, progress);
329 330 }
330 331 else {
331 332 qCCritical(LOG_VariableController())
332 333 << tr("Impossible to notify progression of a null variable");
333 334 }
334 335 }
335 336
336 337 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
337 338 {
338 339 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAbortProgressRequested"
339 340 << QThread::currentThread()->objectName();
340 341
341 342 auto it = impl->m_VariableToIdentifierMap.find(variable);
342 343 if (it != impl->m_VariableToIdentifierMap.cend()) {
343 344 impl->m_VariableToProviderMap.at(variable)->requestDataAborting(it->second);
344 345 }
345 346 else {
346 347 qCWarning(LOG_VariableController())
347 348 << tr("Aborting progression of inexistant variable detected !!!")
348 349 << QThread::currentThread()->objectName();
349 350 }
350 351 }
351 352
352 353 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
353 354 {
354 355 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronizationGroupId"
355 356 << QThread::currentThread()->objectName()
356 357 << synchronizationGroupId;
357 358 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
358 359 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
359 360 std::make_pair(synchronizationGroupId, vSynchroGroup));
360 361 }
361 362
362 363 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
363 364 {
364 365 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
365 366 }
366 367
367 368 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
368 369 QUuid synchronizationGroupId)
369 370
370 371 {
371 372 qCDebug(LOG_VariableController()) << "TORM: VariableController::onAddSynchronized"
372 373 << synchronizationGroupId;
373 374 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
374 375 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
375 376 auto groupIdToVSGIt
376 377 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
377 378 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
378 379 impl->m_VariableIdGroupIdMap.insert(
379 380 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
380 381 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
381 382 }
382 383 else {
383 384 qCCritical(LOG_VariableController())
384 385 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
385 386 << variable->name();
386 387 }
387 388 }
388 389 else {
389 390 qCCritical(LOG_VariableController())
390 391 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
391 392 }
392 393 }
393 394
394 395 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
395 396 QUuid synchronizationGroupId)
396 397 {
397 398 // Gets variable id
398 399 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
399 400 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
400 401 qCCritical(LOG_VariableController())
401 402 << tr("Can't desynchronize variable %1: variable identifier not found")
402 403 .arg(variable->name());
403 404 return;
404 405 }
405 406
406 407 // Gets synchronization group
407 408 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
408 409 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
409 410 qCCritical(LOG_VariableController())
410 411 << tr("Can't desynchronize variable %1: unknown synchronization group")
411 412 .arg(variable->name());
412 413 return;
413 414 }
414 415
415 416 auto variableId = variableIt->second;
416 417
417 418 // Removes variable from synchronization group
418 419 auto synchronizationGroup = groupIt->second;
419 420 synchronizationGroup->removeVariableId(variableId);
420 421
421 422 // Removes link between variable and synchronization group
422 423 impl->m_VariableIdGroupIdMap.erase(variableId);
423 424 }
424 425
425 426 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
426 427 const SqpRange &range, const SqpRange &oldRange,
427 428 bool synchronise)
428 429 {
429 430 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
430 431
431 432 // we want to load data of the variable for the dateTime.
432 433 // First we check if the cache contains some of them.
433 434 // For the other, we ask the provider to give them.
434 435
435 436 auto varRequestId = QUuid::createUuid();
436 437 qCInfo(LOG_VariableController()) << "VariableController::onRequestDataLoading"
437 438 << QThread::currentThread()->objectName() << varRequestId;
438 439
439 440 for (const auto &var : variables) {
440 441 qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId;
441 442 impl->processRequest(var, range, varRequestId);
442 443 }
443 444
444 445 if (synchronise) {
445 446 // Get the group ids
446 447 qCDebug(LOG_VariableController())
447 448 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
448 449 auto groupIds = std::set<QUuid>{};
449 450 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
450 451 for (const auto &var : variables) {
451 452 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
452 453 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
453 454 auto vId = varToVarIdIt->second;
454 455 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
455 456 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
456 457 auto gId = varIdToGroupIdIt->second;
457 458 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
458 459 if (groupIds.find(gId) == groupIds.cend()) {
459 460 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
460 461 groupIds.insert(gId);
461 462 }
462 463 }
463 464 }
464 465 }
465 466
466 467 // We assume here all group ids exist
467 468 for (const auto &gId : groupIds) {
468 469 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
469 470 auto vSyncIds = vSynchronizationGroup->getIds();
470 471 qCDebug(LOG_VariableController()) << "Var in synchro group ";
471 472 for (auto vId : vSyncIds) {
472 473 auto var = impl->findVariable(vId);
473 474
474 475 // Don't process already processed var
475 476 if (!variables.contains(var)) {
476 477 if (var != nullptr) {
477 478 qCDebug(LOG_VariableController()) << "processRequest synchro for"
478 479 << var->name();
479 480 auto vSyncRangeRequested = computeSynchroRangeRequested(
480 481 var->range(), range, groupIdToOldRangeMap.at(gId));
481 482 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
482 483 impl->processRequest(var, vSyncRangeRequested, varRequestId);
483 484 }
484 485 else {
485 486 qCCritical(LOG_VariableController())
486 487
487 488 << tr("Impossible to synchronize a null variable");
488 489 }
489 490 }
490 491 }
491 492 }
492 493 }
493 494
494 495 impl->updateVariableRequest(varRequestId);
495 496 }
496 497
497 498
498 499 void VariableController::initialize()
499 500 {
500 501 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
501 502 impl->m_WorkingMutex.lock();
502 503 qCDebug(LOG_VariableController()) << tr("VariableController init END");
503 504 }
504 505
505 506 void VariableController::finalize()
506 507 {
507 508 impl->m_WorkingMutex.unlock();
508 509 }
509 510
510 511 void VariableController::waitForFinish()
511 512 {
512 513 QMutexLocker locker{&impl->m_WorkingMutex};
513 514 }
514 515
515 516 AcquisitionZoomType VariableController::getZoomType(const SqpRange &range, const SqpRange &oldRange)
516 517 {
517 518 // t1.m_TStart <= t2.m_TStart && t2.m_TEnd <= t1.m_TEnd
518 519 auto zoomType = AcquisitionZoomType::Unknown;
519 520 if (range.m_TStart <= oldRange.m_TStart && oldRange.m_TEnd <= range.m_TEnd) {
520 521 zoomType = AcquisitionZoomType::ZoomOut;
521 522 }
522 523 else if (range.m_TStart > oldRange.m_TStart && range.m_TEnd > oldRange.m_TEnd) {
523 524 zoomType = AcquisitionZoomType::PanRight;
524 525 }
525 526 else if (range.m_TStart < oldRange.m_TStart && range.m_TEnd < oldRange.m_TEnd) {
526 527 zoomType = AcquisitionZoomType::PanLeft;
527 528 }
528 529 else if (range.m_TStart > oldRange.m_TStart && oldRange.m_TEnd > range.m_TEnd) {
529 530 zoomType = AcquisitionZoomType::ZoomIn;
530 531 }
531 532 else {
532 533 qCCritical(LOG_VariableController()) << "getZoomType: Unknown type detected";
533 534 }
534 535 return zoomType;
535 536 }
536 537
537 538 void VariableController::VariableControllerPrivate::processRequest(std::shared_ptr<Variable> var,
538 539 const SqpRange &rangeRequested,
539 540 QUuid varRequestId)
540 541 {
541 542
542 543 // TODO: protect at
543 544 auto varRequest = VariableRequest{};
544 545 auto varId = m_VariableToIdentifierMap.at(var);
545 546
546 547 auto varStrategyRangesRequested
547 548 = m_VariableCacheStrategy->computeStrategyRanges(var->range(), rangeRequested);
548 549 auto notInCacheRangeList = var->provideNotInCacheRangeList(varStrategyRangesRequested.second);
549 550 auto inCacheRangeList = var->provideInCacheRangeList(varStrategyRangesRequested.second);
550 551
551 552 if (!notInCacheRangeList.empty()) {
552 553 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
553 554 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
554 555 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest RR ") << rangeRequested;
555 556 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest R ")
556 557 << varStrategyRangesRequested.first;
557 558 qCDebug(LOG_VariableAcquisitionWorker()) << tr("TORM processRequest CR ")
558 559 << varStrategyRangesRequested.second;
559 560 // store VarRequest
560 561 storeVariableRequest(varId, varRequestId, varRequest);
561 562
562 563 auto varProvider = m_VariableToProviderMap.at(var);
563 564 if (varProvider != nullptr) {
564 565 auto varRequestIdCanceled = m_VariableAcquisitionWorker->pushVariableRequest(
565 566 varRequestId, varId, varStrategyRangesRequested.first,
566 567 varStrategyRangesRequested.second,
567 568 DataProviderParameters{std::move(notInCacheRangeList), var->metadata()},
568 569 varProvider);
569 570
570 571 if (!varRequestIdCanceled.isNull()) {
571 572 qCInfo(LOG_VariableAcquisitionWorker()) << tr("varRequestIdCanceled: ")
572 573 << varRequestIdCanceled;
573 574 cancelVariableRequest(varRequestIdCanceled);
574 575 }
575 576 }
576 577 else {
577 578 qCCritical(LOG_VariableController())
578 579 << "Impossible to provide data with a null provider";
579 580 }
580 581
581 582 if (!inCacheRangeList.empty()) {
582 583 emit q->updateVarDisplaying(var, inCacheRangeList.first());
583 584 }
584 585 }
585 586 else {
586 587
587 588 varRequest.m_RangeRequested = varStrategyRangesRequested.first;
588 589 varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second;
589 590 // store VarRequest
590 591 storeVariableRequest(varId, varRequestId, varRequest);
591 592 acceptVariableRequest(varId,
592 593 var->dataSeries()->subDataSeries(varStrategyRangesRequested.second));
593 594 }
594 595 }
595 596
596 597 std::shared_ptr<Variable>
597 598 VariableController::VariableControllerPrivate::findVariable(QUuid vIdentifier)
598 599 {
599 600 std::shared_ptr<Variable> var;
600 601 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
601 602
602 603 auto end = m_VariableToIdentifierMap.cend();
603 604 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
604 605 if (it != end) {
605 606 var = it->first;
606 607 }
607 608 else {
608 609 qCCritical(LOG_VariableController())
609 610 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
610 611 }
611 612
612 613 return var;
613 614 }
614 615
615 616 std::shared_ptr<IDataSeries> VariableController::VariableControllerPrivate::retrieveDataSeries(
616 617 const QVector<AcquisitionDataPacket> acqDataPacketVector)
617 618 {
618 619 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
619 620 << acqDataPacketVector.size();
620 621 std::shared_ptr<IDataSeries> dataSeries;
621 622 if (!acqDataPacketVector.isEmpty()) {
622 623 dataSeries = acqDataPacketVector[0].m_DateSeries;
623 624 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
624 625 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
625 626 }
626 627 }
627 628 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
628 629 << acqDataPacketVector.size();
629 630 return dataSeries;
630 631 }
631 632
632 633 void VariableController::VariableControllerPrivate::registerProvider(
633 634 std::shared_ptr<IDataProvider> provider)
634 635 {
635 636 if (m_ProviderSet.find(provider) == m_ProviderSet.end()) {
636 637 qCDebug(LOG_VariableController()) << tr("Registering of a new provider")
637 638 << provider->objectName();
638 639 m_ProviderSet.insert(provider);
639 640 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
640 641 &VariableAcquisitionWorker::onVariableDataAcquired);
641 642 connect(provider.get(), &IDataProvider::dataProvidedProgress,
642 643 m_VariableAcquisitionWorker.get(),
643 644 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
644 645 }
645 646 else {
646 647 qCDebug(LOG_VariableController()) << tr("Cannot register provider, it already exists ");
647 648 }
648 649 }
649 650
650 651 void VariableController::VariableControllerPrivate::storeVariableRequest(
651 652 QUuid varId, QUuid varRequestId, const VariableRequest &varRequest)
652 653 {
653 654 // First request for the variable. we can create an entry for it
654 655 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
655 656 if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) {
656 657 auto varRequestIdQueue = std::deque<QUuid>{};
657 658 qCDebug(LOG_VariableController()) << tr("Store REQUEST in QUEUE");
658 659 varRequestIdQueue.push_back(varRequestId);
659 660 m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue)));
660 661 }
661 662 else {
662 663 qCDebug(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE");
663 664 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
664 665 varRequestIdQueue.push_back(varRequestId);
665 666 }
666 667
667 668 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
668 669 if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) {
669 670 auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{};
670 671 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
671 672 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in MAP");
672 673 m_VarRequestIdToVarIdVarRequestMap.insert(
673 674 std::make_pair(varRequestId, std::move(varIdToVarRequestMap)));
674 675 }
675 676 else {
676 677 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
677 678 qCDebug(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP");
678 679 varIdToVarRequestMap.insert(std::make_pair(varId, varRequest));
679 680 }
680 681 }
681 682
682 683 QUuid VariableController::VariableControllerPrivate::acceptVariableRequest(
683 684 QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
684 685 {
685 686 QUuid varRequestId;
686 687 auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
687 688 if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
688 689 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
689 690 varRequestId = varRequestIdQueue.front();
690 691 auto varRequestIdToVarIdVarRequestMapIt
691 692 = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
692 693 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
693 694 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
694 695 auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
695 696 if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
696 697 qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
697 698 auto &varRequest = varIdToVarRequestMapIt->second;
698 699 varRequest.m_DataSeries = dataSeries;
699 700 varRequest.m_CanUpdate = true;
700 701 }
701 702 else {
702 703 qCDebug(LOG_VariableController())
703 704 << tr("Impossible to acceptVariableRequest of a unknown variable id attached "
704 705 "to a variableRequestId")
705 706 << varRequestId << varId;
706 707 }
707 708 }
708 709 else {
709 710 qCCritical(LOG_VariableController())
710 711 << tr("Impossible to acceptVariableRequest of a unknown variableRequestId")
711 712 << varRequestId;
712 713 }
713 714
714 715 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in QUEUE ?")
715 716 << varRequestIdQueue.size();
716 717 varRequestIdQueue.pop_front();
717 718 qCDebug(LOG_VariableController()) << tr("2: erase REQUEST in QUEUE ?")
718 719 << varRequestIdQueue.size();
719 720 if (varRequestIdQueue.empty()) {
720 721 m_VarIdToVarRequestIdQueueMap.erase(varId);
721 722 }
722 723 }
723 724 else {
724 725 qCCritical(LOG_VariableController())
725 726 << tr("Impossible to acceptVariableRequest of a unknown variable id") << varId;
726 727 }
727 728
728 729 return varRequestId;
729 730 }
730 731
731 732 void VariableController::VariableControllerPrivate::updateVariableRequest(QUuid varRequestId)
732 733 {
733 734
734 735 auto varRequestIdToVarIdVarRequestMapIt = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
735 736 if (varRequestIdToVarIdVarRequestMapIt != m_VarRequestIdToVarIdVarRequestMap.cend()) {
736 737 bool processVariableUpdate = true;
737 738 auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
738 739 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
739 740 (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) && processVariableUpdate;
740 741 ++varIdToVarRequestMapIt) {
741 742 processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
742 743 qCDebug(LOG_VariableController()) << tr("updateVariableRequest")
743 744 << processVariableUpdate;
744 745 }
745 746
746 747 if (processVariableUpdate) {
747 748 for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
748 749 varIdToVarRequestMapIt != varIdToVarRequestMap.cend(); ++varIdToVarRequestMapIt) {
749 750 if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
750 751 auto &varRequest = varIdToVarRequestMapIt->second;
751 752 var->setRange(varRequest.m_RangeRequested);
752 753 var->setCacheRange(varRequest.m_CacheRangeRequested);
753 754 qCDebug(LOG_VariableController()) << tr("1: onDataProvided")
754 755 << varRequest.m_RangeRequested;
755 756 qCDebug(LOG_VariableController()) << tr("2: onDataProvided")
756 757 << varRequest.m_CacheRangeRequested;
757 758 var->mergeDataSeries(varRequest.m_DataSeries);
758 759 qCDebug(LOG_VariableController()) << tr("3: onDataProvided")
759 760 << varRequest.m_DataSeries->range();
760 761 qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
761 762
762 763 /// @todo MPL: confirm
763 764 // Variable update is notified only if there is no pending request for it
764 765 if (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first) == 0) {
765 766 emit var->updated();
766 767 }
767 768 }
768 769 else {
769 770 qCCritical(LOG_VariableController())
770 771 << tr("Impossible to update data to a null variable");
771 772 }
772 773 }
773 774
774 775 // cleaning varRequestId
775 776 qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
776 777 << m_VarRequestIdToVarIdVarRequestMap.size();
777 778 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
778 779 qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
779 780 << m_VarRequestIdToVarIdVarRequestMap.size();
780 781 }
781 782 }
782 783 else {
783 784 qCCritical(LOG_VariableController())
784 785 << tr("Cannot updateVariableRequest for a unknow varRequestId") << varRequestId;
785 786 }
786 787 }
787 788
788 789 void VariableController::VariableControllerPrivate::cancelVariableRequest(QUuid varRequestId)
789 790 {
790 791 // cleaning varRequestId
791 792 m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
792 793
793 794 for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
794 795 varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
795 796 auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
796 797 varRequestIdQueue.erase(
797 798 std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(), varRequestId),
798 799 varRequestIdQueue.end());
799 800 if (varRequestIdQueue.empty()) {
800 801 varIdToVarRequestIdQueueMapIt
801 802 = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
802 803 }
803 804 else {
804 805 ++varIdToVarRequestIdQueueMapIt;
805 806 }
806 807 }
807 808 }
General Comments 0
You need to be logged in to leave comments. Login now