##// END OF EJS Templates
push method of worker return the id of the nextRange which is canceled
perrinel -
r625:d6648352006d
parent child
Show More
@@ -1,61 +1,61
1 1 #ifndef SCIQLOP_VARIABLEACQUISITIONWORKER_H
2 2 #define SCIQLOP_VARIABLEACQUISITIONWORKER_H
3 3
4 4 #include "CoreGlobal.h"
5 5
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <QLoggingCategory>
8 8 #include <QObject>
9 9 #include <QUuid>
10 10
11 11 #include <Data/AcquisitionDataPacket.h>
12 12 #include <Data/IDataSeries.h>
13 13 #include <Data/SqpRange.h>
14 14
15 15 #include <QLoggingCategory>
16 16
17 17 #include <Common/spimpl.h>
18 18
19 19 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker)
20 20
21 21 class Variable;
22 22 class IDataProvider;
23 23
24 24 /// This class aims to handle all acquisition request
25 25 class SCIQLOP_CORE_EXPORT VariableAcquisitionWorker : public QObject {
26 26 Q_OBJECT
27 27 public:
28 28 explicit VariableAcquisitionWorker(QObject *parent = 0);
29 29 virtual ~VariableAcquisitionWorker();
30 30
31 void pushVariableRequest(QUuid vIdentifier, SqpRange rangeRequested,
31 QUuid pushVariableRequest(QUuid varRequestId, QUuid vIdentifier, SqpRange rangeRequested,
32 32 SqpRange cacheRangeRequested, DataProviderParameters parameters,
33 33 std::shared_ptr<IDataProvider> provider);
34 34
35 35 void abortProgressRequested(QUuid vIdentifier);
36 36
37 37 void initialize();
38 38 void finalize();
39 39 signals:
40 40 void dataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
41 41 const SqpRange &cacheRangeRequested,
42 42 QVector<AcquisitionDataPacket> dataAcquired);
43 43
44 44 void variableRequestInProgress(QUuid vIdentifier, double progress);
45 45
46 46 public slots:
47 47 void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dataSeries,
48 48 SqpRange dataRangeAcquired);
49 49 void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress);
50 50
51 51 private slots:
52 52 void onExecuteRequest(QUuid acqIdentifier);
53 53
54 54 private:
55 55 void waitForFinish();
56 56
57 57 class VariableAcquisitionWorkerPrivate;
58 58 spimpl::unique_impl_ptr<VariableAcquisitionWorkerPrivate> impl;
59 59 };
60 60
61 61 #endif // SCIQLOP_VARIABLEACQUISITIONWORKER_H
@@ -1,225 +1,238
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/AcquisitionRequest.h>
6 6 #include <Data/SqpRange.h>
7 7
8 8 #include <unordered_map>
9 9 #include <utility>
10 10
11 11 #include <QMutex>
12 12 #include <QReadWriteLock>
13 13 #include <QThread>
14 14
15 15 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
16 16
17 17 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
18 18
19 19 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
20 20
21 21 void lockRead() { m_Lock.lockForRead(); }
22 22 void lockWrite() { m_Lock.lockForWrite(); }
23 23 void unlock() { m_Lock.unlock(); }
24 24
25 25 void removeVariableRequest(QUuid vIdentifier);
26 26
27 27 QMutex m_WorkingMutex;
28 28 QReadWriteLock m_Lock;
29 29
30 30 std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap;
31 31 std::map<QUuid, AcquisitionRequest> m_AcqIdentifierToAcqRequestMap;
32 32 std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap;
33 33 };
34 34
35 35
36 36 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
37 37 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
38 38 {
39 39 }
40 40
41 41 VariableAcquisitionWorker::~VariableAcquisitionWorker()
42 42 {
43 43 qCInfo(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker destruction")
44 44 << QThread::currentThread();
45 45 this->waitForFinish();
46 46 }
47 47
48 48
49 void VariableAcquisitionWorker::pushVariableRequest(QUuid vIdentifier, SqpRange rangeRequested,
49 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
50 SqpRange rangeRequested,
50 51 SqpRange cacheRangeRequested,
51 52 DataProviderParameters parameters,
52 53 std::shared_ptr<IDataProvider> provider)
53 54 {
54 55 qCInfo(LOG_VariableAcquisitionWorker())
55 56 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
57 auto varRequestIdCanceled = QUuid();
56 58
57 59 // Request creation
58 60 auto acqRequest = AcquisitionRequest{};
61 acqRequest.m_VarRequestId = varRequestId;
59 62 acqRequest.m_vIdentifier = vIdentifier;
60 63 acqRequest.m_DataProviderParameters = parameters;
61 64 acqRequest.m_RangeRequested = rangeRequested;
62 65 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
63 66 acqRequest.m_Size = parameters.m_Times.size();
64 67 acqRequest.m_Provider = provider;
65 68
69
66 70 // Register request
67 71 impl->lockWrite();
68 72 impl->m_AcqIdentifierToAcqRequestMap.insert(
69 73 std::make_pair(acqRequest.m_AcqIdentifier, acqRequest));
70 74
71 75 auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
72 76 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
73 77 // A current request already exists, we can replace the next one
78 auto nextAcqId = it->second.second;
79 auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId);
80 if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
81 auto request = acqIdentifierToAcqRequestMapIt->second;
82 varRequestIdCanceled = request.m_VarRequestId;
83 }
84
74 85 it->second.second = acqRequest.m_AcqIdentifier;
75 86 impl->unlock();
76 87 }
77 88 else {
78 89 // First request for the variable, it must be stored and executed
79 90 impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert(
80 91 std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid())));
81 92 impl->unlock();
82 93
83 94 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
84 95 Q_ARG(QUuid, acqRequest.m_AcqIdentifier));
85 96 }
97
98 return varRequestIdCanceled;
86 99 }
87 100
88 101 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
89 102 {
90 103 // TODO
91 104 }
92 105
93 106 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
94 107 double progress)
95 108 {
96 109 // TODO
97 110 }
98 111
99 112 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
100 113 std::shared_ptr<IDataSeries> dataSeries,
101 114 SqpRange dataRangeAcquired)
102 115 {
103 116 qCDebug(LOG_VariableAcquisitionWorker()) << tr("onVariableDataAcquired on range ")
104 117 << acqIdentifier << dataRangeAcquired;
105 118 impl->lockWrite();
106 119 auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
107 120 if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
108 121 // Store the result
109 122 auto dataPacket = AcquisitionDataPacket{};
110 123 dataPacket.m_Range = dataRangeAcquired;
111 124 dataPacket.m_DateSeries = dataSeries;
112 125
113 126 auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
114 127 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
115 128 // A current request result already exists, we can update it
116 129 aIdToADPVit->second.push_back(dataPacket);
117 130 }
118 131 else {
119 132 // First request result for the variable, it must be stored
120 133 impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert(
121 134 std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket));
122 135 }
123 136
124 137
125 138 // Decrement the counter of the request
126 139 auto &acqRequest = aIdToARit->second;
127 140 acqRequest.m_Size = acqRequest.m_Size - 1;
128 141
129 142 // if the counter is 0, we can return data then run the next request if it exists and
130 143 // removed the finished request
131 144 if (acqRequest.m_Size == 0) {
132 145 // Return the data
133 146 aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier);
134 147 if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) {
135 148 emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested,
136 149 acqRequest.m_CacheRangeRequested, aIdToADPVit->second);
137 150 }
138 151
139 152 // Execute the next one
140 153 auto it
141 154 = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier);
142 155
143 156 if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
144 157 if (it->second.second.isNull()) {
145 158 // There is no next request, we can remove the variable request
146 159 impl->removeVariableRequest(acqRequest.m_vIdentifier);
147 160 }
148 161 else {
149 162 auto acqIdentifierToRemove = it->second.first;
150 163 // Move the next request to the current request
151 164 it->second.first = it->second.second;
152 165 it->second.second = QUuid();
153 166 // Remove AcquisitionRequest and results;
154 167 impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove);
155 168 impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove);
156 169 // Execute the current request
157 170 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
158 171 Q_ARG(QUuid, it->second.first));
159 172 }
160 173 }
161 174 else {
162 175 qCCritical(LOG_VariableAcquisitionWorker())
163 176 << tr("Impossible to execute the acquisition on an unfound variable ");
164 177 }
165 178 }
166 179 }
167 180 else {
168 181 qCCritical(LOG_VariableAcquisitionWorker())
169 182 << tr("Impossible to retrieve AcquisitionRequest for the incoming data");
170 183 }
171 184 impl->unlock();
172 185 }
173 186
174 187 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
175 188 {
176 189 qCInfo(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread();
177 190 impl->lockRead();
178 191 auto it = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier);
179 192 if (it != impl->m_AcqIdentifierToAcqRequestMap.cend()) {
180 193 auto request = it->second;
181 194 impl->unlock();
182 195 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
183 196 }
184 197 else {
185 198 impl->unlock();
186 199 // TODO log no acqIdentifier recognized
187 200 }
188 201 }
189 202
190 203 void VariableAcquisitionWorker::initialize()
191 204 {
192 205 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init")
193 206 << QThread::currentThread();
194 207 impl->m_WorkingMutex.lock();
195 208 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
196 209 }
197 210
198 211 void VariableAcquisitionWorker::finalize()
199 212 {
200 213 impl->m_WorkingMutex.unlock();
201 214 }
202 215
203 216 void VariableAcquisitionWorker::waitForFinish()
204 217 {
205 218 QMutexLocker locker{&impl->m_WorkingMutex};
206 219 }
207 220
208 221 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
209 222 QUuid vIdentifier)
210 223 {
211 224 lockWrite();
212 225 auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier);
213 226
214 227 if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) {
215 228 // A current request already exists, we can replace the next one
216 229
217 230 m_AcqIdentifierToAcqRequestMap.erase(it->second.first);
218 231 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first);
219 232
220 233 m_AcqIdentifierToAcqRequestMap.erase(it->second.second);
221 234 m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second);
222 235 }
223 236 m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier);
224 237 unlock();
225 238 }
General Comments 0
You need to be logged in to leave comments. Login now