##// END OF EJS Templates
commit VC
Alexandre Leroux -
r683:4d03130678b7
parent child
Show More
@@ -0,0 +1,68
1 #ifndef SCIQLOP_VARIABLESYNCHRONIZER_H
2 #define SCIQLOP_VARIABLESYNCHRONIZER_H
3
4 #include "CoreGlobal.h"
5
6 #include <Common/spimpl.h>
7
8 #include <QLoggingCategory>
9 #include <QUuid>
10
11 #include <memory>
12 #include <set>
13
14 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableSynchronizer)
15
16 class Variable;
17
18 /**
19 * @brief The VariableAcquisitionWorker class handles synchronization between variables
20 */
21 class SCIQLOP_CORE_EXPORT VariableSynchronizer : public QObject {
22 Q_OBJECT
23 public:
24 using GroupId = QUuid;
25
26 explicit VariableSynchronizer(QObject *parent = 0);
27
28 /**
29 * Adds a synchronization group under the identifier passed as a parameter. If a group with this
30 * identifier already exists, the method does nothing.
31 * @param groupId the group identifier
32 */
33 void addGroup(GroupId groupId) noexcept;
34
35 /**
36 * Adds a variable to a synchronization group. If the variable is already registered under
37 * another group, or the synchronization doesn't exist, the method does nothing
38 * @param variable the variable to synchronize
39 * @param groupId the synchronization group identifier
40 */
41 void addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept;
42
43 /**
44 * Removes the synchronization group under the identifier passed as a parameter. If no group
45 * exists with this identifier, the method does nothing.
46 * @param groupId the group identifier
47 */
48 void removeGroup(GroupId groupId) noexcept;
49
50 /**
51 * Removes a variable from a synchronization group. If the synchronization group doesn't exist
52 * or if the variable isn't in it, the method does nothing
53 * @param variable the variable to desynchronize
54 * @param groupId the synchronization group identifier
55 */
56 void removeVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept;
57
58 /// @return the variables in the same group than the variable passed as a parameter (including
59 /// the variable)
60 std::set<std::shared_ptr<Variable> >
61 synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept;
62
63 private:
64 class VariableSynchronizerPrivate;
65 spimpl::unique_impl_ptr<VariableSynchronizerPrivate> impl;
66 };
67
68 #endif // SCIQLOP_VARIABLESYNCHRONIZER_H
@@ -0,0 +1,105
1 #include "Variable/VariableSynchronizer.h"
2
3 #include "Variable/Variable.h"
4
5 Q_LOGGING_CATEGORY(LOG_VariableSynchronizer, "VariableSynchronizer")
6
7 namespace {
8
9 using GroupId = VariableSynchronizer::GroupId;
10 using Group = std::set<std::shared_ptr<Variable> >;
11
12 } // namespace
13
14 struct VariableSynchronizer::VariableSynchronizerPrivate {
15 std::map<GroupId, Group> m_Groups;
16 std::map<std::shared_ptr<Variable>, Group *> m_GroupsIndex;
17 };
18
19 VariableSynchronizer::VariableSynchronizer(QObject *parent)
20 : QObject{parent}, impl{spimpl::make_unique_impl<VariableSynchronizerPrivate>()}
21 {
22 }
23
24 void VariableSynchronizer::addGroup(GroupId groupId) noexcept
25 {
26 if (impl->m_Groups.count(groupId) == 1) {
27 qCWarning(LOG_VariableSynchronizer())
28 << tr("Can't create new synchronization group: a "
29 "group already exists under the passed identifier");
30 return;
31 }
32
33 impl->m_Groups.insert(std::make_pair(groupId, Group{}));
34 }
35
36 void VariableSynchronizer::addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept
37 {
38 if (impl->m_GroupsIndex.count(variable) == 1) {
39 qCWarning(LOG_VariableSynchronizer())
40 << tr("Can't add variable to a new synchronization group: the variable is already in a "
41 "synchronization group");
42 return;
43 }
44
45 auto groupIt = impl->m_Groups.find(groupId);
46
47 if (groupIt == impl->m_Groups.end()) {
48 qCWarning(LOG_VariableSynchronizer())
49 << tr("Can't add variable to the synchronization group: no group exists under the "
50 "passed identifier");
51 return;
52 }
53
54 // Registers variable
55 groupIt->second.insert(variable);
56
57 // Creates index for variable
58 impl->m_GroupsIndex.insert(std::make_pair(variable, &groupIt->second));
59 }
60
61 void VariableSynchronizer::removeGroup(GroupId groupId) noexcept
62 {
63 auto groupIt = impl->m_Groups.find(groupId);
64
65 if (groupIt == impl->m_Groups.end()) {
66 qCWarning(LOG_VariableSynchronizer()) << tr(
67 "Can't remove synchronization group: no group exists under the passed identifier");
68 return;
69 }
70
71 // Removes indexes
72 for (const auto &variable : groupIt->second) {
73 impl->m_GroupsIndex.erase(variable);
74 }
75
76 // Removes group
77 impl->m_Groups.erase(groupIt);
78 }
79
80 void VariableSynchronizer::removeVariable(std::shared_ptr<Variable> variable,
81 GroupId groupId) noexcept
82 {
83 auto groupIt = impl->m_Groups.find(groupId);
84
85 if (groupIt == impl->m_Groups.end()) {
86 qCWarning(LOG_VariableSynchronizer())
87 << tr("Can't remove variable from synchronization group: no group exists under the "
88 "passed identifier");
89 return;
90 }
91
92 // Removes variable index
93 impl->m_GroupsIndex.erase(variable);
94
95 // Removes variable from group
96 groupIt->second.erase(variable);
97 }
98
99 std::set<std::shared_ptr<Variable> >
100 VariableSynchronizer::synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept
101 {
102 auto groupIndexIt = impl->m_GroupsIndex.find(variable);
103 return groupIndexIt != impl->m_GroupsIndex.end() ? *groupIndexIt->second
104 : std::set<std::shared_ptr<Variable> >{};
105 }
@@ -41,13 +41,11 const auto PLUGIN_DIRECTORY_NAME = QStringLiteral("plugins");
41 41
42 42 int main(int argc, char *argv[])
43 43 {
44 // QLoggingCategory::setFilterRules(
45 // "*.warning=false\n"
46 // "*.info=false\n"
47 // "*.debug=false\n"
48 // "AmdaProvider.info=true\n"
49 // "NetworkController.info=true\n"
50 // "VariableAcquisitionWorker.info=true\n");
44 QLoggingCategory::setFilterRules(
45 "*.warning=false\n"
46 "*.info=false\n"
47 "*.debug=false\n"
48 "VariableController.info=true\n");
51 49
52 50 SqpApplication a{argc, argv};
53 51 SqpApplication::setOrganizationName("LPP");
@@ -17,8 +17,8
17 17 /**
18 18 * @brief The AcquisitionRequest struct holds the information of an variable request
19 19 */
20 struct AcquisitionRequest {
21 AcquisitionRequest()
20 struct Acquisition {
21 Acquisition()
22 22 {
23 23 m_AcqIdentifier = QUuid::createUuid();
24 24 m_Size = 0;
@@ -6,19 +6,42
6 6 #include <QUuid>
7 7
8 8 #include <Common/MetaTypes.h>
9 #include <Data/DataProviderParameters.h>
9 10 #include <Data/IDataSeries.h>
10 11 #include <Data/SqpRange.h>
11 12
12 13 #include <memory>
13 14
15 class DataProviderParameters;
16 class IDataProvider;
17
14 18 /**
15 19 * @brief The VariableRequest struct holds the information of an acquisition request
16 20 */
17 21 struct VariableRequest {
22 void addResult(std::shared_ptr<IDataSeries> dataSeries)
23 {
24 if (!m_Result) {
25 m_Result = dataSeries->clone();
26 }
27 else {
28 m_Result->merge(dataSeries.get());
29 }
30
31 ++m_ExecCount;
32 }
33
34 bool isFinished() const { return m_ProviderParameters.m_Times.size() == m_ExecCount; }
35
36 // Parameters
18 37 SqpRange m_RangeRequested{INVALID_RANGE};
19 38 SqpRange m_CacheRangeRequested{INVALID_RANGE};
20 std::shared_ptr<IDataSeries> m_DataSeries{nullptr};
21 bool m_CanUpdate{false};
39 std::shared_ptr<IDataProvider> m_Provider{nullptr};
40 DataProviderParameters m_ProviderParameters{};
41
42 // Results
43 std::shared_ptr<IDataSeries> m_Result{nullptr};
44 int m_ExecCount{0};
22 45 };
23 46
24 47 SCIQLOP_REGISTER_META_TYPE(VARIABLEREQUEST_REGISTRY, VariableRequest)
@@ -11,6 +11,7
11 11 #include <Data/AcquisitionDataPacket.h>
12 12 #include <Data/IDataSeries.h>
13 13 #include <Data/SqpRange.h>
14 #include <Data/VariableRequest.h>
14 15
15 16 #include <QLoggingCategory>
16 17
@@ -28,24 +29,19 public:
28 29 explicit VariableAcquisitionWorker(QObject *parent = 0);
29 30 virtual ~VariableAcquisitionWorker();
30 31
31 QUuid pushVariableRequest(QUuid varRequestId, QUuid vIdentifier, SqpRange rangeRequested,
32 SqpRange cacheRangeRequested, DataProviderParameters parameters,
33 std::shared_ptr<IDataProvider> provider);
32 void pushVariableRequest(std::shared_ptr<Variable> variable, VariableRequest request);
34 33
35 34 void abortProgressRequested(QUuid vIdentifier);
36 35
37 36 void initialize();
38 37 void finalize();
39 38 signals:
40 void dataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
41 const SqpRange &cacheRangeRequested,
42 QVector<AcquisitionDataPacket> dataAcquired);
43
39 void dataProvided(std::shared_ptr<Variable> variable, VariableRequest request);
44 40 void variableRequestInProgress(QUuid vIdentifier, double progress);
45 41
46 42 public slots:
47 void onVariableDataAcquired(QUuid acqIdentifier, std::shared_ptr<IDataSeries> dataSeries,
48 SqpRange dataRangeAcquired);
43 void onDataAcquired(QUuid acquisitionId, std::shared_ptr<IDataSeries> dataSeries,
44 SqpRange range);
49 45 void onVariableAcquisitionCanceled(QUuid acqIdentifier);
50 46 void onVariableRetrieveDataInProgress(QUuid acqIdentifier, double progress);
51 47
@@ -56,7 +52,7 private:
56 52 spimpl::unique_impl_ptr<VariableAcquisitionWorkerPrivate> impl;
57 53
58 54 private slots:
59 void onExecuteRequest(QUuid acqIdentifier);
55 void executeAcquisition(QUuid acquisitionId);
60 56 };
61 57
62 58 #endif // SCIQLOP_VARIABLEACQUISITIONWORKER_H
@@ -17,6 +17,7 class QItemSelectionModel;
17 17 class TimeController;
18 18 class Variable;
19 19 class VariableModel;
20 class VariableRequest;
20 21
21 22 Q_DECLARE_LOGGING_CATEGORY(LOG_VariableController)
22 23
@@ -78,8 +79,8 signals:
78 79
79 80 public slots:
80 81 /// Request the data loading of the variable whithin range
81 void onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables, const SqpRange &range,
82 const SqpRange &oldRange, bool synchronise);
82 void onRequestDataLoading(const QVector<std::shared_ptr<Variable> > &variables,
83 const SqpRange &range, const SqpRange &oldRange, bool synchronise);
83 84 /**
84 85 * Creates a new variable and adds it to the model
85 86 * @param name the name of the new variable
@@ -93,10 +94,7 public slots:
93 94 /// Update the temporal parameters of every selected variable to dateTime
94 95 void onDateTimeOnSelection(const SqpRange &dateTime);
95 96
96
97 void onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
98 const SqpRange &cacheRangeRequested,
99 QVector<AcquisitionDataPacket> dataAcquired);
97 void onDataProvided(std::shared_ptr<Variable> variable, VariableRequest request);
100 98
101 99 void onVariableRetrieveDataInProgress(QUuid identifier, double progress);
102 100
@@ -2,9 +2,8
2 2
3 3 #include "Variable/Variable.h"
4 4
5 #include <Data/AcquisitionRequest.h>
5 #include <Data/IDataProvider.h>
6 6 #include <Data/SqpRange.h>
7
8 7 #include <unordered_map>
9 8 #include <utility>
10 9
@@ -20,6 +19,12 namespace {
20 19 using AcquisitionId = QUuid;
21 20 using VariableId = QUuid;
22 21
22 struct Acquisition {
23 std::shared_ptr<Variable> m_Variable{nullptr};
24 VariableRequest m_Request{};
25 AcquisitionId m_Id{QUuid::createUuid()};
26 };
27
23 28 } // namespace
24 29
25 30 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
@@ -30,21 +35,42 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
30 35 void lockWrite() { m_Lock.lockForWrite(); }
31 36 void unlock() { m_Lock.unlock(); }
32 37
33 void eraseRequest(AcquisitionId id);
34 std::map<AcquisitionId, AcquisitionRequest>::iterator insertRequest(AcquisitionId id,
35 AcquisitionRequest request);
38 void eraseAcquisition(AcquisitionId id)
39 {
40 auto it = m_Acquisitions.find(id);
41 if (it != m_Acquisitions.end()) {
42 // Removes from index
43 m_AcquisitionsIndex.erase(it->second.m_Variable);
36 44
37 void removeVariableRequest(QUuid vIdentifier);
45 // Removes request
46 m_Acquisitions.erase(it);
47 }
48 }
49
50 std::map<AcquisitionId, Acquisition>::iterator insertAcquisition(Acquisition acquisition)
51 {
52 auto variable = acquisition.m_Variable;
53
54 // Inserts acquisition
55 auto result
56 = m_Acquisitions.insert(std::make_pair(acquisition.m_Id, std::move(acquisition)));
57 if (result.second) {
58 // Inserts index
59 m_AcquisitionsIndex[variable] = &result.first->second;
60 return result.first;
61 }
62 else {
63 return m_Acquisitions.end();
64 }
65 }
38 66
39 67 QMutex m_WorkingMutex;
40 68 QReadWriteLock m_Lock;
41 69
42 /// Current acquisitions (by variable)
43 std::map<AcquisitionId, AcquisitionRequest> m_Requests;
44 std::map<VariableId, AcquisitionRequest *> m_RequestsIndex;
70 std::map<AcquisitionId, Acquisition> m_Acquisitions;
71 std::map<std::shared_ptr<Variable>, Acquisition *> m_AcquisitionsIndex;
45 72 };
46 73
47
48 74 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
49 75 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
50 76 {
@@ -57,56 +83,37 VariableAcquisitionWorker::~VariableAcquisitionWorker()
57 83 this->waitForFinish();
58 84 }
59 85
60
61 QUuid VariableAcquisitionWorker::pushVariableRequest(QUuid varRequestId, QUuid vIdentifier,
62 SqpRange rangeRequested,
63 SqpRange cacheRangeRequested,
64 DataProviderParameters parameters,
65 std::shared_ptr<IDataProvider> provider)
86 void VariableAcquisitionWorker::pushVariableRequest(std::shared_ptr<Variable> variable,
87 VariableRequest request)
66 88 {
67 qCDebug(LOG_VariableAcquisitionWorker())
68 << tr("TORM VariableAcquisitionWorker::pushVariableRequest ") << cacheRangeRequested;
69 auto varRequestIdCanceled = QUuid();
70
71 // Request creation
72 auto acqRequest = AcquisitionRequest{};
73 acqRequest.m_VarRequestId = varRequestId;
74 acqRequest.m_vIdentifier = vIdentifier;
75 acqRequest.m_DataProviderParameters = parameters;
76 acqRequest.m_RangeRequested = rangeRequested;
77 acqRequest.m_CacheRangeRequested = cacheRangeRequested;
78 acqRequest.m_Size = parameters.m_Times.size();
79 acqRequest.m_Provider = provider;
80
81 89 impl->lockWrite();
82 90
83 // Checks if there is a current acquisition on variable
84 auto currentRequestIt = impl->m_RequestsIndex.find(vIdentifier);
85 if (currentRequestIt != impl->m_RequestsIndex.cend()) {
86 auto request = currentRequestIt->second;
87 QtConcurrent::run(
88 [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() {
89 provider->requestDataAborting(acqIdentifier);
90 });
91 varRequestIdCanceled = request->m_VarRequestId;
92
93 impl->eraseRequest(request->m_AcqIdentifier);
91 // Checks if there is a current request for variable
92 auto oldAcquisitionIt = impl->m_AcquisitionsIndex.find(variable);
93 if (oldAcquisitionIt != impl->m_AcquisitionsIndex.cend()) {
94 auto &oldAcquisition = *oldAcquisitionIt->second;
95 /// @todo ALX
96 // QtConcurrent::run(
97 // [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]()
98 // {
99 // provider->requestDataAborting(acqIdentifier);
100 // });
101
102 impl->eraseAcquisition(oldAcquisition.m_Id);
94 103 }
95 104
96 // Sets the new acquisition request as the current request for the variable
97 auto newRequestIt = impl->insertRequest(acqRequest.m_AcqIdentifier, std::move(acqRequest));
98 if (newRequestIt != impl->m_Requests.end()) {
99 qCInfo(LOG_VariableAcquisitionWorker()) << "EXECUTE REQUEST" << acqRequest.m_AcqIdentifier;
100 QMetaObject::invokeMethod(this, "onExecuteRequest", Qt::QueuedConnection,
101 Q_ARG(QUuid, newRequestIt->first));
105 // Sets request for variable
106 Acquisition newAcquisition{variable, std::move(request)};
107 auto newAcquisitionIt = impl->insertAcquisition(std::move(newAcquisition));
108 if (newAcquisitionIt != impl->m_Acquisitions.end()) {
109 impl->unlock();
110
111 QMetaObject::invokeMethod(this, "executeAcquisition", Qt::QueuedConnection,
112 Q_ARG(QUuid, newAcquisitionIt->first));
102 113 }
103 114 else {
104 /// @todo ALX : log
115 impl->unlock();
105 116 }
106
107 impl->unlock();
108
109 return varRequestIdCanceled;
110 117 }
111 118
112 119 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
@@ -120,27 +127,25 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdenti
120 127 // TODO
121 128 }
122 129
123 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
124 std::shared_ptr<IDataSeries> dataSeries,
125 SqpRange dataRangeAcquired)
130 void VariableAcquisitionWorker::onDataAcquired(QUuid acquisitionId,
131 std::shared_ptr<IDataSeries> dataSeries,
132 SqpRange range)
126 133 {
127 qCDebug(LOG_VariableAcquisitionWorker())
128 << tr("onVariableDataAcquired on range ") << acqIdentifier << dataRangeAcquired;
129 134 impl->lockWrite();
130 135
131 auto it = impl->m_Requests.find(acqIdentifier);
132 if (it != impl->m_Requests.cend()) {
133 auto &request = it->second;
136 auto it = impl->m_Acquisitions.find(acquisitionId);
137 if (it != impl->m_Acquisitions.cend()) {
138 auto &acquisition = it->second;
139 auto &request = acquisition.m_Request;
140
141 qInfo(LOG_VariableAcquisitionWorker()) << "Data acquired for " << printRange(range);
134 142
135 143 // Store the result
136 auto dataPacket = AcquisitionDataPacket{dataSeries, dataRangeAcquired};
137 request.m_DataPackets.push_back(dataPacket);
138 request.m_Size = request.m_Size - 1;
139
140 if (request.m_Size == 0) {
141 emit dataProvided(request.m_vIdentifier, request.m_RangeRequested,
142 request.m_CacheRangeRequested, request.m_DataPackets);
143 impl->eraseRequest(acqIdentifier);
144 request.addResult(dataSeries);
145
146 if (request.isFinished()) {
147 emit dataProvided(acquisition.m_Variable, std::move(request));
148 impl->eraseAcquisition(acquisitionId);
144 149 }
145 150 }
146 151 impl->unlock();
@@ -148,8 +153,6 void VariableAcquisitionWorker::onVariableDataAcquired(QUuid acqIdentifier,
148 153
149 154 void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
150 155 {
151 impl->lockWrite();
152 impl->unlock();
153 156 }
154 157
155 158 void VariableAcquisitionWorker::initialize()
@@ -170,53 +173,14 void VariableAcquisitionWorker::waitForFinish()
170 173 QMutexLocker locker{&impl->m_WorkingMutex};
171 174 }
172 175
173
174 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::eraseRequest(AcquisitionId id)
175 {
176 auto it = m_Requests.find(id);
177 if (it != m_Requests.end()) {
178 // Removes from index
179 m_RequestsIndex.erase(it->second.m_vIdentifier);
180
181 // Removes request
182 m_Requests.erase(it);
183 }
184 }
185
186 std::map<AcquisitionId, AcquisitionRequest>::iterator
187 VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::insertRequest(
188 AcquisitionId id, AcquisitionRequest request)
189 {
190 // Inserts request
191 auto variableId = request.m_vIdentifier;
192 auto result = m_Requests.insert(std::make_pair(id, std::move(request)));
193
194 if (result.second) {
195 // Inserts index
196 m_RequestsIndex[variableId] = &result.first->second;
197
198 return result.first;
199 }
200 else {
201 return m_Requests.end();
202 }
203 }
204
205 void VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate::removeVariableRequest(
206 QUuid vIdentifier)
207 {
208 /// @todo ALX
209 // m_Acquisitions.erase(vIdentifier);
210 }
211
212 void VariableAcquisitionWorker::onExecuteRequest(QUuid acqIdentifier)
176 void VariableAcquisitionWorker::executeAcquisition(QUuid acquisitionId)
213 177 {
214 178 impl->lockRead();
215 auto it = impl->m_Requests.find(acqIdentifier);
216 if (it != impl->m_Requests.cend()) {
217 auto &request = it->second;
179 auto it = impl->m_Acquisitions.find(acquisitionId);
180 if (it != impl->m_Acquisitions.cend()) {
181 auto &request = it->second.m_Request;
218 182 impl->unlock();
219 request.m_Provider->requestDataLoading(acqIdentifier, request.m_DataProviderParameters);
183 request.m_Provider->requestDataLoading(acquisitionId, request.m_ProviderParameters);
220 184 }
221 185 else {
222 186 impl->unlock();
@@ -3,7 +3,7
3 3 #include <Variable/VariableCacheStrategy.h>
4 4 #include <Variable/VariableController.h>
5 5 #include <Variable/VariableModel.h>
6 #include <Variable/VariableSynchronizationGroup.h>
6 #include <Variable/VariableSynchronizer.h>
7 7
8 8 #include <Data/AcquisitionUtils.h>
9 9 #include <Data/DataProviderParameters.h>
@@ -17,8 +17,6
17 17 #include <QUuid>
18 18 #include <QtCore/QItemSelectionModel>
19 19
20 #include <deque>
21 #include <set>
22 20 #include <unordered_map>
23 21
24 22 Q_LOGGING_CATEGORY(LOG_VariableController, "VariableController")
@@ -82,6 +80,7 struct VariableController::VariableControllerPrivate {
82 80 m_VariableSelectionModel{new QItemSelectionModel{m_VariableModel, parent}},
83 81 m_VariableCacheStrategy{std::make_unique<VariableCacheStrategy>()},
84 82 m_VariableAcquisitionWorker{std::make_unique<VariableAcquisitionWorker>()},
83 m_VariableSynchronizer{std::make_unique<VariableSynchronizer>()},
85 84 q{parent}
86 85 {
87 86 m_VariableAcquisitionWorker->moveToThread(&m_VariableAcquisitionWorkerThread);
@@ -106,219 +105,47 struct VariableController::VariableControllerPrivate {
106 105 return;
107 106 }
108 107
108 variable->setRange(rangeRequested);
109
109 110 // Gets ranges in/out of variable range
110 111 auto requestRange
111 112 = m_VariableCacheStrategy->computeStrategyRanges(variable->range(), rangeRequested);
112 113 auto notInCacheRanges = variable->provideNotInCacheRangeList(requestRange.second);
113 auto inCacheRanges = variable->provideInCacheRangeList(requestRange.second);
114 114
115 115 // Creates request for out-of-cache ranges
116 116 if (!notInCacheRanges.isEmpty()) {
117 117 // Gets provider for request
118 118 if (auto provider = m_Providers.at(variable)) {
119 VariableRequest request{requestRange.first, requestRange.second};
120 // m_VariableAcquisitionWorker->pushVariableRequest();
119 DataProviderParameters providerParameters{std::move(notInCacheRanges),
120 variable->metadata()};
121 VariableRequest request{requestRange.first, requestRange.second, provider,
122 std::move(providerParameters)};
123 m_VariableAcquisitionWorker->pushVariableRequest(variable, std::move(request));
121 124 }
122 }
123
124 // Calls UI update for in-cache range
125 if (!inCacheRanges.isEmpty()) {
126 emit q->updateVarDisplaying(variable, inCacheRanges.first());
127 }
128 }
129 125
130 std::shared_ptr<Variable> findVariable(QUuid vIdentifier)
131 {
132 /// @todo ALX
133 std::shared_ptr<Variable> var;
134 auto findReply = [vIdentifier](const auto &entry) { return vIdentifier == entry.second; };
135
136 auto end = m_VariableToIdentifierMap.cend();
137 auto it = std::find_if(m_VariableToIdentifierMap.cbegin(), end, findReply);
138 if (it != end) {
139 var = it->first;
126 // Calls UI update for in-cache range
127 auto inCacheRanges = variable->provideInCacheRangeList(requestRange.second);
128 if (!inCacheRanges.isEmpty()) {
129 emit q->updateVarDisplaying(variable, inCacheRanges.first());
130 }
140 131 }
141 132 else {
142 qCCritical(LOG_VariableController())
143 << tr("Impossible to find the variable with the identifier: ") << vIdentifier;
144 }
145
146 return var;
147 }
148 std::shared_ptr<IDataSeries>
149 retrieveDataSeries(const QVector<AcquisitionDataPacket> acqDataPacketVector)
150 {
151 /// @todo ALX
152 qCDebug(LOG_VariableController()) << tr("TORM: retrieveDataSeries acqDataPacketVector size")
153 << acqDataPacketVector.size();
154 std::shared_ptr<IDataSeries> dataSeries;
155 if (!acqDataPacketVector.isEmpty()) {
156 dataSeries = acqDataPacketVector[0].m_DateSeries;
157 for (int i = 1; i < acqDataPacketVector.size(); ++i) {
158 dataSeries->merge(acqDataPacketVector[i].m_DateSeries.get());
159 }
133 // No request to make: we simply update variable ranges
134 variable->setCacheRange(requestRange.second);
135 emit variable->updated();
160 136 }
161 qCDebug(LOG_VariableController())
162 << tr("TORM: retrieveDataSeries acqDataPacketVector size END")
163 << acqDataPacketVector.size();
164 return dataSeries;
165 137 }
166 138
167 139 void registerProvider(std::shared_ptr<IDataProvider> provider)
168 140 {
169 141 Q_ASSERT(provider != nullptr);
170 142 connect(provider.get(), &IDataProvider::dataProvided, m_VariableAcquisitionWorker.get(),
171 &VariableAcquisitionWorker::onVariableDataAcquired);
143 &VariableAcquisitionWorker::onDataAcquired);
172 144 connect(provider.get(), &IDataProvider::dataProvidedProgress,
173 145 m_VariableAcquisitionWorker.get(),
174 146 &VariableAcquisitionWorker::onVariableRetrieveDataInProgress);
175 147 }
176 148
177 QUuid acceptVariableRequest(QUuid varId, std::shared_ptr<IDataSeries> dataSeries)
178 {
179 /// @todo ALX
180 QUuid varRequestId;
181 // auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.find(varId);
182 // if (varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.cend()) {
183 // auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
184 // varRequestId = varRequestIdQueue.front();
185 // auto varRequestIdToVarIdVarRequestMapIt
186 // = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
187 // if (varRequestIdToVarIdVarRequestMapIt !=
188 // m_VarRequestIdToVarIdVarRequestMap.cend()) {
189 // auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
190 // auto varIdToVarRequestMapIt = varIdToVarRequestMap.find(varId);
191 // if (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) {
192 // qCDebug(LOG_VariableController()) << tr("acceptVariableRequest");
193 // auto &varRequest = varIdToVarRequestMapIt->second;
194 // varRequest.m_DataSeries = dataSeries;
195 // varRequest.m_CanUpdate = true;
196 // }
197 // else {
198 // qCDebug(LOG_VariableController()) << tr("Impossible to
199 // acceptVariableRequest "
200 // "of a unknown variable id
201 // attached "
202 // "to a variableRequestId")
203 // << varRequestId << varId;
204 // }
205 // }
206 // else {
207 // qCCritical(LOG_VariableController())
208 // << tr("Impossible to acceptVariableRequest of a unknown
209 // variableRequestId")
210 // << varRequestId;
211 // }
212
213 // qCDebug(LOG_VariableController())
214 // << tr("1: erase REQUEST in QUEUE ?") << varRequestIdQueue.size();
215 // varRequestIdQueue.pop_front();
216 // qCDebug(LOG_VariableController())
217 // << tr("2: erase REQUEST in QUEUE ?") << varRequestIdQueue.size();
218 // if (varRequestIdQueue.empty()) {
219 // m_VarIdToVarRequestIdQueueMap.erase(varId);
220 // }
221 // }
222 // else {
223 // qCCritical(LOG_VariableController())
224 // << tr("Impossible to acceptVariableRequest of a unknown variable id") <<
225 // varId;
226 // }
227
228 return varRequestId;
229 }
230 void updateVariableRequest(QUuid varRequestId)
231 {
232 /// @todo ALX
233 // auto varRequestIdToVarIdVarRequestMapIt
234 // = m_VarRequestIdToVarIdVarRequestMap.find(varRequestId);
235 // if (varRequestIdToVarIdVarRequestMapIt !=
236 // m_VarRequestIdToVarIdVarRequestMap.cend()) {
237 // bool processVariableUpdate = true;
238 // auto &varIdToVarRequestMap = varRequestIdToVarIdVarRequestMapIt->second;
239 // for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
240 // (varIdToVarRequestMapIt != varIdToVarRequestMap.cend()) &&
241 // processVariableUpdate;
242 // ++varIdToVarRequestMapIt) {
243 // processVariableUpdate &= varIdToVarRequestMapIt->second.m_CanUpdate;
244 // qCDebug(LOG_VariableController())
245 // << tr("updateVariableRequest") << processVariableUpdate;
246 // }
247
248 // if (processVariableUpdate) {
249 // for (auto varIdToVarRequestMapIt = varIdToVarRequestMap.cbegin();
250 // varIdToVarRequestMapIt != varIdToVarRequestMap.cend();
251 // ++varIdToVarRequestMapIt) {
252 // if (auto var = findVariable(varIdToVarRequestMapIt->first)) {
253 // auto &varRequest = varIdToVarRequestMapIt->second;
254 // var->setRange(varRequest.m_RangeRequested);
255 // var->setCacheRange(varRequest.m_CacheRangeRequested);
256 // qCDebug(LOG_VariableController())
257 // << tr("1: onDataProvided") << varRequest.m_RangeRequested;
258 // qCDebug(LOG_VariableController())
259 // << tr("2: onDataProvided") <<
260 // varRequest.m_CacheRangeRequested;
261 // var->mergeDataSeries(varRequest.m_DataSeries);
262 // qCDebug(LOG_VariableController())
263 // << tr("3: onDataProvided") <<
264 // varRequest.m_DataSeries->range();
265 // qCDebug(LOG_VariableController()) << tr("4: onDataProvided");
266
267 // /// @todo MPL: confirm
268 // // Variable update is notified only if there is no pending request
269 // for it
270 // if
271 // (m_VarIdToVarRequestIdQueueMap.count(varIdToVarRequestMapIt->first)
272 // == 0) {
273 // emit var->updated();
274 // }
275 // }
276 // else {
277 // qCCritical(LOG_VariableController())
278 // << tr("Impossible to update data to a null variable");
279 // }
280 // }
281
282 // // cleaning varRequestId
283 // qCDebug(LOG_VariableController()) << tr("0: erase REQUEST in MAP ?")
284 // <<
285 // m_VarRequestIdToVarIdVarRequestMap.size();
286 // m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
287 // qCDebug(LOG_VariableController()) << tr("1: erase REQUEST in MAP ?")
288 // <<
289 // m_VarRequestIdToVarIdVarRequestMap.size();
290 // }
291 // }
292 // else {
293 // qCCritical(LOG_VariableController())
294 // << tr("Cannot updateVariableRequest for a unknow varRequestId") <<
295 // varRequestId;
296 // }
297 }
298
299 void cancelVariableRequest(QUuid varRequestId)
300 {
301 /// @todo ALX
302 // // cleaning varRequestId
303 // m_VarRequestIdToVarIdVarRequestMap.erase(varRequestId);
304
305 // for (auto varIdToVarRequestIdQueueMapIt = m_VarIdToVarRequestIdQueueMap.begin();
306 // varIdToVarRequestIdQueueMapIt != m_VarIdToVarRequestIdQueueMap.end();) {
307 // auto &varRequestIdQueue = varIdToVarRequestIdQueueMapIt->second;
308 // varRequestIdQueue.erase(
309 // std::remove(varRequestIdQueue.begin(), varRequestIdQueue.end(),
310 // varRequestId),
311 // varRequestIdQueue.end());
312 // if (varRequestIdQueue.empty()) {
313 // varIdToVarRequestIdQueueMapIt
314 // = m_VarIdToVarRequestIdQueueMap.erase(varIdToVarRequestIdQueueMapIt);
315 // }
316 // else {
317 // ++varIdToVarRequestIdQueueMapIt;
318 // }
319 // }
320 }
321
322 149 QMutex m_WorkingMutex;
323 150 /// Variable model. The VariableController has the ownership
324 151 VariableModel *m_VariableModel;
@@ -329,11 +156,10 struct VariableController::VariableControllerPrivate {
329 156 std::unique_ptr<VariableAcquisitionWorker> m_VariableAcquisitionWorker;
330 157 QThread m_VariableAcquisitionWorkerThread;
331 158
159 /// Handler for variables synchronization
160 std::unique_ptr<VariableSynchronizer> m_VariableSynchronizer;
161
332 162 std::unordered_map<std::shared_ptr<Variable>, std::shared_ptr<IDataProvider> > m_Providers;
333 std::unordered_map<std::shared_ptr<Variable>, QUuid> m_VariableToIdentifierMap;
334 std::map<QUuid, std::shared_ptr<VariableSynchronizationGroup> >
335 m_GroupIdToVariableSynchronizationGroupMap;
336 std::map<QUuid, QUuid> m_VariableIdGroupIdMap;
337 163
338 164 VariableController *q;
339 165 };
@@ -394,9 +220,6 VariableController::cloneVariable(std::shared_ptr<Variable> variable) noexcept
394 220 // Adds clone to model
395 221 impl->m_VariableModel->addVariable(duplicate);
396 222
397 // Generates clone identifier
398 impl->m_VariableToIdentifierMap[duplicate] = QUuid::createUuid();
399
400 223 // Registers provider
401 224 auto variableProvider = impl->m_Providers.at(variable);
402 225 auto duplicateProvider = variableProvider != nullptr ? variableProvider->clone() : nullptr;
@@ -427,9 +250,6 void VariableController::deleteVariable(std::shared_ptr<Variable> variable) noex
427 250 // make some treatments before the deletion
428 251 emit variableAboutToBeDeleted(variable);
429 252
430 // Deletes identifier
431 impl->m_VariableToIdentifierMap.erase(variable);
432
433 253 // Deletes provider
434 254 auto nbProvidersDeleted = impl->m_Providers.erase(variable);
435 255 qCDebug(LOG_VariableController())
@@ -466,18 +286,14 VariableController::createVariable(const QString &name, const QVariantHash &meta
466 286 auto range = impl->m_TimeController->dateTime();
467 287
468 288 if (auto newVariable = impl->m_VariableModel->createVariable(name, range, metadata)) {
469 auto identifier = QUuid::createUuid();
470
471 // store the provider
472 impl->registerProvider(provider);
473
474 289 // Associate the provider
475 impl->m_Providers[newVariable] = provider;
476 impl->m_VariableToIdentifierMap[newVariable] = identifier;
290 auto newVariableProvider = provider != nullptr ? provider->clone() : nullptr;
291 impl->m_Providers[newVariable] = newVariableProvider;
292 if (newVariableProvider) {
293 impl->registerProvider(newVariableProvider);
294 }
477 295
478 296 impl->processRequest(newVariable, range);
479 /// @todo ALX
480 // impl->updateVariableRequest(varRequestId);
481 297
482 298 return newVariable;
483 299 }
@@ -498,32 +314,34 void VariableController::onDateTimeOnSelection(const SqpRange &dateTime)
498 314 emit rangeChanged(selectedVariable, dateTime);
499 315 }
500 316 }
501
502 /// @todo ALX
503 // impl->updateVariableRequest(varRequestId);
504 317 }
505 318
506 void VariableController::onDataProvided(QUuid vIdentifier, const SqpRange &rangeRequested,
507 const SqpRange &cacheRangeRequested,
508 QVector<AcquisitionDataPacket> dataAcquired)
319 void VariableController::onDataProvided(std::shared_ptr<Variable> variable, VariableRequest request)
509 320 {
510 qCInfo(LOG_VariableController()) << "VariableController::onDataProvided";
511 auto retrievedDataSeries = impl->retrieveDataSeries(dataAcquired);
512 auto varRequestId = impl->acceptVariableRequest(vIdentifier, retrievedDataSeries);
513 if (!varRequestId.isNull()) {
514 impl->updateVariableRequest(varRequestId);
321 Q_ASSERT(variable != nullptr);
322
323 if (!impl->m_VariableModel->containsVariable(variable)) {
324 qCCritical(LOG_VariableController())
325 << QObject::tr("Can't update date of variable %1: variable is not registered (anymore)")
326 .arg(variable->name());
327 return;
515 328 }
329
330 variable->setCacheRange(request.m_CacheRangeRequested);
331 variable->mergeDataSeries(request.m_Result);
332 emit variable->updated();
516 333 }
517 334
518 335 void VariableController::onVariableRetrieveDataInProgress(QUuid identifier, double progress)
519 336 {
520 if (auto var = impl->findVariable(identifier)) {
521 impl->m_VariableModel->setDataProgress(var, progress);
522 }
523 else {
524 qCCritical(LOG_VariableController())
525 << tr("Impossible to notify progression of a null variable");
526 }
337 /// @todo ALX
338 // if (auto var = impl->findVariable(identifier)) {
339 // impl->m_VariableModel->setDataProgress(var, progress);
340 // }
341 // else {
342 // qCCritical(LOG_VariableController())
343 // << tr("Impossible to notify progression of a null variable");
344 // }
527 345 }
528 346
529 347 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> variable)
@@ -545,145 +363,59 void VariableController::onAbortProgressRequested(std::shared_ptr<Variable> vari
545 363
546 364 void VariableController::onAddSynchronizationGroupId(QUuid synchronizationGroupId)
547 365 {
548 qCDebug(LOG_VariableController())
549 << "TORM: VariableController::onAddSynchronizationGroupId"
550 << QThread::currentThread()->objectName() << synchronizationGroupId;
551 auto vSynchroGroup = std::make_shared<VariableSynchronizationGroup>();
552 impl->m_GroupIdToVariableSynchronizationGroupMap.insert(
553 std::make_pair(synchronizationGroupId, vSynchroGroup));
366 impl->m_VariableSynchronizer->addGroup(synchronizationGroupId);
554 367 }
555 368
556 369 void VariableController::onRemoveSynchronizationGroupId(QUuid synchronizationGroupId)
557 370 {
558 impl->m_GroupIdToVariableSynchronizationGroupMap.erase(synchronizationGroupId);
371 impl->m_VariableSynchronizer->removeGroup(synchronizationGroupId);
559 372 }
560 373
561 374 void VariableController::onAddSynchronized(std::shared_ptr<Variable> variable,
562 375 QUuid synchronizationGroupId)
563
564 376 {
565 qCDebug(LOG_VariableController())
566 << "TORM: VariableController::onAddSynchronized" << synchronizationGroupId;
567 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(variable);
568 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
569 auto groupIdToVSGIt
570 = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
571 if (groupIdToVSGIt != impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
572 impl->m_VariableIdGroupIdMap.insert(
573 std::make_pair(varToVarIdIt->second, synchronizationGroupId));
574 groupIdToVSGIt->second->addVariableId(varToVarIdIt->second);
575 }
576 else {
577 qCCritical(LOG_VariableController())
578 << tr("Impossible to synchronize a variable with an unknown sycnhronization group")
579 << variable->name();
580 }
581 }
582 else {
583 qCCritical(LOG_VariableController())
584 << tr("Impossible to synchronize a variable with no identifier") << variable->name();
585 }
377 impl->m_VariableSynchronizer->addVariable(variable, synchronizationGroupId);
586 378 }
587 379
588 380 void VariableController::desynchronize(std::shared_ptr<Variable> variable,
589 381 QUuid synchronizationGroupId)
590 382 {
591 // Gets variable id
592 auto variableIt = impl->m_VariableToIdentifierMap.find(variable);
593 if (variableIt == impl->m_VariableToIdentifierMap.cend()) {
594 qCCritical(LOG_VariableController())
595 << tr("Can't desynchronize variable %1: variable identifier not found")
596 .arg(variable->name());
597 return;
598 }
599
600 // Gets synchronization group
601 auto groupIt = impl->m_GroupIdToVariableSynchronizationGroupMap.find(synchronizationGroupId);
602 if (groupIt == impl->m_GroupIdToVariableSynchronizationGroupMap.cend()) {
603 qCCritical(LOG_VariableController())
604 << tr("Can't desynchronize variable %1: unknown synchronization group")
605 .arg(variable->name());
606 return;
607 }
608
609 auto variableId = variableIt->second;
610
611 // Removes variable from synchronization group
612 auto synchronizationGroup = groupIt->second;
613 synchronizationGroup->removeVariableId(variableId);
614
615 // Removes link between variable and synchronization group
616 impl->m_VariableIdGroupIdMap.erase(variableId);
383 impl->m_VariableSynchronizer->removeVariable(variable, synchronizationGroupId);
617 384 }
618 385
619 void VariableController::onRequestDataLoading(QVector<std::shared_ptr<Variable> > variables,
386 void VariableController::onRequestDataLoading(const QVector<std::shared_ptr<Variable> > &variables,
620 387 const SqpRange &range, const SqpRange &oldRange,
621 388 bool synchronise)
622 389 {
623 // NOTE: oldRange isn't really necessary since oldRange == variable->range().
390 // Set of variables that have been processed
391 std::set<std::shared_ptr<Variable> > processedVariables;
392 std::map<std::shared_ptr<Variable>, SqpRange> oldRanges;
624 393
625 // we want to load data of the variable for the dateTime.
626 // First we check if the cache contains some of them.
627 // For the other, we ask the provider to give them.
394 // Process requests for all variables
628 395 for (const auto &var : variables) {
629 396 impl->processRequest(var, range);
397 processedVariables.insert(var);
630 398 }
631 399
400 // Handles synchronisation
632 401 if (synchronise) {
633 // Get the group ids
634 qCDebug(LOG_VariableController())
635 << "TORM VariableController::onRequestDataLoading for synchro var ENABLE";
636 auto groupIds = std::set<QUuid>{};
637 auto groupIdToOldRangeMap = std::map<QUuid, SqpRange>{};
638 for (const auto &var : variables) {
639 auto varToVarIdIt = impl->m_VariableToIdentifierMap.find(var);
640 if (varToVarIdIt != impl->m_VariableToIdentifierMap.cend()) {
641 auto vId = varToVarIdIt->second;
642 auto varIdToGroupIdIt = impl->m_VariableIdGroupIdMap.find(vId);
643 if (varIdToGroupIdIt != impl->m_VariableIdGroupIdMap.cend()) {
644 auto gId = varIdToGroupIdIt->second;
645 groupIdToOldRangeMap.insert(std::make_pair(gId, var->range()));
646 if (groupIds.find(gId) == groupIds.cend()) {
647 qCDebug(LOG_VariableController()) << "Synchro detect group " << gId;
648 groupIds.insert(gId);
649 }
650 }
651 }
652 }
653
654 // We assume here all group ids exist
655 for (const auto &gId : groupIds) {
656 auto vSynchronizationGroup = impl->m_GroupIdToVariableSynchronizationGroupMap.at(gId);
657 auto vSyncIds = vSynchronizationGroup->getIds();
658 qCDebug(LOG_VariableController()) << "Var in synchro group ";
659 for (auto vId : vSyncIds) {
660 auto var = impl->findVariable(vId);
661
662 // Don't process already processed var
663 if (!variables.contains(var)) {
664 if (var != nullptr) {
665 qCDebug(LOG_VariableController())
666 << "processRequest synchro for" << var->name();
667 auto vSyncRangeRequested = computeSynchroRangeRequested(
668 var->range(), range, groupIdToOldRangeMap.at(gId));
669 qCDebug(LOG_VariableController()) << "synchro RR" << vSyncRangeRequested;
670 impl->processRequest(var, vSyncRangeRequested);
671 }
672 else {
673 qCCritical(LOG_VariableController())
674
675 << tr("Impossible to synchronize a null variable");
676 }
402 for (const auto &variable : variables) {
403 // Finds the variables in the same synchronization group
404 auto synchronizedVariables
405 = impl->m_VariableSynchronizer->synchronizedVariables(variable);
406 for (const auto &synchronizedVariable : synchronizedVariables) {
407 // Processes variable (if it hasn't been already processed)
408 if (processedVariables.count(synchronizedVariable) == 0) {
409 auto rangeRequested = computeSynchroRangeRequested(
410 synchronizedVariable->range(), range, oldRange);
411 impl->processRequest(synchronizedVariable, rangeRequested);
412 processedVariables.insert(synchronizedVariable);
677 413 }
678 414 }
679 415 }
680 416 }
681
682 /// @todo ALX
683 // impl->updateVariableRequest(varRequestId);
684 417 }
685 418
686
687 419 void VariableController::initialize()
688 420 {
689 421 qCDebug(LOG_VariableController()) << tr("VariableController init") << QThread::currentThread();
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
This diff has been collapsed as it changes many lines, (775 lines changed) Show them Hide them
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now