##// END OF EJS Templates
commit
Alexandre Leroux -
r685:9917b9ec9087 feature/LimitNbRe...
parent child
Show More
@@ -1,192 +1,213
1 #include "Variable/VariableAcquisitionWorker.h"
1 #include "Variable/VariableAcquisitionWorker.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 #include <Data/IDataProvider.h>
5 #include <Data/IDataProvider.h>
6 #include <Data/SqpRange.h>
6 #include <Data/SqpRange.h>
7 #include <unordered_map>
7 #include <unordered_map>
8 #include <utility>
8 #include <utility>
9
9
10 #include <QMutex>
10 #include <QMutex>
11 #include <QReadWriteLock>
11 #include <QReadWriteLock>
12 #include <QThread>
12 #include <QThread>
13 #include <QtConcurrent/QtConcurrent>
13 #include <QtConcurrent/QtConcurrent>
14
14
15 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
15 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
16
16
17 namespace {
17 namespace {
18
18
19 using AcquisitionId = QUuid;
19 using AcquisitionId = QUuid;
20 using VariableId = QUuid;
20 using VariableId = QUuid;
21
21
22 struct Acquisition {
22 struct Acquisition {
23 std::shared_ptr<Variable> m_Variable{nullptr};
23 std::shared_ptr<Variable> m_Variable{nullptr};
24 VariableRequest m_Request{};
24 VariableRequest m_Request{};
25 AcquisitionId m_Id{QUuid::createUuid()};
25 AcquisitionId m_Id{QUuid::createUuid()};
26 };
26 };
27
27
28 } // namespace
28 } // namespace
29
29
30 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
30 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
31
31
32 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
32 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
33
33
34 void lockRead() { m_Lock.lockForRead(); }
34 void lockRead() { m_Lock.lockForRead(); }
35 void lockWrite() { m_Lock.lockForWrite(); }
35 void lockWrite() { m_Lock.lockForWrite(); }
36 void unlock() { m_Lock.unlock(); }
36 void unlock() { m_Lock.unlock(); }
37
37
38 void eraseAcquisition(AcquisitionId id)
38 void cancelAcquisition(AcquisitionId id)
39 {
39 {
40 auto it = m_Acquisitions.find(id);
40 auto it = m_Acquisitions.find(id);
41 if (it != m_Acquisitions.end()) {
41 if (it != m_Acquisitions.end()) {
42 const auto &request = it->second.m_Request;
43
44 QtConcurrent::run([ provider = request.m_Provider, acqIdentifier = id ]() {
45 provider->requestDataAborting(acqIdentifier);
46 });
47
48 eraseAcquisition(it);
49 }
50 }
51
52 /// Removes from acquisitions and its indexes the acquisition represented by the iterator passed
53 /// as a parameter
54 void eraseAcquisition(std::map<AcquisitionId, Acquisition>::iterator it)
55 {
56 if (it != m_Acquisitions.end()) {
42 // Removes from index
57 // Removes from index
43 m_AcquisitionsIndex.erase(it->second.m_Variable);
58 m_AcquisitionsIndex.erase(it->second.m_Variable);
44
59
45 // Removes request
60 // Removes request
46 m_Acquisitions.erase(it);
61 m_Acquisitions.erase(it);
47 }
62 }
48 }
63 }
49
64
65 /// Removes from acquisitions and its indexes the acquisition represented by the identifier
66 /// passed as a parameter
67 void eraseAcquisition(AcquisitionId id) { eraseAcquisition(m_Acquisitions.find(id)); }
68
50 std::map<AcquisitionId, Acquisition>::iterator insertAcquisition(Acquisition acquisition)
69 std::map<AcquisitionId, Acquisition>::iterator insertAcquisition(Acquisition acquisition)
51 {
70 {
52 auto variable = acquisition.m_Variable;
71 auto variable = acquisition.m_Variable;
53
72
54 // Inserts acquisition
73 // Inserts acquisition
55 auto result
74 auto result
56 = m_Acquisitions.insert(std::make_pair(acquisition.m_Id, std::move(acquisition)));
75 = m_Acquisitions.insert(std::make_pair(acquisition.m_Id, std::move(acquisition)));
57 if (result.second) {
76 if (result.second) {
58 // Inserts index
77 // Inserts index
59 m_AcquisitionsIndex[variable] = &result.first->second;
78 m_AcquisitionsIndex[variable] = &result.first->second;
60 return result.first;
79 return result.first;
61 }
80 }
62 else {
81 else {
63 return m_Acquisitions.end();
82 return m_Acquisitions.end();
64 }
83 }
65 }
84 }
66
85
67 QMutex m_WorkingMutex;
86 QMutex m_WorkingMutex;
68 QReadWriteLock m_Lock;
87 QReadWriteLock m_Lock;
69
88
70 std::map<AcquisitionId, Acquisition> m_Acquisitions;
89 std::map<AcquisitionId, Acquisition> m_Acquisitions;
71 std::map<std::shared_ptr<Variable>, Acquisition *> m_AcquisitionsIndex;
90 std::map<std::shared_ptr<Variable>, Acquisition *> m_AcquisitionsIndex;
72 };
91 };
73
92
74 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
93 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
75 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
94 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
76 {
95 {
77 }
96 }
78
97
79 VariableAcquisitionWorker::~VariableAcquisitionWorker()
98 VariableAcquisitionWorker::~VariableAcquisitionWorker()
80 {
99 {
81 qCInfo(LOG_VariableAcquisitionWorker())
100 qCInfo(LOG_VariableAcquisitionWorker())
82 << tr("VariableAcquisitionWorker destruction") << QThread::currentThread();
101 << tr("VariableAcquisitionWorker destruction") << QThread::currentThread();
83 this->waitForFinish();
102 this->waitForFinish();
84 }
103 }
85
104
86 void VariableAcquisitionWorker::cancelVariableRequest(std::shared_ptr<Variable> variable)
105 void VariableAcquisitionWorker::cancelVariableRequest(std::shared_ptr<Variable> variable)
87 {
106 {
88 /// @todo ALX
107 impl->lockWrite();
108
109 // Gets the current acquisition for the variable (if it exists) and cancels the request
110 // associated
111 auto it = impl->m_AcquisitionsIndex.find(variable);
112 if (it != impl->m_AcquisitionsIndex.end()) {
113 impl->cancelAcquisition(it->second->m_Id);
114 }
115
116 impl->unlock();
89 }
117 }
90
118
91 void VariableAcquisitionWorker::pushVariableRequest(std::shared_ptr<Variable> variable,
119 void VariableAcquisitionWorker::pushVariableRequest(std::shared_ptr<Variable> variable,
92 VariableRequest request)
120 VariableRequest request)
93 {
121 {
94 impl->lockWrite();
122 impl->lockWrite();
95
123
96 // Checks if there is a current request for variable
124 // Checks if there is a current request for variable and cancels it
97 auto oldAcquisitionIt = impl->m_AcquisitionsIndex.find(variable);
125 auto oldAcquisitionIt = impl->m_AcquisitionsIndex.find(variable);
98 if (oldAcquisitionIt != impl->m_AcquisitionsIndex.cend()) {
126 if (oldAcquisitionIt != impl->m_AcquisitionsIndex.cend()) {
99 auto &oldAcquisition = *oldAcquisitionIt->second;
127 auto &oldAcquisition = *oldAcquisitionIt->second;
100 /// @todo ALX
128 impl->cancelAcquisition(oldAcquisition.m_Id);
101 // QtConcurrent::run(
102 // [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]()
103 // {
104 // provider->requestDataAborting(acqIdentifier);
105 // });
106
107 impl->eraseAcquisition(oldAcquisition.m_Id);
108 }
129 }
109
130
110 // Sets request for variable
131 // Sets request for variable
111 Acquisition newAcquisition{variable, std::move(request)};
132 Acquisition newAcquisition{variable, std::move(request)};
112 auto newAcquisitionIt = impl->insertAcquisition(std::move(newAcquisition));
133 auto newAcquisitionIt = impl->insertAcquisition(std::move(newAcquisition));
113 if (newAcquisitionIt != impl->m_Acquisitions.end()) {
134 if (newAcquisitionIt != impl->m_Acquisitions.end()) {
114 impl->unlock();
135 impl->unlock();
115
136
116 QMetaObject::invokeMethod(this, "executeAcquisition", Qt::QueuedConnection,
137 QMetaObject::invokeMethod(this, "executeAcquisition", Qt::QueuedConnection,
117 Q_ARG(QUuid, newAcquisitionIt->first));
138 Q_ARG(QUuid, newAcquisitionIt->first));
118 }
139 }
119 else {
140 else {
120 impl->unlock();
141 impl->unlock();
121 }
142 }
122 }
143 }
123
144
124 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
145 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
125 {
146 {
126 // TODO
147 // TODO
127 }
148 }
128
149
129 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
130 double progress)
151 double progress)
131 {
152 {
132 // TODO
153 // TODO
133 }
154 }
134
155
135 void VariableAcquisitionWorker::onDataAcquired(QUuid acquisitionId,
156 void VariableAcquisitionWorker::onDataAcquired(QUuid acquisitionId,
136 std::shared_ptr<IDataSeries> dataSeries,
157 std::shared_ptr<IDataSeries> dataSeries,
137 SqpRange range)
158 SqpRange range)
138 {
159 {
139 impl->lockWrite();
160 impl->lockWrite();
140
161
141 auto it = impl->m_Acquisitions.find(acquisitionId);
162 auto it = impl->m_Acquisitions.find(acquisitionId);
142 if (it != impl->m_Acquisitions.cend()) {
163 if (it != impl->m_Acquisitions.cend()) {
143 auto &acquisition = it->second;
164 auto &acquisition = it->second;
144 auto &request = acquisition.m_Request;
165 auto &request = acquisition.m_Request;
145
166
146 // Store the result
167 // Store the result
147 request.addResult(dataSeries);
168 request.addResult(dataSeries);
148
169
149 if (request.isFinished()) {
170 if (request.isFinished()) {
150 emit dataProvided(acquisition.m_Variable, std::move(request));
171 emit dataProvided(acquisition.m_Variable, std::move(request));
151 impl->eraseAcquisition(acquisitionId);
172 impl->eraseAcquisition(acquisitionId);
152 }
173 }
153 }
174 }
154 impl->unlock();
175 impl->unlock();
155 }
176 }
156
177
157 void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
178 void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
158 {
179 {
159 }
180 }
160
181
161 void VariableAcquisitionWorker::initialize()
182 void VariableAcquisitionWorker::initialize()
162 {
183 {
163 qCDebug(LOG_VariableAcquisitionWorker())
184 qCDebug(LOG_VariableAcquisitionWorker())
164 << tr("VariableAcquisitionWorker init") << QThread::currentThread();
185 << tr("VariableAcquisitionWorker init") << QThread::currentThread();
165 impl->m_WorkingMutex.lock();
186 impl->m_WorkingMutex.lock();
166 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
187 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
167 }
188 }
168
189
169 void VariableAcquisitionWorker::finalize()
190 void VariableAcquisitionWorker::finalize()
170 {
191 {
171 impl->m_WorkingMutex.unlock();
192 impl->m_WorkingMutex.unlock();
172 }
193 }
173
194
174 void VariableAcquisitionWorker::waitForFinish()
195 void VariableAcquisitionWorker::waitForFinish()
175 {
196 {
176 QMutexLocker locker{&impl->m_WorkingMutex};
197 QMutexLocker locker{&impl->m_WorkingMutex};
177 }
198 }
178
199
179 void VariableAcquisitionWorker::executeAcquisition(QUuid acquisitionId)
200 void VariableAcquisitionWorker::executeAcquisition(QUuid acquisitionId)
180 {
201 {
181 impl->lockRead();
202 impl->lockRead();
182 auto it = impl->m_Acquisitions.find(acquisitionId);
203 auto it = impl->m_Acquisitions.find(acquisitionId);
183 if (it != impl->m_Acquisitions.cend()) {
204 if (it != impl->m_Acquisitions.cend()) {
184 auto &request = it->second.m_Request;
205 auto &request = it->second.m_Request;
185 impl->unlock();
206 impl->unlock();
186 request.m_Provider->requestDataLoading(acquisitionId, request.m_ProviderParameters);
207 request.m_Provider->requestDataLoading(acquisitionId, request.m_ProviderParameters);
187 }
208 }
188 else {
209 else {
189 impl->unlock();
210 impl->unlock();
190 // TODO log no acqIdentifier recognized
211 // TODO log no acqIdentifier recognized
191 }
212 }
192 }
213 }
@@ -1,104 +1,104
1 #include "Variable/VariableSynchronizer.h"
1 #include "Variable/VariableSynchronizer.h"
2
2
3 #include "Variable/Variable.h"
3 #include "Variable/Variable.h"
4
4
5 Q_LOGGING_CATEGORY(LOG_VariableSynchronizer, "VariableSynchronizer")
5 Q_LOGGING_CATEGORY(LOG_VariableSynchronizer, "VariableSynchronizer")
6
6
7 namespace {
7 namespace {
8
8
9 using GroupId = VariableSynchronizer::GroupId;
9 using GroupId = VariableSynchronizer::GroupId;
10 struct Group {
10 struct Group {
11 GroupId m_Id;
11 GroupId m_Id;
12 std::set<std::shared_ptr<Variable> > m_Variables;
12 std::set<std::shared_ptr<Variable> > m_Variables;
13 };
13 };
14
14
15 } // namespace
15 } // namespace
16
16
17 struct VariableSynchronizer::VariableSynchronizerPrivate {
17 struct VariableSynchronizer::VariableSynchronizerPrivate {
18 std::map<GroupId, Group> m_Groups;
18 std::map<GroupId, Group> m_Groups;
19 std::map<std::shared_ptr<Variable>, Group *> m_GroupsIndex;
19 std::map<std::shared_ptr<Variable>, Group *> m_GroupsIndex;
20 };
20 };
21
21
22 VariableSynchronizer::VariableSynchronizer(QObject *parent)
22 VariableSynchronizer::VariableSynchronizer(QObject *parent)
23 : QObject{parent}, impl{spimpl::make_unique_impl<VariableSynchronizerPrivate>()}
23 : QObject{parent}, impl{spimpl::make_unique_impl<VariableSynchronizerPrivate>()}
24 {
24 {
25 }
25 }
26
26
27 void VariableSynchronizer::addGroup(GroupId groupId) noexcept
27 void VariableSynchronizer::addGroup(GroupId groupId) noexcept
28 {
28 {
29 if (impl->m_Groups.count(groupId) == 1) {
29 if (impl->m_Groups.count(groupId) == 1) {
30 qCWarning(LOG_VariableSynchronizer())
30 qCWarning(LOG_VariableSynchronizer())
31 << tr("Can't create new synchronization group: a "
31 << tr("Can't create new synchronization group: a "
32 "group already exists under the passed identifier");
32 "group already exists under the passed identifier");
33 return;
33 return;
34 }
34 }
35
35
36 impl->m_Groups.insert(std::make_pair(groupId, Group{}));
36 impl->m_Groups.insert(std::make_pair(groupId, Group{groupId}));
37 }
37 }
38
38
39 void VariableSynchronizer::addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept
39 void VariableSynchronizer::addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept
40 {
40 {
41 if (impl->m_GroupsIndex.count(variable) == 1) {
41 if (impl->m_GroupsIndex.count(variable) == 1) {
42 qCWarning(LOG_VariableSynchronizer())
42 qCWarning(LOG_VariableSynchronizer())
43 << tr("Can't add variable to a new synchronization group: the variable is already in a "
43 << tr("Can't add variable to a new synchronization group: the variable is already in a "
44 "synchronization group");
44 "synchronization group");
45 return;
45 return;
46 }
46 }
47
47
48 auto groupIt = impl->m_Groups.find(groupId);
48 auto groupIt = impl->m_Groups.find(groupId);
49
49
50 if (groupIt == impl->m_Groups.end()) {
50 if (groupIt == impl->m_Groups.end()) {
51 qCWarning(LOG_VariableSynchronizer())
51 qCWarning(LOG_VariableSynchronizer())
52 << tr("Can't add variable to the synchronization group: no group exists under the "
52 << tr("Can't add variable to the synchronization group: no group exists under the "
53 "passed identifier");
53 "passed identifier");
54 return;
54 return;
55 }
55 }
56
56
57 // Registers variable
57 // Registers variable
58 groupIt->second.m_Variables.insert(variable);
58 groupIt->second.m_Variables.insert(variable);
59
59
60 // Creates index for variable
60 // Creates index for variable
61 impl->m_GroupsIndex.insert(std::make_pair(variable, &groupIt->second));
61 impl->m_GroupsIndex.insert(std::make_pair(variable, &groupIt->second));
62 }
62 }
63
63
64 void VariableSynchronizer::removeGroup(GroupId groupId) noexcept
64 void VariableSynchronizer::removeGroup(GroupId groupId) noexcept
65 {
65 {
66 auto groupIt = impl->m_Groups.find(groupId);
66 auto groupIt = impl->m_Groups.find(groupId);
67
67
68 if (groupIt == impl->m_Groups.end()) {
68 if (groupIt == impl->m_Groups.end()) {
69 qCWarning(LOG_VariableSynchronizer()) << tr(
69 qCWarning(LOG_VariableSynchronizer()) << tr(
70 "Can't remove synchronization group: no group exists under the passed identifier");
70 "Can't remove synchronization group: no group exists under the passed identifier");
71 return;
71 return;
72 }
72 }
73
73
74 // Removes indexes
74 // Removes indexes
75 for (const auto &variable : groupIt->second.m_Variables) {
75 for (const auto &variable : groupIt->second.m_Variables) {
76 impl->m_GroupsIndex.erase(variable);
76 impl->m_GroupsIndex.erase(variable);
77 }
77 }
78
78
79 // Removes group
79 // Removes group
80 impl->m_Groups.erase(groupIt);
80 impl->m_Groups.erase(groupIt);
81 }
81 }
82
82
83 void VariableSynchronizer::removeVariable(std::shared_ptr<Variable> variable) noexcept
83 void VariableSynchronizer::removeVariable(std::shared_ptr<Variable> variable) noexcept
84 {
84 {
85 // Finds group in which the variable is
85 // Finds group in which the variable is
86 auto variableGroupIt = impl->m_GroupsIndex.find(variable);
86 auto variableGroupIt = impl->m_GroupsIndex.find(variable);
87 if (variableGroupIt != impl->m_GroupsIndex.end()) {
87 if (variableGroupIt != impl->m_GroupsIndex.end()) {
88 auto groupId = variableGroupIt->second->m_Id;
88 auto groupId = variableGroupIt->second->m_Id;
89
89
90 // Removes variable index
90 // Removes variable index
91 impl->m_GroupsIndex.erase(variableGroupIt);
91 impl->m_GroupsIndex.erase(variableGroupIt);
92
92
93 // Removes variable from group
93 // Removes variable from group
94 impl->m_Groups[groupId].m_Variables.erase(variable);
94 impl->m_Groups[groupId].m_Variables.erase(variable);
95 }
95 }
96 }
96 }
97
97
98 std::set<std::shared_ptr<Variable> >
98 std::set<std::shared_ptr<Variable> >
99 VariableSynchronizer::synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept
99 VariableSynchronizer::synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept
100 {
100 {
101 auto groupIndexIt = impl->m_GroupsIndex.find(variable);
101 auto groupIndexIt = impl->m_GroupsIndex.find(variable);
102 return groupIndexIt != impl->m_GroupsIndex.end() ? groupIndexIt->second->m_Variables
102 return groupIndexIt != impl->m_GroupsIndex.end() ? groupIndexIt->second->m_Variables
103 : std::set<std::shared_ptr<Variable> >{};
103 : std::set<std::shared_ptr<Variable> >{};
104 }
104 }
General Comments 0
You need to be logged in to leave comments. Login now