Directory: | ./ |
---|---|
File: | pdcom5/src/Process.cpp |
Date: | 2023-11-12 04:06:57 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 109 | 155 | 70.3% |
Branches: | 57 | 158 | 36.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * Copyright (C) 2007-2016 Richard Hacker (lerichi at gmx dot net) | ||
4 | * Florian Pose <fp@igh.de> | ||
5 | * | ||
6 | * This file is part of the PdCom library. | ||
7 | * | ||
8 | * The PdCom library is free software: you can redistribute it and/or modify | ||
9 | * it under the terms of the GNU Lesser General Public License as published by | ||
10 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
11 | * option) any later version. | ||
12 | * | ||
13 | * The PdCom library is distributed in the hope that it will be useful, but | ||
14 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
15 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
16 | * License for more details. | ||
17 | * | ||
18 | * You should have received a copy of the GNU Lesser General Public License | ||
19 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
20 | * | ||
21 | *****************************************************************************/ | ||
22 | |||
23 | #include "Process.h" | ||
24 | |||
25 | #include "Debug.h" | ||
26 | #include "Future.h" | ||
27 | #include "IOLayer.h" | ||
28 | #include "ProtocolHandler.h" | ||
29 | #include "Subscription.h" | ||
30 | #include "Variable.h" | ||
31 | #include "msrproto/ProtocolHandler.h" | ||
32 | #include "pdcom5.h" | ||
33 | |||
34 | #include <algorithm> | ||
35 | #include <git_revision_hash.h> | ||
36 | #include <mutex> | ||
37 | |||
38 | #define STR(x) #x | ||
39 | #define QUOTE(x) STR(x) | ||
40 | |||
41 | const char *const PdCom::pdcom_version_code = | ||
42 | QUOTE(PDCOM_MAJOR) "." QUOTE(PDCOM_MINOR) "." QUOTE(PDCOM_RELEASE); | ||
43 | |||
44 | const char *const PdCom::pdcom_full_version = GIT_REV; | ||
45 | |||
46 | using iProcess = PdCom::impl::Process; | ||
47 | |||
48 | namespace PdCom { namespace impl { | ||
49 | ✗ | class PendingSubscription : public PdCom::impl::Subscription | |
50 | { | ||
51 | public: | ||
52 | ✗ | PendingSubscription( | |
53 | std::shared_ptr<const PdCom::impl::Variable> variable, | ||
54 | PdCom::Subscription *This, | ||
55 | PdCom::Subscriber &subscriber, | ||
56 | ✗ | std::shared_ptr<iProcess> process) : | |
57 | ✗ | Subscription(std::move(variable), This, subscriber, process) | |
58 | {} | ||
59 | |||
60 | ✗ | static std::shared_ptr<PendingSubscription> make_and_notify( | |
61 | std::shared_ptr<const PdCom::impl::Variable> variable, | ||
62 | PdCom::Subscription *This, | ||
63 | PdCom::Subscriber &subscriber, | ||
64 | std::shared_ptr<iProcess> process) | ||
65 | { | ||
66 | auto ans = std::make_shared<PendingSubscription>( | ||
67 | ✗ | std::move(variable), This, subscriber, process); | |
68 | |||
69 | ✗ | process->pending_callbacks_.push_back( | |
70 | {ans, PdCom::Subscription::State::Pending}); | ||
71 | ✗ | return ans; | |
72 | } | ||
73 | |||
74 | |||
75 | ✗ | void poll() override { throw PdCom::InvalidSubscription(); } | |
76 | ✗ | const void *getData() const override { throw PdCom::InvalidSubscription(); } | |
77 | }; | ||
78 | }} // namespace PdCom::impl | ||
79 | |||
80 | using PdCom::impl::PendingSubscription; | ||
81 | |||
82 | /////////////////////////////////////////////////////////////////////////// | ||
83 | 136 | iProcess::Process(PdCom::Process *This) : | |
84 | std::enable_shared_from_this<iProcess>(), | ||
85 | PdCom::impl::IOLayer(nullptr), | ||
86 | This(This), | ||
87 |
1/2✓ Branch 4 taken 136 times.
✗ Branch 5 not taken.
|
136 | io(this) |
88 | 136 | {} | |
89 | |||
90 | /////////////////////////////////////////////////////////////////////////// | ||
91 | 96 | void iProcess::reset() | |
92 | { | ||
93 |
1/2✓ Branch 3 taken 96 times.
✗ Branch 4 not taken.
|
192 | std::unique_lock<AsyncDataMutex> lck(async_data_mutex_); |
94 | |||
95 | 96 | io->rxbytes = 0; | |
96 | 96 | io->txbytes = 0; | |
97 | |||
98 | 96 | protocolHandler_.reset(); | |
99 | 96 | } | |
100 | |||
101 | /////////////////////////////////////////////////////////////////////////// | ||
102 | 729 | void iProcess::asyncData() | |
103 | { | ||
104 |
3/4✓ Branch 3 taken 729 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 134 times.
✓ Branch 8 taken 594 times.
|
864 | std::unique_lock<AsyncDataMutex> lck(async_data_mutex_); |
105 |
1/2✓ Branch 2 taken 729 times.
✗ Branch 3 not taken.
|
729 | pending_callbacks_.flush(); |
106 |
2/2✓ Branch 2 taken 594 times.
✓ Branch 3 taken 135 times.
|
729 | if (protocolHandler_) { |
107 |
1/2✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
|
594 | protocolHandler_->cancelSubscriptions(); |
108 |
2/4✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 594 times.
|
594 | if (!protocolHandler_->asyncData()) { |
109 | ✗ | protocolHandler_.reset(); | |
110 | // first, make all subs invalid | ||
111 | ✗ | auto old_subs = std::move(all_subscriptions_); | |
112 | ✗ | for (const auto &registered_s : old_subs) { | |
113 | ✗ | if (const auto sub = registered_s.subscription_.lock()) { | |
114 | ✗ | pending_callbacks_.push_back( | |
115 | {sub, PdCom::Subscription::State::Invalid}); | ||
116 | } | ||
117 | } | ||
118 | // give control flow back | ||
119 | ✗ | pending_callbacks_.flush(); | |
120 | // replace all sub implementations as they are invalid now | ||
121 | ✗ | for (const auto &registered_s : old_subs) { | |
122 | ✗ | if (const auto sub = registered_s.subscription_.lock()) { | |
123 | ✗ | if (!sub->This_) | |
124 | ✗ | continue; | |
125 | ✗ | auto new_sub = std::make_shared<PendingSubscription>( | |
126 | ✗ | nullptr, sub->This_, sub->subscriber_, | |
127 | ✗ | shared_from_this()); | |
128 | ✗ | sub->replaceImpl(new_sub); | |
129 | |||
130 | ✗ | all_subscriptions_.insert( | |
131 | {new_sub, registered_s.path_, | ||
132 | registered_s.selector_}); | ||
133 | } | ||
134 | } | ||
135 | } | ||
136 | else { | ||
137 |
1/2✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
|
594 | protocolHandler_->cancelSubscriptions(); |
138 | } | ||
139 |
1/2✓ Branch 2 taken 594 times.
✗ Branch 3 not taken.
|
594 | pending_callbacks_.flush(); |
140 | 594 | return; | |
141 | } | ||
142 | |||
143 | 135 | char buf[1024]; | |
144 |
1/2✓ Branch 15 taken 135 times.
✗ Branch 16 not taken.
|
135 | const size_t n = io->read(buf, sizeof(buf)); |
145 |
2/2✓ Branch 0 taken 132 times.
✓ Branch 1 taken 3 times.
|
135 | if (n > 0) { |
146 | 264 | protocolHandler_ = std::unique_ptr<ProtocolHandler>( | |
147 |
2/4✓ Branch 2 taken 132 times.
✗ Branch 3 not taken.
✓ Branch 9 taken 132 times.
✗ Branch 10 not taken.
|
264 | new MsrProto::ProtocolHandler(this, io)); |
148 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 131 times.
|
263 | if (!static_cast<MsrProto::ProtocolHandler &>(*protocolHandler_.get()) |
149 |
2/2✓ Branch 2 taken 131 times.
✓ Branch 3 taken 1 times.
|
132 | .parse(buf, n)) { |
150 | ✗ | protocolHandler_.reset(); | |
151 | } | ||
152 | } | ||
153 |
3/4✓ Branch 2 taken 134 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 134 times.
✓ Branch 6 taken 594 times.
|
134 | pending_callbacks_.flush(); |
154 | } | ||
155 | |||
156 | |||
157 | 12 | std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe( | |
158 | PdCom::Subscription *subscription, | ||
159 | const std::string &path, | ||
160 | PdCom::Subscriber &subscriber, | ||
161 | const PdCom::Selector &selector) | ||
162 | { | ||
163 | 12 | std::shared_ptr<PdCom::impl::Subscription> ans; | |
164 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | if (protocolHandler_) |
165 |
1/2✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
|
36 | ans = protocolHandler_->subscribe( |
166 | 36 | subscription, path, subscriber, selector); | |
167 | else | ||
168 | ✗ | ans = PendingSubscription::make_and_notify( | |
169 | ✗ | nullptr, subscription, subscriber, shared_from_this()); | |
170 |
2/4✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 12 times.
✗ Branch 12 not taken.
|
12 | all_subscriptions_.insert({ans, path, selector.impl_}); |
171 | 12 | return ans; | |
172 | } | ||
173 | 42 | std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe( | |
174 | PdCom::Subscription *subscription, | ||
175 | std::shared_ptr<const PdCom::impl::Variable> var, | ||
176 | PdCom::Subscriber &subscriber, | ||
177 | const PdCom::Selector &selector) | ||
178 | { | ||
179 | 42 | std::shared_ptr<PdCom::impl::Subscription> ans; | |
180 |
1/2✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
|
42 | if (protocolHandler_) |
181 |
1/2✓ Branch 5 taken 42 times.
✗ Branch 6 not taken.
|
126 | ans = protocolHandler_->subscribe( |
182 | 126 | subscription, var, subscriber, selector); | |
183 | else | ||
184 | ✗ | ans = PendingSubscription::make_and_notify( | |
185 | ✗ | nullptr, subscription, subscriber, shared_from_this()); | |
186 |
2/4✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
✓ Branch 13 taken 42 times.
✗ Branch 14 not taken.
|
42 | all_subscriptions_.insert({ans, var->getPath(), selector.impl_}); |
187 | 42 | return ans; | |
188 | } | ||
189 | |||
190 | 126 | void iProcess::connected() | |
191 | { | ||
192 | 252 | auto old_subs = std::move(all_subscriptions_); | |
193 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 126 times.
|
126 | for (const auto &registered_sub : old_subs) |
194 | ✗ | if (const auto sub = registered_sub.subscription_.lock()) { | |
195 | ✗ | auto new_sub = protocolHandler_->subscribe( | |
196 | ✗ | sub->This_, registered_sub.path_, sub->subscriber_, | |
197 | ✗ | {registered_sub.selector_}); | |
198 | ✗ | sub->replaceImpl(new_sub); | |
199 | ✗ | all_subscriptions_.insert( | |
200 | {new_sub, registered_sub.path_, registered_sub.selector_}); | ||
201 | } | ||
202 |
1/2✓ Branch 24 taken 126 times.
✗ Branch 25 not taken.
|
126 | This->connected(); |
203 | 126 | } | |
204 | |||
205 | using PdCom::impl::Subscription; | ||
206 | |||
207 | 10 | void iProcess::replace( | |
208 | std::shared_ptr<Subscription> const &old_s, | ||
209 | std::shared_ptr<Subscription> const &new_s) | ||
210 | { | ||
211 | 10 | const auto it = std::find_if( | |
212 | 30 | all_subscriptions_.begin(), all_subscriptions_.end(), | |
213 | 50 | [&old_s](const RegisteredSubscription &rs) { | |
214 | 30 | return !rs.subscription_.owner_before(old_s) | |
215 |
2/4✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
|
30 | and !old_s.owner_before(rs.subscription_); |
216 |
1/2✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
|
50 | }); |
217 |
1/2✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
|
10 | if (it != all_subscriptions_.end()) { |
218 |
2/4✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
✓ Branch 13 taken 10 times.
✗ Branch 14 not taken.
|
10 | all_subscriptions_.insert({new_s, it->path_, it->selector_}); |
219 |
1/2✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | all_subscriptions_.erase(it); |
220 | } | ||
221 | 10 | } | |
222 | |||
223 | |||
224 | using PdCom::Process; | ||
225 | |||
226 | Process::~Process() = default; | ||
227 | |||
228 |
1/2✓ Branch 9 taken 124 times.
✗ Branch 10 not taken.
|
124 | Process::Process() : pimpl(std::make_shared<PdCom::impl::Process>(this)) |
229 | 124 | {} | |
230 | |||
231 | 12 | Process::Process(std::shared_ptr<iProcess> impl) : pimpl(impl) | |
232 | 12 | {} | |
233 | |||
234 | 14 | std::string Process::name() const | |
235 | { | ||
236 | 14 | return pimpl->protocolHandler().name(); | |
237 | } | ||
238 | |||
239 | 11 | std::string Process::version() const | |
240 | { | ||
241 | 11 | return pimpl->protocolHandler().version(); | |
242 | } | ||
243 | |||
244 | 23 | void Process::setAuthManager(PdCom::Sasl *am) | |
245 | { | ||
246 | 23 | pimpl->setAuthManager(am); | |
247 | 23 | } | |
248 | |||
249 | ✗ | PdCom::Sasl *Process::getAuthManager() const | |
250 | { | ||
251 | ✗ | return pimpl->auth_manager_; | |
252 | } | ||
253 | |||
254 | 104 | void Process::setMessageManager(PdCom::MessageManagerBase *mm) | |
255 | { | ||
256 | 104 | pimpl->setMessageManager(mm); | |
257 | 104 | } | |
258 | |||
259 | 96 | void Process::reset() | |
260 | { | ||
261 | 96 | pimpl->reset(); | |
262 | 96 | } | |
263 | |||
264 | 729 | void Process::asyncData() | |
265 | { | ||
266 | 729 | pimpl->asyncData(); | |
267 | 728 | } | |
268 | |||
269 | 19 | bool Process::list(const std::string &path) | |
270 | { | ||
271 | 19 | return pimpl->protocolHandler().list(path); | |
272 | } | ||
273 | |||
274 | 70 | bool Process::find(const std::string &path) | |
275 | { | ||
276 | 70 | return pimpl->protocolHandler().find(path); | |
277 | } | ||
278 | |||
279 | 6 | void PdCom::Process::getClientStatistics() | |
280 | { | ||
281 | 6 | pimpl->protocolHandler().getClientStatistics(); | |
282 | 6 | } | |
283 | |||
284 | ✗ | void PdCom::Process::clientStatisticsReply(std::vector<PdCom::ClientStatistics>) | |
285 | {} | ||
286 | |||
287 | 4 | void Process::ping() | |
288 | { | ||
289 | 4 | pimpl->protocolHandler().ping(); | |
290 | 4 | } | |
291 | |||
292 | 29 | void Process::broadcast(const std::string &message, const std::string &attr) | |
293 | { | ||
294 | 29 | pimpl->protocolHandler().broadcast(message, attr); | |
295 | 20 | } | |
296 | |||
297 | ✗ | void Process::broadcastReply( | |
298 | const std::string &, | ||
299 | const std::string &, | ||
300 | std::chrono::nanoseconds, | ||
301 | const std::string &) | ||
302 | {} | ||
303 | ✗ | void Process::listReply(std::vector<PdCom::Variable>, std::vector<std::string>) | |
304 | {} | ||
305 | |||
306 | ✗ | void Process::findReply(const PdCom::Variable &) | |
307 | {} | ||
308 | |||
309 | 211 | void Process::callPendingCallbacks() | |
310 | { | ||
311 | 211 | pimpl->pending_callbacks_.flush(); | |
312 |
2/2✓ Branch 6 taken 154 times.
✓ Branch 7 taken 57 times.
|
211 | if (const auto p = pimpl->castProtocolHandler()) |
313 |
1/2✓ Branch 4 taken 154 times.
✗ Branch 5 not taken.
|
154 | if (!pimpl->async_data_mutex_.isLocked()) |
314 | 154 | p->cancelSubscriptions(); | |
315 | 211 | } | |
316 | |||
317 | ✗ | std::vector<Process::SubscriptionInfo> Process::getActiveSubscriptions() const | |
318 | { | ||
319 | ✗ | return pimpl->protocolHandler().getActiveSubscriptions(); | |
320 | } | ||
321 | |||
322 | 1744 | void iProcess::PendingCallbackQueue::flush() | |
323 | { | ||
324 |
2/2✓ Branch 2 taken 76 times.
✓ Branch 3 taken 1668 times.
|
1744 | while (!empty()) { |
325 | 152 | const auto p = front(); | |
326 | 76 | pop_front(); | |
327 |
1/2✓ Branch 1 taken 76 times.
✗ Branch 2 not taken.
|
76 | p.execute(); |
328 | } | ||
329 | 1668 | } | |
330 | |||
331 | 8 | PdCom::Variable::PollFuture PdCom::Variable::poll() const | |
332 | { | ||
333 |
2/4✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
|
8 | if (auto impl = pimpl_.lock()) { |
334 |
2/4✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
|
8 | if (const auto process = impl->process.lock()) { |
335 | 8 | auto ans = createFuture< | |
336 | PdCom::Exception const &, VariablePollResult, | ||
337 |
2/4✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
|
16 | std::chrono::nanoseconds>(); |
338 |
2/4✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
✓ Branch 13 taken 8 times.
✗ Branch 14 not taken.
|
24 | process->protocolHandler().pollVariable( |
339 | 16 | *impl, std::move(ans.second)); | |
340 | 16 | return std::move(ans.first); | |
341 | } | ||
342 | ✗ | throw ProcessGoneAway(); | |
343 | } | ||
344 | ✗ | throw EmptyVariable(); | |
345 | } | ||
346 |