@@ -1,192 +1,213 | |||
|
1 | 1 | #include "Variable/VariableAcquisitionWorker.h" |
|
2 | 2 | |
|
3 | 3 | #include "Variable/Variable.h" |
|
4 | 4 | |
|
5 | 5 | #include <Data/IDataProvider.h> |
|
6 | 6 | #include <Data/SqpRange.h> |
|
7 | 7 | #include <unordered_map> |
|
8 | 8 | #include <utility> |
|
9 | 9 | |
|
10 | 10 | #include <QMutex> |
|
11 | 11 | #include <QReadWriteLock> |
|
12 | 12 | #include <QThread> |
|
13 | 13 | #include <QtConcurrent/QtConcurrent> |
|
14 | 14 | |
|
15 | 15 | Q_LOGGING_CATEGORY(LOG_VariableAcquisitionWorker, "VariableAcquisitionWorker") |
|
16 | 16 | |
|
17 | 17 | namespace { |
|
18 | 18 | |
|
19 | 19 | using AcquisitionId = QUuid; |
|
20 | 20 | using VariableId = QUuid; |
|
21 | 21 | |
|
22 | 22 | struct Acquisition { |
|
23 | 23 | std::shared_ptr<Variable> m_Variable{nullptr}; |
|
24 | 24 | VariableRequest m_Request{}; |
|
25 | 25 | AcquisitionId m_Id{QUuid::createUuid()}; |
|
26 | 26 | }; |
|
27 | 27 | |
|
28 | 28 | } // namespace |
|
29 | 29 | |
|
30 | 30 | struct VariableAcquisitionWorker::VariableAcquisitionWorkerPrivate { |
|
31 | 31 | |
|
32 | 32 | explicit VariableAcquisitionWorkerPrivate() : m_Lock{QReadWriteLock::Recursive} {} |
|
33 | 33 | |
|
34 | 34 | void lockRead() { m_Lock.lockForRead(); } |
|
35 | 35 | void lockWrite() { m_Lock.lockForWrite(); } |
|
36 | 36 | void unlock() { m_Lock.unlock(); } |
|
37 | 37 | |
|
38 |
void |
|
|
38 | void cancelAcquisition(AcquisitionId id) | |
|
39 | 39 | { |
|
40 | 40 | auto it = m_Acquisitions.find(id); |
|
41 | 41 | if (it != m_Acquisitions.end()) { |
|
42 | const auto &request = it->second.m_Request; | |
|
43 | ||
|
44 | QtConcurrent::run([ provider = request.m_Provider, acqIdentifier = id ]() { | |
|
45 | provider->requestDataAborting(acqIdentifier); | |
|
46 | }); | |
|
47 | ||
|
48 | eraseAcquisition(it); | |
|
49 | } | |
|
50 | } | |
|
51 | ||
|
52 | /// Removes from acquisitions and its indexes the acquisition represented by the iterator passed | |
|
53 | /// as a parameter | |
|
54 | void eraseAcquisition(std::map<AcquisitionId, Acquisition>::iterator it) | |
|
55 | { | |
|
56 | if (it != m_Acquisitions.end()) { | |
|
42 | 57 | // Removes from index |
|
43 | 58 | m_AcquisitionsIndex.erase(it->second.m_Variable); |
|
44 | 59 | |
|
45 | 60 | // Removes request |
|
46 | 61 | m_Acquisitions.erase(it); |
|
47 | 62 | } |
|
48 | 63 | } |
|
49 | 64 | |
|
65 | /// Removes from acquisitions and its indexes the acquisition represented by the identifier | |
|
66 | /// passed as a parameter | |
|
67 | void eraseAcquisition(AcquisitionId id) { eraseAcquisition(m_Acquisitions.find(id)); } | |
|
68 | ||
|
50 | 69 | std::map<AcquisitionId, Acquisition>::iterator insertAcquisition(Acquisition acquisition) |
|
51 | 70 | { |
|
52 | 71 | auto variable = acquisition.m_Variable; |
|
53 | 72 | |
|
54 | 73 | // Inserts acquisition |
|
55 | 74 | auto result |
|
56 | 75 | = m_Acquisitions.insert(std::make_pair(acquisition.m_Id, std::move(acquisition))); |
|
57 | 76 | if (result.second) { |
|
58 | 77 | // Inserts index |
|
59 | 78 | m_AcquisitionsIndex[variable] = &result.first->second; |
|
60 | 79 | return result.first; |
|
61 | 80 | } |
|
62 | 81 | else { |
|
63 | 82 | return m_Acquisitions.end(); |
|
64 | 83 | } |
|
65 | 84 | } |
|
66 | 85 | |
|
67 | 86 | QMutex m_WorkingMutex; |
|
68 | 87 | QReadWriteLock m_Lock; |
|
69 | 88 | |
|
70 | 89 | std::map<AcquisitionId, Acquisition> m_Acquisitions; |
|
71 | 90 | std::map<std::shared_ptr<Variable>, Acquisition *> m_AcquisitionsIndex; |
|
72 | 91 | }; |
|
73 | 92 | |
|
74 | 93 | VariableAcquisitionWorker::VariableAcquisitionWorker(QObject *parent) |
|
75 | 94 | : QObject{parent}, impl{spimpl::make_unique_impl<VariableAcquisitionWorkerPrivate>()} |
|
76 | 95 | { |
|
77 | 96 | } |
|
78 | 97 | |
|
79 | 98 | VariableAcquisitionWorker::~VariableAcquisitionWorker() |
|
80 | 99 | { |
|
81 | 100 | qCInfo(LOG_VariableAcquisitionWorker()) |
|
82 | 101 | << tr("VariableAcquisitionWorker destruction") << QThread::currentThread(); |
|
83 | 102 | this->waitForFinish(); |
|
84 | 103 | } |
|
85 | 104 | |
|
86 | 105 | void VariableAcquisitionWorker::cancelVariableRequest(std::shared_ptr<Variable> variable) |
|
87 | 106 | { |
|
88 | /// @todo ALX | |
|
107 | impl->lockWrite(); | |
|
108 | ||
|
109 | // Gets the current acquisition for the variable (if it exists) and cancels the request | |
|
110 | // associated | |
|
111 | auto it = impl->m_AcquisitionsIndex.find(variable); | |
|
112 | if (it != impl->m_AcquisitionsIndex.end()) { | |
|
113 | impl->cancelAcquisition(it->second->m_Id); | |
|
114 | } | |
|
115 | ||
|
116 | impl->unlock(); | |
|
89 | 117 | } |
|
90 | 118 | |
|
91 | 119 | void VariableAcquisitionWorker::pushVariableRequest(std::shared_ptr<Variable> variable, |
|
92 | 120 | VariableRequest request) |
|
93 | 121 | { |
|
94 | 122 | impl->lockWrite(); |
|
95 | 123 | |
|
96 | // Checks if there is a current request for variable | |
|
124 | // Checks if there is a current request for variable and cancels it | |
|
97 | 125 | auto oldAcquisitionIt = impl->m_AcquisitionsIndex.find(variable); |
|
98 | 126 | if (oldAcquisitionIt != impl->m_AcquisitionsIndex.cend()) { |
|
99 | 127 | auto &oldAcquisition = *oldAcquisitionIt->second; |
|
100 | /// @todo ALX | |
|
101 | // QtConcurrent::run( | |
|
102 | // [ provider = request->m_Provider, acqIdentifier = request->m_AcqIdentifier ]() | |
|
103 | // { | |
|
104 | // provider->requestDataAborting(acqIdentifier); | |
|
105 | // }); | |
|
106 | ||
|
107 | impl->eraseAcquisition(oldAcquisition.m_Id); | |
|
128 | impl->cancelAcquisition(oldAcquisition.m_Id); | |
|
108 | 129 | } |
|
109 | 130 | |
|
110 | 131 | // Sets request for variable |
|
111 | 132 | Acquisition newAcquisition{variable, std::move(request)}; |
|
112 | 133 | auto newAcquisitionIt = impl->insertAcquisition(std::move(newAcquisition)); |
|
113 | 134 | if (newAcquisitionIt != impl->m_Acquisitions.end()) { |
|
114 | 135 | impl->unlock(); |
|
115 | 136 | |
|
116 | 137 | QMetaObject::invokeMethod(this, "executeAcquisition", Qt::QueuedConnection, |
|
117 | 138 | Q_ARG(QUuid, newAcquisitionIt->first)); |
|
118 | 139 | } |
|
119 | 140 | else { |
|
120 | 141 | impl->unlock(); |
|
121 | 142 | } |
|
122 | 143 | } |
|
123 | 144 | |
|
124 | 145 | void VariableAcquisitionWorker::abortProgressRequested(QUuid vIdentifier) |
|
125 | 146 | { |
|
126 | 147 | // TODO |
|
127 | 148 | } |
|
128 | 149 | |
|
129 | 150 | void VariableAcquisitionWorker::onVariableRetrieveDataInProgress(QUuid acqIdentifier, |
|
130 | 151 | double progress) |
|
131 | 152 | { |
|
132 | 153 | // TODO |
|
133 | 154 | } |
|
134 | 155 | |
|
135 | 156 | void VariableAcquisitionWorker::onDataAcquired(QUuid acquisitionId, |
|
136 | 157 | std::shared_ptr<IDataSeries> dataSeries, |
|
137 | 158 | SqpRange range) |
|
138 | 159 | { |
|
139 | 160 | impl->lockWrite(); |
|
140 | 161 | |
|
141 | 162 | auto it = impl->m_Acquisitions.find(acquisitionId); |
|
142 | 163 | if (it != impl->m_Acquisitions.cend()) { |
|
143 | 164 | auto &acquisition = it->second; |
|
144 | 165 | auto &request = acquisition.m_Request; |
|
145 | 166 | |
|
146 | 167 | // Store the result |
|
147 | 168 | request.addResult(dataSeries); |
|
148 | 169 | |
|
149 | 170 | if (request.isFinished()) { |
|
150 | 171 | emit dataProvided(acquisition.m_Variable, std::move(request)); |
|
151 | 172 | impl->eraseAcquisition(acquisitionId); |
|
152 | 173 | } |
|
153 | 174 | } |
|
154 | 175 | impl->unlock(); |
|
155 | 176 | } |
|
156 | 177 | |
|
157 | 178 | void VariableAcquisitionWorker::onVariableAcquisitionCanceled(QUuid acqIdentifier) |
|
158 | 179 | { |
|
159 | 180 | } |
|
160 | 181 | |
|
161 | 182 | void VariableAcquisitionWorker::initialize() |
|
162 | 183 | { |
|
163 | 184 | qCDebug(LOG_VariableAcquisitionWorker()) |
|
164 | 185 | << tr("VariableAcquisitionWorker init") << QThread::currentThread(); |
|
165 | 186 | impl->m_WorkingMutex.lock(); |
|
166 | 187 | qCDebug(LOG_VariableAcquisitionWorker()) << tr("VariableAcquisitionWorker init END"); |
|
167 | 188 | } |
|
168 | 189 | |
|
169 | 190 | void VariableAcquisitionWorker::finalize() |
|
170 | 191 | { |
|
171 | 192 | impl->m_WorkingMutex.unlock(); |
|
172 | 193 | } |
|
173 | 194 | |
|
174 | 195 | void VariableAcquisitionWorker::waitForFinish() |
|
175 | 196 | { |
|
176 | 197 | QMutexLocker locker{&impl->m_WorkingMutex}; |
|
177 | 198 | } |
|
178 | 199 | |
|
179 | 200 | void VariableAcquisitionWorker::executeAcquisition(QUuid acquisitionId) |
|
180 | 201 | { |
|
181 | 202 | impl->lockRead(); |
|
182 | 203 | auto it = impl->m_Acquisitions.find(acquisitionId); |
|
183 | 204 | if (it != impl->m_Acquisitions.cend()) { |
|
184 | 205 | auto &request = it->second.m_Request; |
|
185 | 206 | impl->unlock(); |
|
186 | 207 | request.m_Provider->requestDataLoading(acquisitionId, request.m_ProviderParameters); |
|
187 | 208 | } |
|
188 | 209 | else { |
|
189 | 210 | impl->unlock(); |
|
190 | 211 | // TODO log no acqIdentifier recognized |
|
191 | 212 | } |
|
192 | 213 | } |
@@ -1,104 +1,104 | |||
|
1 | 1 | #include "Variable/VariableSynchronizer.h" |
|
2 | 2 | |
|
3 | 3 | #include "Variable/Variable.h" |
|
4 | 4 | |
|
5 | 5 | Q_LOGGING_CATEGORY(LOG_VariableSynchronizer, "VariableSynchronizer") |
|
6 | 6 | |
|
7 | 7 | namespace { |
|
8 | 8 | |
|
9 | 9 | using GroupId = VariableSynchronizer::GroupId; |
|
10 | 10 | struct Group { |
|
11 | 11 | GroupId m_Id; |
|
12 | 12 | std::set<std::shared_ptr<Variable> > m_Variables; |
|
13 | 13 | }; |
|
14 | 14 | |
|
15 | 15 | } // namespace |
|
16 | 16 | |
|
17 | 17 | struct VariableSynchronizer::VariableSynchronizerPrivate { |
|
18 | 18 | std::map<GroupId, Group> m_Groups; |
|
19 | 19 | std::map<std::shared_ptr<Variable>, Group *> m_GroupsIndex; |
|
20 | 20 | }; |
|
21 | 21 | |
|
22 | 22 | VariableSynchronizer::VariableSynchronizer(QObject *parent) |
|
23 | 23 | : QObject{parent}, impl{spimpl::make_unique_impl<VariableSynchronizerPrivate>()} |
|
24 | 24 | { |
|
25 | 25 | } |
|
26 | 26 | |
|
27 | 27 | void VariableSynchronizer::addGroup(GroupId groupId) noexcept |
|
28 | 28 | { |
|
29 | 29 | if (impl->m_Groups.count(groupId) == 1) { |
|
30 | 30 | qCWarning(LOG_VariableSynchronizer()) |
|
31 | 31 | << tr("Can't create new synchronization group: a " |
|
32 | 32 | "group already exists under the passed identifier"); |
|
33 | 33 | return; |
|
34 | 34 | } |
|
35 | 35 | |
|
36 | impl->m_Groups.insert(std::make_pair(groupId, Group{})); | |
|
36 | impl->m_Groups.insert(std::make_pair(groupId, Group{groupId})); | |
|
37 | 37 | } |
|
38 | 38 | |
|
39 | 39 | void VariableSynchronizer::addVariable(std::shared_ptr<Variable> variable, GroupId groupId) noexcept |
|
40 | 40 | { |
|
41 | 41 | if (impl->m_GroupsIndex.count(variable) == 1) { |
|
42 | 42 | qCWarning(LOG_VariableSynchronizer()) |
|
43 | 43 | << tr("Can't add variable to a new synchronization group: the variable is already in a " |
|
44 | 44 | "synchronization group"); |
|
45 | 45 | return; |
|
46 | 46 | } |
|
47 | 47 | |
|
48 | 48 | auto groupIt = impl->m_Groups.find(groupId); |
|
49 | 49 | |
|
50 | 50 | if (groupIt == impl->m_Groups.end()) { |
|
51 | 51 | qCWarning(LOG_VariableSynchronizer()) |
|
52 | 52 | << tr("Can't add variable to the synchronization group: no group exists under the " |
|
53 | 53 | "passed identifier"); |
|
54 | 54 | return; |
|
55 | 55 | } |
|
56 | 56 | |
|
57 | 57 | // Registers variable |
|
58 | 58 | groupIt->second.m_Variables.insert(variable); |
|
59 | 59 | |
|
60 | 60 | // Creates index for variable |
|
61 | 61 | impl->m_GroupsIndex.insert(std::make_pair(variable, &groupIt->second)); |
|
62 | 62 | } |
|
63 | 63 | |
|
64 | 64 | void VariableSynchronizer::removeGroup(GroupId groupId) noexcept |
|
65 | 65 | { |
|
66 | 66 | auto groupIt = impl->m_Groups.find(groupId); |
|
67 | 67 | |
|
68 | 68 | if (groupIt == impl->m_Groups.end()) { |
|
69 | 69 | qCWarning(LOG_VariableSynchronizer()) << tr( |
|
70 | 70 | "Can't remove synchronization group: no group exists under the passed identifier"); |
|
71 | 71 | return; |
|
72 | 72 | } |
|
73 | 73 | |
|
74 | 74 | // Removes indexes |
|
75 | 75 | for (const auto &variable : groupIt->second.m_Variables) { |
|
76 | 76 | impl->m_GroupsIndex.erase(variable); |
|
77 | 77 | } |
|
78 | 78 | |
|
79 | 79 | // Removes group |
|
80 | 80 | impl->m_Groups.erase(groupIt); |
|
81 | 81 | } |
|
82 | 82 | |
|
83 | 83 | void VariableSynchronizer::removeVariable(std::shared_ptr<Variable> variable) noexcept |
|
84 | 84 | { |
|
85 | 85 | // Finds group in which the variable is |
|
86 | 86 | auto variableGroupIt = impl->m_GroupsIndex.find(variable); |
|
87 | 87 | if (variableGroupIt != impl->m_GroupsIndex.end()) { |
|
88 | 88 | auto groupId = variableGroupIt->second->m_Id; |
|
89 | 89 | |
|
90 | 90 | // Removes variable index |
|
91 | 91 | impl->m_GroupsIndex.erase(variableGroupIt); |
|
92 | 92 | |
|
93 | 93 | // Removes variable from group |
|
94 | 94 | impl->m_Groups[groupId].m_Variables.erase(variable); |
|
95 | 95 | } |
|
96 | 96 | } |
|
97 | 97 | |
|
98 | 98 | std::set<std::shared_ptr<Variable> > |
|
99 | 99 | VariableSynchronizer::synchronizedVariables(std::shared_ptr<Variable> variable) const noexcept |
|
100 | 100 | { |
|
101 | 101 | auto groupIndexIt = impl->m_GroupsIndex.find(variable); |
|
102 | 102 | return groupIndexIt != impl->m_GroupsIndex.end() ? groupIndexIt->second->m_Variables |
|
103 | 103 | : std::set<std::shared_ptr<Variable> >{}; |
|
104 | 104 | } |
General Comments 0
You need to be logged in to leave comments.
Login now