@@ -41,6 +41,14 const auto PLUGIN_DIRECTORY_NAME = QStringLiteral("plugins"); | |||||
41 |
|
41 | |||
42 | int main(int argc, char *argv[]) |
|
42 | int main(int argc, char *argv[]) | |
43 | { |
|
43 | { | |
|
44 | QLoggingCategory::setFilterRules( | |||
|
45 | "*.warning=false\n" | |||
|
46 | "*.info=false\n" | |||
|
47 | "*.debug=false\n" | |||
|
48 | "AmdaProvider.info=true\n" | |||
|
49 | "NetworkController.info=true\n" | |||
|
50 | "VariableAcquisitionWorker.info=true\n"); | |||
|
51 | ||||
44 | SqpApplication a{argc, argv}; |
|
52 | SqpApplication a{argc, argv}; | |
45 | SqpApplication::setOrganizationName("LPP"); |
|
53 | SqpApplication::setOrganizationName("LPP"); | |
46 | SqpApplication::setOrganizationDomain("lpp.fr"); |
|
54 | SqpApplication::setOrganizationDomain("lpp.fr"); |
@@ -7,6 +7,7 | |||||
7 |
|
7 | |||
8 | #include <Common/DateUtils.h> |
|
8 | #include <Common/DateUtils.h> | |
9 | #include <Common/MetaTypes.h> |
|
9 | #include <Common/MetaTypes.h> | |
|
10 | #include <Data/AcquisitionDataPacket.h> | |||
10 | #include <Data/DataProviderParameters.h> |
|
11 | #include <Data/DataProviderParameters.h> | |
11 | #include <Data/IDataProvider.h> |
|
12 | #include <Data/IDataProvider.h> | |
12 | #include <Data/SqpRange.h> |
|
13 | #include <Data/SqpRange.h> | |
@@ -31,6 +32,7 struct AcquisitionRequest { | |||||
31 | SqpRange m_CacheRangeRequested; |
|
32 | SqpRange m_CacheRangeRequested; | |
32 | int m_Size; |
|
33 | int m_Size; | |
33 | std::shared_ptr<IDataProvider> m_Provider; |
|
34 | std::shared_ptr<IDataProvider> m_Provider; | |
|
35 | QVector<AcquisitionDataPacket> m_DataPackets; | |||
34 | }; |
|
36 | }; | |
35 |
|
37 | |||
36 | SCIQLOP_REGISTER_META_TYPE(ACQUISITIONREQUEST_REGISTRY, AcquisitionRequest) |
|
38 | SCIQLOP_REGISTER_META_TYPE(ACQUISITIONREQUEST_REGISTRY, AcquisitionRequest) |
@@ -60,6 +60,7 signals: | |||||
60 | */ |
|
60 | */ | |
61 | void dataProvidedProgress(QUuid acqIdentifier, double progress); |
|
61 | void dataProvidedProgress(QUuid acqIdentifier, double progress); | |
62 |
|
62 | |||
|
63 | void requestCanceled(QUuid acqIdentifier); | |||
63 |
|
64 | |||
64 | /** |
|
65 | /** | |
65 | * @brief requestConstructed send a request for the data identified by acqIdentifier |
|
66 | * @brief requestConstructed send a request for the data identified by acqIdentifier |
@@ -46,6 +46,7 signals: | |||||
46 | public slots: |
|
46 | public slots: | |
47 | void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dataSeries, |
|
47 | void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dataSeries, | |
48 | SqpRange dataRangeAcquired); |
|
48 | SqpRange dataRangeAcquired); | |
|
49 | void onVariableAcquisitionCanceled(QUuid acqIdentifier); | |||
49 | void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress); |
|
50 | void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress); | |
50 |
|
51 | |||
51 | private: |
|
52 | private: |
@@ -33,19 +33,18 NetworkController::NetworkController(QObject *parent) | |||||
33 | void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier, |
|
33 | void NetworkController::onProcessRequested(const QNetworkRequest &request, QUuid identifier, | |
34 | std::function<void(QNetworkReply *, QUuid)> callback) |
|
34 | std::function<void(QNetworkReply *, QUuid)> callback) | |
35 | { |
|
35 | { | |
36 | qCDebug(LOG_NetworkController()) |
|
|||
37 | << tr("NetworkController registered") << QThread::currentThread()->objectName(); |
|
|||
38 | auto reply = impl->m_AccessManager->get(request); |
|
|||
39 |
|
||||
40 | // Store the couple reply id |
|
36 | // Store the couple reply id | |
41 | impl->lockWrite(); |
|
37 | impl->lockWrite(); | |
|
38 | qCInfo(LOG_NetworkController()) << tr("NetworkController registered") << identifier; | |||
|
39 | auto reply = impl->m_AccessManager->get(request); | |||
|
40 | ||||
42 | impl->m_NetworkReplyToVariableId[reply] = identifier; |
|
41 | impl->m_NetworkReplyToVariableId[reply] = identifier; | |
|
42 | qCInfo(LOG_NetworkController()) << tr("Reply stored") << identifier; | |||
43 | impl->unlock(); |
|
43 | impl->unlock(); | |
44 |
|
44 | |||
45 | auto onReplyFinished = [reply, this, identifier, callback]() { |
|
45 | auto onReplyFinished = [reply, this, identifier, callback]() { | |
46 |
|
46 | |||
47 | qCDebug(LOG_NetworkController()) |
|
47 | qCInfo(LOG_NetworkController()) << tr("NetworkController onReplyFinished") << identifier; | |
48 | << tr("NetworkController onReplyFinished") << QThread::currentThread() << reply; |
|
|||
49 | impl->lockRead(); |
|
48 | impl->lockRead(); | |
50 | auto it = impl->m_NetworkReplyToVariableId.find(reply); |
|
49 | auto it = impl->m_NetworkReplyToVariableId.find(reply); | |
51 | impl->unlock(); |
|
50 | impl->unlock(); | |
@@ -112,18 +111,28 void NetworkController::finalize() | |||||
112 |
|
111 | |||
113 | void NetworkController::onReplyCanceled(QUuid identifier) |
|
112 | void NetworkController::onReplyCanceled(QUuid identifier) | |
114 | { |
|
113 | { | |
115 | auto findReply = [identifier](const auto &entry) { return identifier == entry.second; }; |
|
114 | // auto findReply = [identifier](const auto &entry) { return identifier == entry.second; }; | |
116 | qCDebug(LOG_NetworkController()) |
|
115 | qCDebug(LOG_NetworkController()) | |
117 | << tr("NetworkController onReplyCanceled") << QThread::currentThread(); |
|
116 | << tr("NetworkController onReplyCanceled") << QThread::currentThread(); | |
118 |
|
117 | |||
|
118 | qCInfo(LOG_NetworkController()) << tr("Calls cancel on ") << identifier; | |||
119 |
|
119 | |||
120 |
impl->lock |
|
120 | impl->lockWrite(); | |
121 |
auto |
|
121 | for (auto &reply : impl->m_NetworkReplyToVariableId) { | |
122 | auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply); |
|
122 | if (reply.second == identifier) { | |
123 | impl->unlock(); |
|
123 | qCInfo(LOG_NetworkController()) << tr("Cancel on ") << identifier << "applied"; | |
124 | if (it != end) { |
|
124 | reply.first->abort(); | |
125 | it->first->abort(); |
|
125 | } | |
126 | } |
|
126 | } | |
|
127 | impl->unlock(); | |||
|
128 | ||||
|
129 | // auto end = impl->m_NetworkReplyToVariableId.cend(); | |||
|
130 | // auto it = std::find_if(impl->m_NetworkReplyToVariableId.cbegin(), end, findReply); | |||
|
131 | // impl->unlock(); | |||
|
132 | // if (it != end) { | |||
|
133 | // qCInfo(LOG_NetworkController()) << tr("Cancel on ") << identifier << "applied"; | |||
|
134 | // it->first->abort(); | |||
|
135 | // } | |||
127 | qCDebug(LOG_NetworkController()) |
|
136 | qCDebug(LOG_NetworkController()) | |
128 | << tr("NetworkController onReplyCanceled END") << QThread::currentThread(); |
|
137 | << tr("NetworkController onReplyCanceled END") << QThread::currentThread(); | |
129 | } |
|
138 | } |
@@ -11,9 +11,17 | |||||
11 | #include <QMutex> |
|
11 | #include <QMutex> | |
12 | #include <QReadWriteLock> |
|
12 | #include <QReadWriteLock> | |
13 | #include <QThread> |
|
13 | #include <QThread> | |
|
14 | #include <QtConcurrent/QtConcurrent> | |||
14 |
|
15 | |||
15 | Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") |
|
16 | Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") | |
16 |
|
17 | |||
|
18 | namespace { | |||
|
19 | ||||
|
20 | using AcquisitionId = QUuid; | |||
|
21 | using VariableId = QUuid; | |||
|
22 | ||||
|
23 | } // namespace | |||
|
24 | ||||
17 | struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { |
|
25 | struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { | |
18 |
|
26 | |||
19 | explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {} |
|
27 | explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {} | |
@@ -22,14 +30,18 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { | |||||
22 | void lockWrite() { m_Lock.lockForWrite(); } |
|
30 | void lockWrite() { m_Lock.lockForWrite(); } | |
23 | void unlock() { m_Lock.unlock(); } |
|
31 | void unlock() { m_Lock.unlock(); } | |
24 |
|
32 | |||
|
33 | void eraseRequest(AcquisitionId id); | |||
|
34 | std::map<AcquisitionId, AcquisitionRequest>::iterator insertRequest(AcquisitionId id, | |||
|
35 | AcquisitionRequest request); | |||
|
36 | ||||
25 | void removeVariableRequest(QUuid vIdentifier); |
|
37 | void removeVariableRequest(QUuid vIdentifier); | |
26 |
|
38 | |||
27 | QMutex m_WorkingMutex; |
|
39 | QMutex m_WorkingMutex; | |
28 | QReadWriteLock m_Lock; |
|
40 | QReadWriteLock m_Lock; | |
29 |
|
41 | |||
30 | std::map<QUuid, QVector<AcquisitionDataPacket> > m_AcqIdentifierToAcqDataPacketVectorMap; |
|
42 | /// Current acquisitions (by variable) | |
31 |
std::map< |
|
43 | std::map<AcquisitionId, AcquisitionRequest> m_Requests; | |
32 | std::map<QUuid, std::pair<QUuid, QUuid> > m_VIdentifierToCurrrentAcqIdNextIdPairMap; |
|
44 | std::map<VariableId, AcquisitionRequest *> m_RequestsIndex; | |
33 | }; |
|
45 | }; | |
34 |
|
46 | |||
35 |
|
47 | |||
@@ -66,34 +78,33 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid v | |||||
66 | acqRequest.m_Size = parameters.m_Times.size(); |
|
78 | acqRequest.m_Size = parameters.m_Times.size(); | |
67 | acqRequest.m_Provider = provider; |
|
79 | acqRequest.m_Provider = provider; | |
68 |
|
80 | |||
69 |
|
||||
70 | // Register request |
|
|||
71 | impl->lockWrite(); |
|
81 | impl->lockWrite(); | |
72 | impl->m_AcqIdentifierToAcqRequestMap.insert( |
|
|||
73 | std::make_pair(acqRequest.m_AcqIdentifier, acqRequest)); |
|
|||
74 |
|
||||
75 | auto it = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); |
|
|||
76 | if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { |
|
|||
77 | // A current request already exists, we can replace the next one |
|
|||
78 | auto nextAcqId = it->second.second; |
|
|||
79 | auto acqIdentifierToAcqRequestMapIt = impl->m_AcqIdentifierToAcqRequestMap.find(nextAcqId); |
|
|||
80 | if (acqIdentifierToAcqRequestMapIt != impl->m_AcqIdentifierToAcqRequestMap.cend()) { |
|
|||
81 | auto request = acqIdentifierToAcqRequestMapIt->second; |
|
|||
82 | varRequestIdCanceled = request.m_VarRequestId; |
|
|||
83 | } |
|
|||
84 |
|
82 | |||
85 | it->second.second = acqRequest.m_AcqIdentifier; |
|
83 | // Checks if there is a current acquisition on variable | |
86 | impl->unlock(); |
|
84 | auto currentRequestIt = impl->m_RequestsIndex.find(vIdentifier); | |
|
85 | if (currentRequestIt != impl->m_RequestsIndex.cend()) { | |||
|
86 | auto request = currentRequestIt->second; | |||
|
87 | QtConcurrent::run( | |||
|
88 | [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() { | |||
|
89 | provider->requestDataAborting(acqIdentifier); | |||
|
90 | }); | |||
|
91 | varRequestIdCanceled = request->m_VarRequestId; | |||
|
92 | ||||
|
93 | impl->eraseRequest(request->m_AcqIdentifier); | |||
87 | } |
|
94 | } | |
88 | else { |
|
|||
89 | // First request for the variable, it must be stored and executed |
|
|||
90 | impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.insert( |
|
|||
91 | std::make_pair(vIdentifier, std::make_pair(acqRequest.m_AcqIdentifier, QUuid()))); |
|
|||
92 | impl->unlock(); |
|
|||
93 |
|
95 | |||
|
96 | // Sets the new acquisition request as the current request for the variable | |||
|
97 | auto newRequestIt = impl->insertRequest(acqRequest.m_AcqIdentifier, std::move(acqRequest)); | |||
|
98 | if (newRequestIt != impl->m_Requests.end()) { | |||
|
99 | qCInfo(LOG_VariableAcquisitionWorker()) << "EXECUTE REQUEST" << acqRequest.m_AcqIdentifier; | |||
94 | QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, |
|
100 | QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, | |
95 |
Q_ARG(QUuid, |
|
101 | Q_ARG(QUuid, newRequestIt->first)); | |
96 | } |
|
102 | } | |
|
103 | else { | |||
|
104 | /// @todo ALX : log | |||
|
105 | } | |||
|
106 | ||||
|
107 | impl->unlock(); | |||
97 |
|
108 | |||
98 | return varRequestIdCanceled; |
|
109 | return varRequestIdCanceled; | |
99 | } |
|
110 | } | |
@@ -116,71 +127,28 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier, | |||||
116 | qCDebug(LOG_VariableAcquisitionWorker()) |
|
127 | qCDebug(LOG_VariableAcquisitionWorker()) | |
117 | << tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired; |
|
128 | << tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired; | |
118 | impl->lockWrite(); |
|
129 | impl->lockWrite(); | |
119 | auto aIdToARit = impl->m_AcqIdentifierToAcqRequestMap.find(acqIdentifier); |
|
|||
120 | if (aIdToARit != impl->m_AcqIdentifierToAcqRequestMap.cend()) { |
|
|||
121 | // Store the result |
|
|||
122 | auto dataPacket = AcquisitionDataPacket{}; |
|
|||
123 | dataPacket.m_Range = dataRangeAcquired; |
|
|||
124 | dataPacket.m_DateSeries = dataSeries; |
|
|||
125 |
|
||||
126 | auto aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); |
|
|||
127 | if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { |
|
|||
128 | // A current request result already exists, we can update it |
|
|||
129 | aIdToADPVit->second.push_back(dataPacket); |
|
|||
130 | } |
|
|||
131 | else { |
|
|||
132 | // First request result for the variable, it must be stored |
|
|||
133 | impl->m_AcqIdentifierToAcqDataPacketVectorMap.insert( |
|
|||
134 | std::make_pair(acqIdentifier, QVector<AcquisitionDataPacket>() << dataPacket)); |
|
|||
135 | } |
|
|||
136 |
|
130 | |||
|
131 | auto it = impl->m_Requests.find(acqIdentifier); | |||
|
132 | if (it != impl->m_Requests.cend()) { | |||
|
133 | auto &request = it->second; | |||
137 |
|
134 | |||
138 | // Decrement the counter of the request |
|
135 | // Store the result | |
139 | auto &acqRequest = aIdToARit->second; |
|
136 | auto dataPacket = AcquisitionDataPacket{dataSeries, dataRangeAcquired}; | |
140 | acqRequest.m_Size = acqRequest.m_Size - 1; |
|
137 | request.m_DataPackets.push_back(dataPacket); | |
141 |
|
138 | request.m_Size = request.m_Size - 1; | ||
142 | // if the counter is 0, we can return data then run the next request if it exists and |
|
139 | ||
143 | // removed the finished request |
|
140 | if (request.m_Size == 0) { | |
144 | if (acqRequest.m_Size == 0) { |
|
141 | emit dataProvided(request.m_vIdentifier, request.m_RangeRequested, | |
145 | // Return the data |
|
142 | request.m_CacheRangeRequested, request.m_DataPackets); | |
146 | aIdToADPVit = impl->m_AcqIdentifierToAcqDataPacketVectorMap.find(acqIdentifier); |
|
143 | impl->eraseRequest(acqIdentifier); | |
147 | if (aIdToADPVit != impl->m_AcqIdentifierToAcqDataPacketVectorMap.cend()) { |
|
|||
148 | emit dataProvided(acqRequest.m_vIdentifier, acqRequest.m_RangeRequested, |
|
|||
149 | acqRequest.m_CacheRangeRequested, aIdToADPVit->second); |
|
|||
150 | } |
|
|||
151 |
|
||||
152 | // Execute the next one |
|
|||
153 | auto it |
|
|||
154 | = impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(acqRequest.m_vIdentifier); |
|
|||
155 |
|
||||
156 | if (it != impl->m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { |
|
|||
157 | if (it->second.second.isNull()) { |
|
|||
158 | // There is no next request, we can remove the variable request |
|
|||
159 | impl->removeVariableRequest(acqRequest.m_vIdentifier); |
|
|||
160 | } |
|
|||
161 | else { |
|
|||
162 | auto acqIdentifierToRemove = it->second.first; |
|
|||
163 | // Move the next request to the current request |
|
|||
164 | it->second.first = it->second.second; |
|
|||
165 | it->second.second = QUuid(); |
|
|||
166 | // Remove AcquisitionRequest and results; |
|
|||
167 | impl->m_AcqIdentifierToAcqRequestMap.erase(acqIdentifierToRemove); |
|
|||
168 | impl->m_AcqIdentifierToAcqDataPacketVectorMap.erase(acqIdentifierToRemove); |
|
|||
169 | // Execute the current request |
|
|||
170 | QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection, |
|
|||
171 | Q_ARG(QUuid, it->second.first)); |
|
|||
172 | } |
|
|||
173 | } |
|
|||
174 | else { |
|
|||
175 | qCCritical(LOG_VariableAcquisitionWorker()) |
|
|||
176 | << tr("Impossible to execute the acquisition on an unfound variable "); |
|
|||
177 | } |
|
|||
178 | } |
|
144 | } | |
179 | } |
|
145 | } | |
180 | else { |
|
146 | impl->unlock(); | |
181 | qCCritical(LOG_VariableAcquisitionWorker()) |
|
147 | } | |
182 | << tr("Impossible to retrieve AcquisitionRequest for the incoming data"); |
|
148 | ||
183 | } |
|
149 | void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier) | |
|
150 | { | |||
|
151 | impl->lockWrite(); | |||
184 | impl->unlock(); |
|
152 | impl->unlock(); | |
185 | } |
|
153 | } | |
186 |
|
154 | |||
@@ -202,32 +170,51 void VariableAcquisitionWorker::waitForFinish() | |||||
202 | QMutexLocker locker{&impl->m_WorkingMutex}; |
|
170 | QMutexLocker locker{&impl->m_WorkingMutex}; | |
203 | } |
|
171 | } | |
204 |
|
172 | |||
205 | void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest( |
|
173 | ||
206 | QUuid vIdentifier) |
|
174 | void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::eraseRequest(AcquisitionId id) | |
207 | { |
|
175 | { | |
208 | lockWrite(); |
|
176 | auto it = m_Requests.find(id); | |
209 | auto it = m_VIdentifierToCurrrentAcqIdNextIdPairMap.find(vIdentifier); |
|
177 | if (it != m_Requests.end()) { | |
|
178 | // Removes from index | |||
|
179 | m_RequestsIndex.erase(it->second.m_vIdentifier); | |||
210 |
|
180 | |||
211 | if (it != m_VIdentifierToCurrrentAcqIdNextIdPairMap.cend()) { |
|
181 | // Removes request | |
212 | // A current request already exists, we can replace the next one |
|
182 | m_Requests.erase(it); | |
|
183 | } | |||
|
184 | } | |||
213 |
|
185 | |||
214 | m_AcqIdentifierToAcqRequestMap.erase(it->second.first); |
|
186 | std::map<AcquisitionId, AcquisitionRequest>::iterator | |
215 | m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.first); |
|
187 | VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::insertRequest( | |
|
188 | AcquisitionId id, AcquisitionRequest request) | |||
|
189 | { | |||
|
190 | // Inserts request | |||
|
191 | auto variableId = request.m_vIdentifier; | |||
|
192 | auto result = m_Requests.insert(std::make_pair(id, std::move(request))); | |||
|
193 | ||||
|
194 | if (result.second) { | |||
|
195 | // Inserts index | |||
|
196 | m_RequestsIndex[variableId] = &result.first->second; | |||
216 |
|
197 | |||
217 | m_AcqIdentifierToAcqRequestMap.erase(it->second.second); |
|
198 | return result.first; | |
218 | m_AcqIdentifierToAcqDataPacketVectorMap.erase(it->second.second); |
|
|||
219 | } |
|
199 | } | |
220 | m_VIdentifierToCurrrentAcqIdNextIdPairMap.erase(vIdentifier); |
|
200 | else { | |
221 | unlock(); |
|
201 | return m_Requests.end(); | |
|
202 | } | |||
|
203 | } | |||
|
204 | ||||
|
205 | void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest( | |||
|
206 | QUuid vIdentifier) | |||
|
207 | { | |||
|
208 | /// @todo ALX | |||
|
209 | // m_Acquisitions.erase(vIdentifier); | |||
222 | } |
|
210 | } | |
223 |
|
211 | |||
224 | void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) |
|
212 | void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier) | |
225 | { |
|
213 | { | |
226 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("onExecuteRequest") << QThread::currentThread(); |
|
|||
227 | impl->lockRead(); |
|
214 | impl->lockRead(); | |
228 |
auto it = impl->m_ |
|
215 | auto it = impl->m_Requests.find(acqIdentifier); | |
229 |
if (it != impl->m_ |
|
216 | if (it != impl->m_Requests.cend()) { | |
230 | auto request = it->second; |
|
217 | auto &request = it->second; | |
231 | impl->unlock(); |
|
218 | impl->unlock(); | |
232 | request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters); |
|
219 | request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters); | |
233 | } |
|
220 | } |
@@ -313,6 +313,7 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &range | |||||
313 | const SqpRange &cacheRangeRequested, |
|
313 | const SqpRange &cacheRangeRequested, | |
314 | QVector<AcquisitionDataPacket> dataAcquired) |
|
314 | QVector<AcquisitionDataPacket> dataAcquired) | |
315 | { |
|
315 | { | |
|
316 | qCInfo(LOG_VariableController()) << "VariableController::onDataProvided"; | |||
316 | auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired); |
|
317 | auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired); | |
317 | auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries); |
|
318 | auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries); | |
318 | if (!varRequestId.isNull()) { |
|
319 | if (!varRequestId.isNull()) { | |
@@ -431,8 +432,8 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> | |||||
431 | // For the other, we ask the provider to give them. |
|
432 | // For the other, we ask the provider to give them. | |
432 |
|
433 | |||
433 | auto varRequestId = QUuid::createUuid(); |
|
434 | auto varRequestId = QUuid::createUuid(); | |
434 |
qC |
|
435 | qCDebug(LOG_VariableController()) << "VariableController::onRequestDataLoading" | |
435 | << QThread::currentThread()->objectName() << varRequestId; |
|
436 | << QThread::currentThread()->objectName() << varRequestId; | |
436 |
|
437 | |||
437 | for (const auto &var : variables) { |
|
438 | for (const auto &var : variables) { | |
438 | qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId; |
|
439 | qCDebug(LOG_VariableController()) << "processRequest for" << var->name() << varRequestId; | |
@@ -549,10 +550,10 void VariableController::VariableControllerPrivate::processRequest(std::shared_p | |||||
549 | if (!notInCacheRangeList.empty()) { |
|
550 | if (!notInCacheRangeList.empty()) { | |
550 | varRequest.m_RangeRequested = varStrategyRangesRequested.first; |
|
551 | varRequest.m_RangeRequested = varStrategyRangesRequested.first; | |
551 | varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second; |
|
552 | varRequest.m_CacheRangeRequested = varStrategyRangesRequested.second; | |
552 |
qC |
|
553 | qCInfo(LOG_VariableController()) << tr("TORM processRequest RR ") << rangeRequested; | |
553 |
qC |
|
554 | qCInfo(LOG_VariableController()) | |
554 | << tr("TORM processRequest R ") << varStrategyRangesRequested.first; |
|
555 | << tr("TORM processRequest R ") << varStrategyRangesRequested.first; | |
555 |
qC |
|
556 | qCInfo(LOG_VariableController()) | |
556 | << tr("TORM processRequest CR ") << varStrategyRangesRequested.second; |
|
557 | << tr("TORM processRequest CR ") << varStrategyRangesRequested.second; | |
557 | // store VarRequest |
|
558 | // store VarRequest | |
558 | storeVariableRequest(varId, varRequestId, varRequest); |
|
559 | storeVariableRequest(varId, varRequestId, varRequest); | |
@@ -566,7 +567,7 void VariableController::VariableControllerPrivate::processRequest(std::shared_p | |||||
566 | varProvider); |
|
567 | varProvider); | |
567 |
|
568 | |||
568 | if (!varRequestIdCanceled.isNull()) { |
|
569 | if (!varRequestIdCanceled.isNull()) { | |
569 |
qCInfo(LOG_Variable |
|
570 | qCInfo(LOG_VariableController()) | |
570 | << tr("varRequestIdCanceled: ") << varRequestIdCanceled; |
|
571 | << tr("varRequestIdCanceled: ") << varRequestIdCanceled; | |
571 | cancelVariableRequest(varRequestIdCanceled); |
|
572 | cancelVariableRequest(varRequestIdCanceled); | |
572 | } |
|
573 | } | |
@@ -652,12 +653,12 void VariableController::VariableControllerPrivate::storeVariableRequest( | |||||
652 | auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId); |
|
653 | auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId); | |
653 | if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) { |
|
654 | if (varIdToVarRequestIdQueueMapIt == m_VarIdToVarRequestIdQueueMap.cend()) { | |
654 | auto varRequestIdQueue = std::deque<QUuid>{}; |
|
655 | auto varRequestIdQueue = std::deque<QUuid>{}; | |
655 |
qC |
|
656 | qCInfo(LOG_VariableController()) << tr("Store REQUEST in QUEUE"); | |
656 | varRequestIdQueue.push_back(varRequestId); |
|
657 | varRequestIdQueue.push_back(varRequestId); | |
657 | m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue))); |
|
658 | m_VarIdToVarRequestIdQueueMap.insert(std::make_pair(varId, std::move(varRequestIdQueue))); | |
658 | } |
|
659 | } | |
659 | else { |
|
660 | else { | |
660 |
qC |
|
661 | qCInfo(LOG_VariableController()) << tr("Store REQUEST in EXISTING QUEUE"); | |
661 | auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second; |
|
662 | auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second; | |
662 | varRequestIdQueue.push_back(varRequestId); |
|
663 | varRequestIdQueue.push_back(varRequestId); | |
663 | } |
|
664 | } | |
@@ -666,13 +667,13 void VariableController::VariableControllerPrivate::storeVariableRequest( | |||||
666 | if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) { |
|
667 | if (varRequestIdToVarIdVarRequestMapIt == m_VarRequestIdToVarIdVarRequestMap.cend()) { | |
667 | auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{}; |
|
668 | auto varIdToVarRequestMap = std::map<QUuid, VariableRequest>{}; | |
668 | varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); |
|
669 | varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); | |
669 |
qC |
|
670 | qCInfo(LOG_VariableController()) << tr("Store REQUESTID in MAP"); | |
670 | m_VarRequestIdToVarIdVarRequestMap.insert( |
|
671 | m_VarRequestIdToVarIdVarRequestMap.insert( | |
671 | std::make_pair(varRequestId, std::move(varIdToVarRequestMap))); |
|
672 | std::make_pair(varRequestId, std::move(varIdToVarRequestMap))); | |
672 | } |
|
673 | } | |
673 | else { |
|
674 | else { | |
674 | auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second; |
|
675 | auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second; | |
675 |
qC |
|
676 | qCInfo(LOG_VariableController()) << tr("Store REQUESTID in EXISTING MAP"); | |
676 | varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); |
|
677 | varIdToVarRequestMap.insert(std::make_pair(varId, varRequest)); | |
677 | } |
|
678 | } | |
678 | } |
|
679 | } |
@@ -13,6 +13,8 | |||||
13 | #include <QTemporaryFile> |
|
13 | #include <QTemporaryFile> | |
14 | #include <QThread> |
|
14 | #include <QThread> | |
15 |
|
15 | |||
|
16 | #include <QElapsedTimer> | |||
|
17 | ||||
16 | Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider") |
|
18 | Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider") | |
17 |
|
19 | |||
18 | namespace { |
|
20 | namespace { | |
@@ -117,16 +119,19 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa | |||||
117 | auto endDate = dateFormat(dateTime.m_TEnd); |
|
119 | auto endDate = dateFormat(dateTime.m_TEnd); | |
118 |
|
120 | |||
119 | auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)}; |
|
121 | auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)}; | |
120 |
qCInfo(LOG_AmdaProvider()) << tr(" |
|
122 | qCInfo(LOG_AmdaProvider()) << token << tr("AmdaProvider::retrieveData url:") << url; | |
121 | auto tempFile = std::make_shared<QTemporaryFile>(); |
|
123 | auto tempFile = std::make_shared<QTemporaryFile>(); | |
122 |
|
124 | |||
123 | // LAMBDA |
|
125 | // LAMBDA | |
124 | auto httpDownloadFinished = [this, dateTime, tempFile, |
|
126 | auto httpDownloadFinished = [this, dateTime, tempFile, | |
125 | productValueType](QNetworkReply *reply, QUuid dataId) noexcept { |
|
127 | productValueType](QNetworkReply *reply, QUuid dataId) noexcept { | |
126 |
|
128 | |||
|
129 | qCInfo(LOG_AmdaProvider()) << "Download url file completed" << dataId; | |||
|
130 | ||||
127 | // Don't do anything if the reply was abort |
|
131 | // Don't do anything if the reply was abort | |
128 | if (reply->error() != QNetworkReply::OperationCanceledError) { |
|
132 | if (reply->error() != QNetworkReply::OperationCanceledError) { | |
129 |
|
133 | QElapsedTimer timer{}; | ||
|
134 | timer.start(); | |||
130 | if (tempFile) { |
|
135 | if (tempFile) { | |
131 | auto replyReadAll = reply->readAll(); |
|
136 | auto replyReadAll = reply->readAll(); | |
132 | if (!replyReadAll.isEmpty()) { |
|
137 | if (!replyReadAll.isEmpty()) { | |
@@ -137,6 +142,8 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa | |||||
137 | // Parse results file |
|
142 | // Parse results file | |
138 | if (auto dataSeries |
|
143 | if (auto dataSeries | |
139 | = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) { |
|
144 | = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) { | |
|
145 | qCInfo(LOG_AmdaProvider()) << tr("Request was finished") << dataId << "(took" | |||
|
146 | << timer.elapsed() << "ms)"; | |||
140 | emit dataProvided(dataId, dataSeries, dateTime); |
|
147 | emit dataProvided(dataId, dataSeries, dateTime); | |
141 | } |
|
148 | } | |
142 | else { |
|
149 | else { | |
@@ -144,28 +151,31 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVa | |||||
144 | } |
|
151 | } | |
145 | } |
|
152 | } | |
146 | } |
|
153 | } | |
|
154 | else { | |||
|
155 | qCInfo(LOG_AmdaProvider()) | |||
|
156 | << "Request was canceled when downloading result file" << dataId; | |||
|
157 | } | |||
|
158 | }; | |||
|
159 | auto httpFinishedLambda = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, | |||
|
160 | QUuid dataId) noexcept { | |||
|
161 | qCInfo(LOG_AmdaProvider()) << "Generating url file completed" << dataId; | |||
|
162 | // Don't do anything if the reply was abort | |||
|
163 | if (reply->error() != QNetworkReply::OperationCanceledError) { | |||
|
164 | auto downloadFileUrl = QUrl{QString{reply->readAll()}}; | |||
|
165 | ||||
|
166 | // Executes request for downloading file // | |||
147 |
|
167 | |||
|
168 | // Creates destination file | |||
|
169 | if (tempFile->open()) { | |||
|
170 | // Executes request | |||
|
171 | emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId, | |||
|
172 | httpDownloadFinished); | |||
|
173 | } | |||
|
174 | } | |||
|
175 | else { | |||
|
176 | qCInfo(LOG_AmdaProvider()) << "Request was canceled when generating file url" << dataId; | |||
|
177 | } | |||
148 | }; |
|
178 | }; | |
149 | auto httpFinishedLambda |
|
|||
150 | = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept { |
|
|||
151 |
|
||||
152 | // Don't do anything if the reply was abort |
|
|||
153 | if (reply->error() != QNetworkReply::OperationCanceledError) { |
|
|||
154 | auto downloadFileUrl = QUrl{QString{reply->readAll()}}; |
|
|||
155 |
|
||||
156 |
|
||||
157 | qCInfo(LOG_AmdaProvider()) |
|
|||
158 | << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl; |
|
|||
159 | // Executes request for downloading file // |
|
|||
160 |
|
||||
161 | // Creates destination file |
|
|||
162 | if (tempFile->open()) { |
|
|||
163 | // Executes request |
|
|||
164 | emit requestConstructed(QNetworkRequest{downloadFileUrl}, dataId, |
|
|||
165 | httpDownloadFinished); |
|
|||
166 | } |
|
|||
167 | } |
|
|||
168 | }; |
|
|||
169 |
|
179 | |||
170 | // //////////////// // |
|
180 | // //////////////// // | |
171 | // Executes request // |
|
181 | // Executes request // |
General Comments 0
You need to be logged in to leave comments.
Login now