##// END OF EJS Templates
commit
Alexandre Leroux -
r685:9917b9ec9087 feature/LimitNbRe...
parent child
Show More
@@ -1,192 +1,213
1 1 #include "Variable/VariableAcquisitionWorker.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 #include <Data/IDataProvider.h>
6 6 #include <Data/SqpRange.h>
7 7 #include <unordered_map>
8 8 #include <utility>
9 9
10 10 #include <QMutex>
11 11 #include <QReadWriteLock>
12 12 #include <QThread>
13 13 #include <QtConcurrent/QtConcurrent>
14 14
15 15 Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker")
16 16
17 17 namespace {
18 18
19 19 using AcquisitionId = QUuid;
20 20 using VariableId = QUuid;
21 21
22 22 struct Acquisition {
23 23 std::shared_ptr<Variable> m_Variable{nullptr};
24 24 VariableRequest m_Request{};
25 25 AcquisitionId m_Id{QUuid::createUuid()};
26 26 };
27 27
28 28 } // namespace
29 29
30 30 struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate {
31 31
32 32 explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {}
33 33
34 34 void lockRead() { m_Lock.lockForRead(); }
35 35 void lockWrite() { m_Lock.lockForWrite(); }
36 36 void unlock() { m_Lock.unlock(); }
37 37
38 void eraseAcquisition(AcquisitionId id)
38 void cancelAcquisition(AcquisitionId id)
39 39 {
40 40 auto it = m_Acquisitions.find(id);
41 41 if (it != m_Acquisitions.end()) {
42 const auto &request = it->second.m_Request;
43
44 QtConcurrent::run([ provider = request.m_Provider, acqIdentifier = id ]() {
45 provider->requestDataAborting(acqIdentifier);
46 });
47
48 eraseAcquisition(it);
49 }
50 }
51
52 /// Removes from acquisitions and its indexes the acquisition represented by the iterator passed
53 /// as a parameter
54 void eraseAcquisition(std::map<AcquisitionId, Acquisition>::iterator it)
55 {
56 if (it != m_Acquisitions.end()) {
42 57 // Removes from index
43 58 m_AcquisitionsIndex.erase(it->second.m_Variable);
44 59
45 60 // Removes request
46 61 m_Acquisitions.erase(it);
47 62 }
48 63 }
49 64
65 /// Removes from acquisitions and its indexes the acquisition represented by the identifier
66 /// passed as a parameter
67 void eraseAcquisition(AcquisitionId id) { eraseAcquisition(m_Acquisitions.find(id)); }
68
50 69 std::map<AcquisitionId, Acquisition>::iterator insertAcquisition(Acquisition acquisition)
51 70 {
52 71 auto variable = acquisition.m_Variable;
53 72
54 73 // Inserts acquisition
55 74 auto result
56 75 = m_Acquisitions.insert(std::make_pair(acquisition.m_Id, std::move(acquisition)));
57 76 if (result.second) {
58 77 // Inserts index
59 78 m_AcquisitionsIndex[variable] = &result.first->second;
60 79 return result.first;
61 80 }
62 81 else {
63 82 return m_Acquisitions.end();
64 83 }
65 84 }
66 85
67 86 QMutex m_WorkingMutex;
68 87 QReadWriteLock m_Lock;
69 88
70 89 std::map<AcquisitionId, Acquisition> m_Acquisitions;
71 90 std::map<std::shared_ptr<Variable>, Acquisition *> m_AcquisitionsIndex;
72 91 };
73 92
74 93 VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent)
75 94 : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()}
76 95 {
77 96 }
78 97
79 98 VariableAcquisitionWorker::~VariableAcquisitionWorker()
80 99 {
81 100 qCInfo(LOG_VariableAcquisitionWorker())
82 101 << tr("VariableAcquisitionWorker destruction") << QThread::currentThread();
83 102 this->waitForFinish();
84 103 }
85 104
86 105 void VariableAcquisitionWorker::cancelVariableRequest(std::shared_ptr<Variable> variable)
87 106 {
88 /// @todo ALX
107 impl->lockWrite();
108
109 // Gets the current acquisition for the variable (if it exists) and cancels the request
110 // associated
111 auto it = impl->m_AcquisitionsIndex.find(variable);
112 if (it != impl->m_AcquisitionsIndex.end()) {
113 impl->cancelAcquisition(it->second->m_Id);
114 }
115
116 impl->unlock();
89 117 }
90 118
91 119 void VariableAcquisitionWorker::pushVariableRequest(std::shared_ptr<Variable> variable,
92 120 VariableRequest request)
93 121 {
94 122 impl->lockWrite();
95 123
96 // Checks if there is a current request for variable
124 // Checks if there is a current request for variable and cancels it
97 125 auto oldAcquisitionIt = impl->m_AcquisitionsIndex.find(variable);
98 126 if (oldAcquisitionIt != impl->m_AcquisitionsIndex.cend()) {
99 127 auto &oldAcquisition = *oldAcquisitionIt->second;
100 /// @todo ALX
101 // QtConcurrent::run(
102 // [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]()
103 // {
104 // provider->requestDataAborting(acqIdentifier);
105 // });
106
107 impl->eraseAcquisition(oldAcquisition.m_Id);
128 impl->cancelAcquisition(oldAcquisition.m_Id);
108 129 }
109 130
110 131 // Sets request for variable
111 132 Acquisition newAcquisition{variable, std::move(request)};
112 133 auto newAcquisitionIt = impl->insertAcquisition(std::move(newAcquisition));
113 134 if (newAcquisitionIt != impl->m_Acquisitions.end()) {
114 135 impl->unlock();
115 136
116 137 QMetaObject::invokeMethod(this, "executeAcquisition", Qt::QueuedConnection,
117 138 Q_ARG(QUuid, newAcquisitionIt->first));
118 139 }
119 140 else {
120 141 impl->unlock();
121 142 }
122 143 }
123 144
124 145 void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier)
125 146 {
126 147 // TODO
127 148 }
128 149
129 150 void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier,
130 151 double progress)
131 152 {
132 153 // TODO
133 154 }
134 155
135 156 void VariableAcquisitionWorker::onDataAcquired(QUuid acquisitionId,
136 157 std::shared_ptr<IDataSeries> dataSeries,
137 158 SqpRange range)
138 159 {
139 160 impl->lockWrite();
140 161
141 162 auto it = impl->m_Acquisitions.find(acquisitionId);
142 163 if (it != impl->m_Acquisitions.cend()) {
143 164 auto &acquisition = it->second;
144 165 auto &request = acquisition.m_Request;
145 166
146 167 // Store the result
147 168 request.addResult(dataSeries);
148 169
149 170 if (request.isFinished()) {
150 171 emit dataProvided(acquisition.m_Variable, std::move(request));
151 172 impl->eraseAcquisition(acquisitionId);
152 173 }
153 174 }
154 175 impl->unlock();
155 176 }
156 177
157 178 void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier)
158 179 {
159 180 }
160 181
161 182 void VariableAcquisitionWorker::initialize()
162 183 {
163 184 qCDebug(LOG_VariableAcquisitionWorker())
164 185 << tr("VariableAcquisitionWorker init") << QThread::currentThread();
165 186 impl->m_WorkingMutex.lock();
166 187 qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END");
167 188 }
168 189
169 190 void VariableAcquisitionWorker::finalize()
170 191 {
171 192 impl->m_WorkingMutex.unlock();
172 193 }
173 194
174 195 void VariableAcquisitionWorker::waitForFinish()
175 196 {
176 197 QMutexLocker locker{&impl->m_WorkingMutex};
177 198 }
178 199
179 200 void VariableAcquisitionWorker::executeAcquisition(QUuid acquisitionId)
180 201 {
181 202 impl->lockRead();
182 203 auto it = impl->m_Acquisitions.find(acquisitionId);
183 204 if (it != impl->m_Acquisitions.cend()) {
184 205 auto &request = it->second.m_Request;
185 206 impl->unlock();
186 207 request.m_Provider->requestDataLoading(acquisitionId, request.m_ProviderParameters);
187 208 }
188 209 else {
189 210 impl->unlock();
190 211 // TODO log no acqIdentifier recognized
191 212 }
192 213 }
@@ -1,104 +1,104
1 1 #include "Variable/VariableSynchronizer.h"
2 2
3 3 #include "Variable/Variable.h"
4 4
5 5 Q_LOGGING_CATEGORY(LOG_VariableSynchronizer, "VariableSynchronizer")
6 6
7 7 namespace {
8 8
9 9 using GroupId = VariableSynchronizer::GroupId;
10 10 struct Group {
11 11 GroupId m_Id;
12 12 std::set<std::shared_ptr<Variable> > m_Variables;
13 13 };
14 14
15 15 } // namespace
16 16
17 17 struct VariableSynchronizer::VariableSynchronizerPrivate {
18 18 std::map<GroupId, Group> m_Groups;
19 19 std::map<std::shared_ptr<Variable>, Group *> m_GroupsIndex;
20 20 };
21 21
22 22 VariableSynchronizer::VariableSynchronizer(QObject *parent)
23 23 : QObject{parent}, impl{spimpl::make_unique_impl<VariableSynchronizerPrivate>()}
24 24 {
25 25 }
26 26
27 27 void VariableSynchronizer::addGroup(GroupId groupId) noexcept
28 28 {
29 29 if (impl->m_Groups.count(groupId) == 1) {
30 30 qCWarning(LOG_VariableSynchronizer())
31 31 << tr("Can't create new synchronization group: a "
32 32 "group already exists under the passed identifier");
33 33 return;
34 34 }
35 35
36 impl->m_Groups.insert(std::make_pair(groupId, Group{}));
36 impl->m_Groups.insert(std::make_pair(groupId, Group{groupId}));
37 37 }
38 38
39 39 void VariableSynchronizer::addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept
40 40 {
41 41 if (impl->m_GroupsIndex.count(variable) == 1) {
42 42 qCWarning(LOG_VariableSynchronizer())
43 43 << tr("Can't add variable to a new synchronization group: the variable is already in a "
44 44 "synchronization group");
45 45 return;
46 46 }
47 47
48 48 auto groupIt = impl->m_Groups.find(groupId);
49 49
50 50 if (groupIt == impl->m_Groups.end()) {
51 51 qCWarning(LOG_VariableSynchronizer())
52 52 << tr("Can't add variable to the synchronization group: no group exists under the "
53 53 "passed identifier");
54 54 return;
55 55 }
56 56
57 57 // Registers variable
58 58 groupIt->second.m_Variables.insert(variable);
59 59
60 60 // Creates index for variable
61 61 impl->m_GroupsIndex.insert(std::make_pair(variable, &groupIt->second));
62 62 }
63 63
64 64 void VariableSynchronizer::removeGroup(GroupId groupId) noexcept
65 65 {
66 66 auto groupIt = impl->m_Groups.find(groupId);
67 67
68 68 if (groupIt == impl->m_Groups.end()) {
69 69 qCWarning(LOG_VariableSynchronizer()) << tr(
70 70 "Can't remove synchronization group: no group exists under the passed identifier");
71 71 return;
72 72 }
73 73
74 74 // Removes indexes
75 75 for (const auto &variable : groupIt->second.m_Variables) {
76 76 impl->m_GroupsIndex.erase(variable);
77 77 }
78 78
79 79 // Removes group
80 80 impl->m_Groups.erase(groupIt);
81 81 }
82 82
83 83 void VariableSynchronizer::removeVariable(std::shared_ptr<Variable> variable) noexcept
84 84 {
85 85 // Finds group in which the variable is
86 86 auto variableGroupIt = impl->m_GroupsIndex.find(variable);
87 87 if (variableGroupIt != impl->m_GroupsIndex.end()) {
88 88 auto groupId = variableGroupIt->second->m_Id;
89 89
90 90 // Removes variable index
91 91 impl->m_GroupsIndex.erase(variableGroupIt);
92 92
93 93 // Removes variable from group
94 94 impl->m_Groups[groupId].m_Variables.erase(variable);
95 95 }
96 96 }
97 97
98 98 std::set<std::shared_ptr<Variable> >
99 99 VariableSynchronizer::synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept
100 100 {
101 101 auto groupIndexIt = impl->m_GroupsIndex.find(variable);
102 102 return groupIndexIt != impl->m_GroupsIndex.end() ? groupIndexIt->second->m_Variables
103 103 : std::set<std::shared_ptr<Variable> >{};
104 104 }
General Comments 0
You need to be logged in to leave comments. Login now