##// END OF EJS Templates
Correction for MR
perrinel -
r760:ba0e6f7d7791
parent child
Show More
@@ -1,141 +1,148
1 1 #include "Network/NetworkController.h"
2 2
3 3 #include <QMutex>
4 4 #include <QNetworkAccessManager>
5 5 #include <QNetworkReply>
6 6 #include <QNetworkRequest>
7 7 #include <QReadWriteLock>
8 8 #include <QThread>
9 9
10 10 #include <unordered_map>
11 11
12 12 Q_LOGGING_CATEGORY(LOG_NetworkController, "NetworkController")
13 13
14 14 struct NetworkController::NetworkControllerPrivate {
15 15 explicit NetworkControllerPrivate(NetworkController *parent) : m_WorkingMutex{} {}
16 16
17 17 void lockRead() { m_Lock.lockForRead(); }
18 18 void lockWrite() { m_Lock.lockForWrite(); }
19 19 void unlock() { m_Lock.unlock(); }
20 20
21 21 QMutex m_WorkingMutex;
22 22
23 23 QReadWriteLock m_Lock;
24 24 std::unordered_map<QNetworkReply *, QUuid> m_NetworkReplyToId;
25 25 std::unique_ptr<QNetworkAccessManager> m_AccessManager{nullptr};
26 26 };
27 27
28 28 NetworkController::NetworkController(QObject *parent)
29 29 : QObject(parent), impl{spimpl::make_unique_impl<NetworkControllerPrivate>(this)}
30 30 {
31 31 }
32 32
33 33 void NetworkController::onProcessRequested(std::shared_ptr<QNetworkRequest> request,
34 34 QUuid identifier,
35 35 std::function<void(QNetworkReply *, QUuid)> callback)
36 36 {
37 37 qCDebug(LOG_NetworkController()) << tr("NetworkController onProcessRequested")
38 38 << QThread::currentThread()->objectName() << &request;
39 39 auto reply = impl->m_AccessManager->get(*request);
40 40
41 41 // Store the couple reply id
42 42 impl->lockWrite();
43 43 impl->m_NetworkReplyToId[reply] = identifier;
44 44 qCDebug(LOG_NetworkController()) << tr("Store for reply: ") << identifier;
45 45 impl->unlock();
46 46
47 47 auto onReplyFinished = [request, reply, this, identifier, callback]() {
48 48
49 49 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished")
50 50 << QThread::currentThread() << request.get() << reply;
51 51 impl->lockRead();
52 52 auto it = impl->m_NetworkReplyToId.find(reply);
53 impl->unlock();
54 53 if (it != impl->m_NetworkReplyToId.cend()) {
54 qCDebug(LOG_NetworkController()) << tr("Remove for reply: ") << it->second;
55 impl->unlock();
55 56 impl->lockWrite();
56 qCDebug(LOG_NetworkController()) << tr("Remove for reply: ")
57 << impl->m_NetworkReplyToId[reply];
58 57 impl->m_NetworkReplyToId.erase(reply);
59 58 impl->unlock();
60 59 // Deletes reply
61 60 callback(reply, identifier);
62 61 reply->deleteLater();
63 62 }
63 else {
64 impl->unlock();
65 }
64 66
65 67 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyFinished END")
66 68 << QThread::currentThread() << reply;
67 69 };
68 70
69 71 auto onReplyProgress = [reply, request, this](qint64 bytesRead, qint64 totalBytes) {
70 72
71 73 // NOTE: a totalbytes of 0 can happened when a request has been aborted
72 74 if (totalBytes > 0) {
73 75 double progress = (bytesRead * 100.0) / totalBytes;
74 76 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress") << progress
75 77 << QThread::currentThread() << request.get() << reply
76 78 << bytesRead << totalBytes;
77 79 impl->lockRead();
78 80 auto it = impl->m_NetworkReplyToId.find(reply);
79 impl->unlock();
80 81 if (it != impl->m_NetworkReplyToId.cend()) {
81 emit this->replyDownloadProgress(it->second, request, progress);
82 auto id = it->second;
83 impl->unlock();
84 emit this->replyDownloadProgress(id, request, progress);
85 }
86 else {
87 impl->unlock();
82 88 }
89
83 90 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyProgress END")
84 91 << QThread::currentThread() << reply;
85 92 }
86 93 };
87 94
88 95
89 96 connect(reply, &QNetworkReply::finished, this, onReplyFinished);
90 97 connect(reply, &QNetworkReply::downloadProgress, this, onReplyProgress);
91 98 qCDebug(LOG_NetworkController()) << tr("NetworkController registered END")
92 99 << QThread::currentThread()->objectName() << reply;
93 100 }
94 101
95 102 void NetworkController::initialize()
96 103 {
97 104 qCDebug(LOG_NetworkController()) << tr("NetworkController init") << QThread::currentThread();
98 105 impl->m_WorkingMutex.lock();
99 106 impl->m_AccessManager = std::make_unique<QNetworkAccessManager>();
100 107
101 108
102 109 auto onReplyErrors = [this](QNetworkReply *reply, const QList<QSslError> &errors) {
103 110 qCCritical(LOG_NetworkController()) << tr("NetworkAcessManager errors: ") << errors;
104 111
105 112 };
106 113
107 114
108 115 connect(impl->m_AccessManager.get(), &QNetworkAccessManager::sslErrors, this, onReplyErrors);
109 116
110 117 qCDebug(LOG_NetworkController()) << tr("NetworkController init END");
111 118 }
112 119
113 120 void NetworkController::finalize()
114 121 {
115 122 impl->m_WorkingMutex.unlock();
116 123 }
117 124
118 125 void NetworkController::onReplyCanceled(QUuid identifier)
119 126 {
120 127 auto findReply = [identifier](const auto &entry) { return identifier == entry.second; };
121 128 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled")
122 129 << QThread::currentThread() << identifier;
123 130
124 131
125 132 impl->lockRead();
126 133 auto end = impl->m_NetworkReplyToId.cend();
127 134 auto it = std::find_if(impl->m_NetworkReplyToId.cbegin(), end, findReply);
128 135 impl->unlock();
129 136 if (it != end) {
130 137 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled ABORT DONE")
131 138 << QThread::currentThread() << identifier;
132 139 it->first->abort();
133 140 }
134 141 qCDebug(LOG_NetworkController()) << tr("NetworkController onReplyCanceled END")
135 142 << QThread::currentThread();
136 143 }
137 144
138 145 void NetworkController::waitForFinish()
139 146 {
140 147 QMutexLocker locker{&impl->m_WorkingMutex};
141 148 }
@@ -1,274 +1,277
1 1 #include "AmdaProvider.h"
2 2 #include "AmdaDefs.h"
3 3 #include "AmdaResultParser.h"
4 4
5 5 #include <Common/DateUtils.h>
6 6 #include <Data/DataProviderParameters.h>
7 7 #include <Network/NetworkController.h>
8 8 #include <SqpApplication.h>
9 9 #include <Variable/Variable.h>
10 10
11 11 #include <QNetworkAccessManager>
12 12 #include <QNetworkReply>
13 13 #include <QTemporaryFile>
14 14 #include <QThread>
15 15
16 16 Q_LOGGING_CATEGORY(LOG_AmdaProvider, "AmdaProvider")
17 17
18 18 namespace {
19 19
20 20 /// URL format for a request on AMDA server. The parameters are as follows:
21 21 /// - %1: start date
22 22 /// - %2: end date
23 23 /// - %3: parameter id
24 24 const auto AMDA_URL_FORMAT = QStringLiteral(
25 25 "http://amda.irap.omp.eu/php/rest/"
26 26 "getParameter.php?startTime=%1&stopTime=%2&parameterID=%3&outputFormat=ASCII&"
27 27 "timeFormat=ISO8601&gzip=0");
28 28
29 29 /// Dates format passed in the URL (e.g 2013-09-23T09:00)
30 30 const auto AMDA_TIME_FORMAT = QStringLiteral("yyyy-MM-ddThh:mm:ss");
31 31
32 32 /// Formats a time to a date that can be passed in URL
33 33 QString dateFormat(double sqpRange) noexcept
34 34 {
35 35 auto dateTime = DateUtils::dateTime(sqpRange);
36 36 return dateTime.toString(AMDA_TIME_FORMAT);
37 37 }
38 38
39 39 AmdaResultParser::ValueType valueType(const QString &valueType)
40 40 {
41 41 if (valueType == QStringLiteral("scalar")) {
42 42 return AmdaResultParser::ValueType::SCALAR;
43 43 }
44 44 else if (valueType == QStringLiteral("vector")) {
45 45 return AmdaResultParser::ValueType::VECTOR;
46 46 }
47 47 else {
48 48 return AmdaResultParser::ValueType::UNKNOWN;
49 49 }
50 50 }
51 51
52 52 } // namespace
53 53
54 54 AmdaProvider::AmdaProvider()
55 55 {
56 56 qCDebug(LOG_AmdaProvider()) << tr("AmdaProvider::AmdaProvider") << QThread::currentThread();
57 57 if (auto app = sqpApp) {
58 58 auto &networkController = app->networkController();
59 59 connect(this, SIGNAL(requestConstructed(std::shared_ptr<QNetworkRequest>, QUuid,
60 60 std::function<void(QNetworkReply *, QUuid)>)),
61 61 &networkController,
62 62 SLOT(onProcessRequested(std::shared_ptr<QNetworkRequest>, QUuid,
63 63 std::function<void(QNetworkReply *, QUuid)>)));
64 64
65 65
66 66 connect(&sqpApp->networkController(),
67 67 SIGNAL(replyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)),
68 68 this,
69 69 SLOT(onReplyDownloadProgress(QUuid, std::shared_ptr<QNetworkRequest>, double)));
70 70 }
71 71 }
72 72
73 73 std::shared_ptr<IDataProvider> AmdaProvider::clone() const
74 74 {
75 75 // No copy is made in the clone
76 76 return std::make_shared<AmdaProvider>();
77 77 }
78 78
79 79 void AmdaProvider::requestDataLoading(QUuid acqIdentifier, const DataProviderParameters &parameters)
80 80 {
81 81 // NOTE: Try to use multithread if possible
82 82 const auto times = parameters.m_Times;
83 83 const auto data = parameters.m_Data;
84 84 for (const auto &dateTime : qAsConst(times)) {
85 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::requestDataLoading ") << acqIdentifier
86 << dateTime;
85 87 this->retrieveData(acqIdentifier, dateTime, data);
86 88
87 89
88 90 // TORM when AMDA will support quick asynchrone request
89 91 QThread::msleep(1000);
90 92 }
91 93 }
92 94
93 95 void AmdaProvider::requestDataAborting(QUuid acqIdentifier)
94 96 {
95 97 if (auto app = sqpApp) {
96 98 auto &networkController = app->networkController();
97 99 networkController.onReplyCanceled(acqIdentifier);
98 100 }
99 101 }
100 102
101 103 void AmdaProvider::onReplyDownloadProgress(QUuid acqIdentifier,
102 104 std::shared_ptr<QNetworkRequest> networkRequest,
103 105 double progress)
104 106 {
105 qCInfo(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << acqIdentifier
106 << networkRequest.get() << progress;
107 qCDebug(LOG_AmdaProvider()) << tr("onReplyDownloadProgress") << acqIdentifier
108 << networkRequest.get() << progress;
107 109 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
108 110 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
109 111
112 // Update the progression for the current request
110 113 auto requestPtr = networkRequest;
111 114 auto findRequest = [requestPtr](const auto &entry) { return requestPtr == entry.first; };
112 115
113 116 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
114 117 auto requestProgressMapEnd = requestProgressMap.end();
115 118 auto requestProgressMapIt
116 119 = std::find_if(requestProgressMap.begin(), requestProgressMapEnd, findRequest);
117 120
118 121 if (requestProgressMapIt != requestProgressMapEnd) {
119 122 requestProgressMapIt->second = progress;
120 123 }
121 124 else {
122 125 // This case can happened when a progression is send after the request has been
123 126 // finished.
124 127 // Generaly the case when aborting a request
125 128 qCWarning(LOG_AmdaProvider()) << tr("Can't retrieve Request in progress")
126 129 << acqIdentifier << networkRequest.get() << progress;
127 130 }
128 }
129 131
130 acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
131 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
132 // Compute the current final progress and notify it
132 133 double finalProgress = 0.0;
133 134
134 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
135 135 auto fraq = requestProgressMap.size();
136 136
137 137 for (auto requestProgress : requestProgressMap) {
138 138 finalProgress += requestProgress.second;
139 qCDebug(LOG_AmdaProvider()) << tr("current final progress without freq:")
139 qCDebug(LOG_AmdaProvider()) << tr("Current final progress without fraq:")
140 140 << finalProgress << requestProgress.second;
141 141 }
142 142
143 143 if (fraq > 0) {
144 144 finalProgress = finalProgress / fraq;
145 145 }
146 146
147 qCDebug(LOG_AmdaProvider()) << tr("2 onReplyDownloadProgress final progress") << fraq
148 << finalProgress;
147 qCDebug(LOG_AmdaProvider()) << tr("Current final progress: ") << fraq << finalProgress;
149 148 emit dataProvidedProgress(acqIdentifier, finalProgress);
150 149 }
151 150 else {
152 151 // This case can happened when a progression is send after the request has been finished.
153 152 // Generaly the case when aborting a request
154 153 emit dataProvidedProgress(acqIdentifier, 100.0);
155 154 }
156 155 }
157 156
158 157 void AmdaProvider::retrieveData(QUuid token, const SqpRange &dateTime, const QVariantHash &data)
159 158 {
160 159 // Retrieves product ID from data: if the value is invalid, no request is made
161 160 auto productId = data.value(AMDA_XML_ID_KEY).toString();
162 161 if (productId.isNull()) {
163 162 qCCritical(LOG_AmdaProvider()) << tr("Can't retrieve data: unknown product id");
164 163 return;
165 164 }
166 165
167 166 // Retrieves the data type that determines whether the expected format for the result file is
168 167 // scalar, vector...
169 168 auto productValueType = valueType(data.value(AMDA_DATA_TYPE_KEY).toString());
170 169
171 170 // /////////// //
172 171 // Creates URL //
173 172 // /////////// //
174 173
175 174 auto startDate = dateFormat(dateTime.m_TStart);
176 175 auto endDate = dateFormat(dateTime.m_TEnd);
177 176
178 177 auto url = QUrl{QString{AMDA_URL_FORMAT}.arg(startDate, endDate, productId)};
179 qCDebug(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
178 qCInfo(LOG_AmdaProvider()) << tr("TORM AmdaProvider::retrieveData url:") << url;
180 179 auto tempFile = std::make_shared<QTemporaryFile>();
181 180
182 181 // LAMBDA
183 182 auto httpDownloadFinished = [this, dateTime, tempFile,
184 183 productValueType](QNetworkReply *reply, QUuid dataId) noexcept {
185 184
186 185 // Don't do anything if the reply was abort
187 if (reply->error() != QNetworkReply::OperationCanceledError) {
186 if (reply->error() == QNetworkReply::NoError) {
188 187
189 188 if (tempFile) {
190 189 auto replyReadAll = reply->readAll();
191 190 if (!replyReadAll.isEmpty()) {
192 191 tempFile->write(replyReadAll);
193 192 }
194 193 tempFile->close();
195 194
196 195 // Parse results file
197 196 if (auto dataSeries
198 197 = AmdaResultParser::readTxt(tempFile->fileName(), productValueType)) {
199 198 emit dataProvided(dataId, dataSeries, dateTime);
200 199 }
201 200 else {
202 201 /// @todo ALX : debug
203 202 }
204 203 }
205 204 qCDebug(LOG_AmdaProvider()) << tr("acquisition requests erase because of finishing")
206 205 << dataId;
207 206 m_AcqIdToRequestProgressMap.erase(dataId);
208 207 }
208 else {
209 qCCritical(LOG_AmdaProvider()) << tr("httpDownloadFinished ERROR");
210 }
209 211
210 212 };
211 213 auto httpFinishedLambda
212 214 = [this, httpDownloadFinished, tempFile](QNetworkReply *reply, QUuid dataId) noexcept {
213 215
214 216 // Don't do anything if the reply was abort
215 if (reply->error() != QNetworkReply::OperationCanceledError) {
217 if (reply->error() == QNetworkReply::NoError) {
216 218 auto downloadFileUrl = QUrl{QString{reply->readAll()}};
217 219
218 qCDebug(LOG_AmdaProvider())
220 qCInfo(LOG_AmdaProvider())
219 221 << tr("TORM AmdaProvider::retrieveData downloadFileUrl:") << downloadFileUrl;
220 222 // Executes request for downloading file //
221 223
222 224 // Creates destination file
223 225 if (tempFile->open()) {
224 226 // Executes request and store the request for progression
225 227 auto request = std::make_shared<QNetworkRequest>(downloadFileUrl);
226 228 updateRequestProgress(dataId, request, 0.0);
227 229 emit requestConstructed(request, dataId, httpDownloadFinished);
228 230 }
229 231 }
230 232 else {
231 233 qCDebug(LOG_AmdaProvider())
232 234 << tr("acquisition requests erase because of aborting") << dataId;
235 qCCritical(LOG_AmdaProvider()) << tr("httpFinishedLambda ERROR");
233 236 m_AcqIdToRequestProgressMap.erase(dataId);
234 237 }
235 238 };
236 239
237 240 // //////////////// //
238 241 // Executes request //
239 242 // //////////////// //
240 243
241 244 auto request = std::make_shared<QNetworkRequest>(url);
242 245 qCDebug(LOG_AmdaProvider()) << tr("First Request creation") << request.get();
243 246 updateRequestProgress(token, request, 0.0);
244 247
245 248 emit requestConstructed(request, token, httpFinishedLambda);
246 249 }
247 250
248 251 void AmdaProvider::updateRequestProgress(QUuid acqIdentifier,
249 252 std::shared_ptr<QNetworkRequest> request, double progress)
250 253 {
251 254 auto acqIdToRequestProgressMapIt = m_AcqIdToRequestProgressMap.find(acqIdentifier);
252 255 if (acqIdToRequestProgressMapIt != m_AcqIdToRequestProgressMap.end()) {
253 256 auto &requestProgressMap = acqIdToRequestProgressMapIt->second;
254 257 auto requestProgressMapIt = requestProgressMap.find(request);
255 258 if (requestProgressMapIt != requestProgressMap.end()) {
256 259 requestProgressMapIt->second = progress;
257 260 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new progress for request")
258 261 << acqIdentifier << request.get() << progress;
259 262 }
260 263 else {
261 264 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new request") << acqIdentifier
262 265 << request.get() << progress;
263 266 acqIdToRequestProgressMapIt->second.insert(std::make_pair(request, progress));
264 267 }
265 268 }
266 269 else {
267 270 qCDebug(LOG_AmdaProvider()) << tr("updateRequestProgress new acqIdentifier")
268 271 << acqIdentifier << request.get() << progress;
269 272 auto requestProgressMap = std::map<std::shared_ptr<QNetworkRequest>, double>{};
270 273 requestProgressMap.insert(std::make_pair(request, progress));
271 274 m_AcqIdToRequestProgressMap.insert(
272 275 std::make_pair(acqIdentifier, std::move(requestProgressMap)));
273 276 }
274 277 }
General Comments 0
You need to be logged in to leave comments. Login now