GCC Code Coverage Report


Directory: ./
File: pdcom5/src/Process.cpp
Date: 2023-11-12 04:06:57
Exec Total Coverage
Lines: 109 155 70.3%
Branches: 57 158 36.1%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2007-2016 Richard Hacker (lerichi at gmx dot net)
4 * Florian Pose <fp@igh.de>
5 *
6 * This file is part of the PdCom library.
7 *
8 * The PdCom library is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or (at your
11 * option) any later version.
12 *
13 * The PdCom library is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
16 * License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
20 *
21 *****************************************************************************/
22
23 #include "Process.h"
24
25 #include "Debug.h"
26 #include "Future.h"
27 #include "IOLayer.h"
28 #include "ProtocolHandler.h"
29 #include "Subscription.h"
30 #include "Variable.h"
31 #include "msrproto/ProtocolHandler.h"
32 #include "pdcom5.h"
33
34 #include <algorithm>
35 #include <git_revision_hash.h>
36 #include <mutex>
37
38 #define STR(x) #x
39 #define QUOTE(x) STR(x)
40
41 const char *const PdCom::pdcom_version_code =
42 QUOTE(PDCOM_MAJOR) "." QUOTE(PDCOM_MINOR) "." QUOTE(PDCOM_RELEASE);
43
44 const char *const PdCom::pdcom_full_version = GIT_REV;
45
46 using iProcess = PdCom::impl::Process;
47
48 namespace PdCom { namespace impl {
49 class PendingSubscription : public PdCom::impl::Subscription
50 {
51 public:
52 PendingSubscription(
53 std::shared_ptr<const PdCom::impl::Variable> variable,
54 PdCom::Subscription *This,
55 PdCom::Subscriber &subscriber,
56 std::shared_ptr<iProcess> process) :
57 Subscription(std::move(variable), This, subscriber, process)
58 {}
59
60 static std::shared_ptr<PendingSubscription> make_and_notify(
61 std::shared_ptr<const PdCom::impl::Variable> variable,
62 PdCom::Subscription *This,
63 PdCom::Subscriber &subscriber,
64 std::shared_ptr<iProcess> process)
65 {
66 auto ans = std::make_shared<PendingSubscription>(
67 std::move(variable), This, subscriber, process);
68
69 process->pending_callbacks_.push_back(
70 {ans, PdCom::Subscription::State::Pending});
71 return ans;
72 }
73
74
75 void poll() override { throw PdCom::InvalidSubscription(); }
76 const void *getData() const override { throw PdCom::InvalidSubscription(); }
77 };
78 }} // namespace PdCom::impl
79
80 using PdCom::impl::PendingSubscription;
81
82 ///////////////////////////////////////////////////////////////////////////
83 136 iProcess::Process(PdCom::Process *This) :
84 std::enable_shared_from_this<iProcess>(),
85 PdCom::impl::IOLayer(nullptr),
86 This(This),
87
1/2
✓ Branch 4 taken 136 times.
✗ Branch 5 not taken.
136 io(this)
88 136 {}
89
90 ///////////////////////////////////////////////////////////////////////////
91 96 void iProcess::reset()
92 {
93
1/2
✓ Branch 3 taken 96 times.
✗ Branch 4 not taken.
192 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
94
95 96 io->rxbytes = 0;
96 96 io->txbytes = 0;
97
98 96 protocolHandler_.reset();
99 96 }
100
101 ///////////////////////////////////////////////////////////////////////////
102 729 void iProcess::asyncData()
103 {
104
3/4
✓ Branch 3 taken 729 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 134 times.
✓ Branch 8 taken 594 times.
864 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
105
1/2
✓ Branch 2 taken 729 times.
✗ Branch 3 not taken.
729 pending_callbacks_.flush();
106
2/2
✓ Branch 2 taken 594 times.
✓ Branch 3 taken 135 times.
729 if (protocolHandler_) {
107
1/2
✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
594 protocolHandler_->cancelSubscriptions();
108
2/4
✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 594 times.
594 if (!protocolHandler_->asyncData()) {
109 protocolHandler_.reset();
110 // first, make all subs invalid
111 auto old_subs = std::move(all_subscriptions_);
112 for (const auto &registered_s : old_subs) {
113 if (const auto sub = registered_s.subscription_.lock()) {
114 pending_callbacks_.push_back(
115 {sub, PdCom::Subscription::State::Invalid});
116 }
117 }
118 // give control flow back
119 pending_callbacks_.flush();
120 // replace all sub implementations as they are invalid now
121 for (const auto &registered_s : old_subs) {
122 if (const auto sub = registered_s.subscription_.lock()) {
123 if (!sub->This_)
124 continue;
125 auto new_sub = std::make_shared<PendingSubscription>(
126 nullptr, sub->This_, sub->subscriber_,
127 shared_from_this());
128 sub->replaceImpl(new_sub);
129
130 all_subscriptions_.insert(
131 {new_sub, registered_s.path_,
132 registered_s.selector_});
133 }
134 }
135 }
136 else {
137
1/2
✓ Branch 8 taken 594 times.
✗ Branch 9 not taken.
594 protocolHandler_->cancelSubscriptions();
138 }
139
1/2
✓ Branch 2 taken 594 times.
✗ Branch 3 not taken.
594 pending_callbacks_.flush();
140 594 return;
141 }
142
143 135 char buf[1024];
144
1/2
✓ Branch 15 taken 135 times.
✗ Branch 16 not taken.
135 const size_t n = io->read(buf, sizeof(buf));
145
2/2
✓ Branch 0 taken 132 times.
✓ Branch 1 taken 3 times.
135 if (n > 0) {
146 264 protocolHandler_ = std::unique_ptr<ProtocolHandler>(
147
2/4
✓ Branch 2 taken 132 times.
✗ Branch 3 not taken.
✓ Branch 9 taken 132 times.
✗ Branch 10 not taken.
264 new MsrProto::ProtocolHandler(this, io));
148
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 131 times.
263 if (!static_cast<MsrProto::ProtocolHandler &>(*protocolHandler_.get())
149
2/2
✓ Branch 2 taken 131 times.
✓ Branch 3 taken 1 times.
132 .parse(buf, n)) {
150 protocolHandler_.reset();
151 }
152 }
153
3/4
✓ Branch 2 taken 134 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 134 times.
✓ Branch 6 taken 594 times.
134 pending_callbacks_.flush();
154 }
155
156
157 12 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
158 PdCom::Subscription *subscription,
159 const std::string &path,
160 PdCom::Subscriber &subscriber,
161 const PdCom::Selector &selector)
162 {
163 12 std::shared_ptr<PdCom::impl::Subscription> ans;
164
1/2
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
12 if (protocolHandler_)
165
1/2
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
36 ans = protocolHandler_->subscribe(
166 36 subscription, path, subscriber, selector);
167 else
168 ans = PendingSubscription::make_and_notify(
169 nullptr, subscription, subscriber, shared_from_this());
170
2/4
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 12 times.
✗ Branch 12 not taken.
12 all_subscriptions_.insert({ans, path, selector.impl_});
171 12 return ans;
172 }
173 42 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
174 PdCom::Subscription *subscription,
175 std::shared_ptr<const PdCom::impl::Variable> var,
176 PdCom::Subscriber &subscriber,
177 const PdCom::Selector &selector)
178 {
179 42 std::shared_ptr<PdCom::impl::Subscription> ans;
180
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
42 if (protocolHandler_)
181
1/2
✓ Branch 5 taken 42 times.
✗ Branch 6 not taken.
126 ans = protocolHandler_->subscribe(
182 126 subscription, var, subscriber, selector);
183 else
184 ans = PendingSubscription::make_and_notify(
185 nullptr, subscription, subscriber, shared_from_this());
186
2/4
✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
✓ Branch 13 taken 42 times.
✗ Branch 14 not taken.
42 all_subscriptions_.insert({ans, var->getPath(), selector.impl_});
187 42 return ans;
188 }
189
190 126 void iProcess::connected()
191 {
192 252 auto old_subs = std::move(all_subscriptions_);
193
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 126 times.
126 for (const auto &registered_sub : old_subs)
194 if (const auto sub = registered_sub.subscription_.lock()) {
195 auto new_sub = protocolHandler_->subscribe(
196 sub->This_, registered_sub.path_, sub->subscriber_,
197 {registered_sub.selector_});
198 sub->replaceImpl(new_sub);
199 all_subscriptions_.insert(
200 {new_sub, registered_sub.path_, registered_sub.selector_});
201 }
202
1/2
✓ Branch 24 taken 126 times.
✗ Branch 25 not taken.
126 This->connected();
203 126 }
204
205 using PdCom::impl::Subscription;
206
207 10 void iProcess::replace(
208 std::shared_ptr<Subscription> const &old_s,
209 std::shared_ptr<Subscription> const &new_s)
210 {
211 10 const auto it = std::find_if(
212 30 all_subscriptions_.begin(), all_subscriptions_.end(),
213 50 [&old_s](const RegisteredSubscription &rs) {
214 30 return !rs.subscription_.owner_before(old_s)
215
2/4
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
30 and !old_s.owner_before(rs.subscription_);
216
1/2
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
50 });
217
1/2
✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
10 if (it != all_subscriptions_.end()) {
218
2/4
✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
✓ Branch 13 taken 10 times.
✗ Branch 14 not taken.
10 all_subscriptions_.insert({new_s, it->path_, it->selector_});
219
1/2
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
10 all_subscriptions_.erase(it);
220 }
221 10 }
222
223
224 using PdCom::Process;
225
226 Process::~Process() = default;
227
228
1/2
✓ Branch 9 taken 124 times.
✗ Branch 10 not taken.
124 Process::Process() : pimpl(std::make_shared<PdCom::impl::Process>(this))
229 124 {}
230
231 12 Process::Process(std::shared_ptr<iProcess> impl) : pimpl(impl)
232 12 {}
233
234 14 std::string Process::name() const
235 {
236 14 return pimpl->protocolHandler().name();
237 }
238
239 11 std::string Process::version() const
240 {
241 11 return pimpl->protocolHandler().version();
242 }
243
244 23 void Process::setAuthManager(PdCom::Sasl *am)
245 {
246 23 pimpl->setAuthManager(am);
247 23 }
248
249 PdCom::Sasl *Process::getAuthManager() const
250 {
251 return pimpl->auth_manager_;
252 }
253
254 104 void Process::setMessageManager(PdCom::MessageManagerBase *mm)
255 {
256 104 pimpl->setMessageManager(mm);
257 104 }
258
259 96 void Process::reset()
260 {
261 96 pimpl->reset();
262 96 }
263
264 729 void Process::asyncData()
265 {
266 729 pimpl->asyncData();
267 728 }
268
269 19 bool Process::list(const std::string &path)
270 {
271 19 return pimpl->protocolHandler().list(path);
272 }
273
274 70 bool Process::find(const std::string &path)
275 {
276 70 return pimpl->protocolHandler().find(path);
277 }
278
279 6 void PdCom::Process::getClientStatistics()
280 {
281 6 pimpl->protocolHandler().getClientStatistics();
282 6 }
283
284 void PdCom::Process::clientStatisticsReply(std::vector<PdCom::ClientStatistics>)
285 {}
286
287 4 void Process::ping()
288 {
289 4 pimpl->protocolHandler().ping();
290 4 }
291
292 29 void Process::broadcast(const std::string &message, const std::string &attr)
293 {
294 29 pimpl->protocolHandler().broadcast(message, attr);
295 20 }
296
297 void Process::broadcastReply(
298 const std::string &,
299 const std::string &,
300 std::chrono::nanoseconds,
301 const std::string &)
302 {}
303 void Process::listReply(std::vector<PdCom::Variable>, std::vector<std::string>)
304 {}
305
306 void Process::findReply(const PdCom::Variable &)
307 {}
308
309 211 void Process::callPendingCallbacks()
310 {
311 211 pimpl->pending_callbacks_.flush();
312
2/2
✓ Branch 6 taken 154 times.
✓ Branch 7 taken 57 times.
211 if (const auto p = pimpl->castProtocolHandler())
313
1/2
✓ Branch 4 taken 154 times.
✗ Branch 5 not taken.
154 if (!pimpl->async_data_mutex_.isLocked())
314 154 p->cancelSubscriptions();
315 211 }
316
317 std::vector<Process::SubscriptionInfo> Process::getActiveSubscriptions() const
318 {
319 return pimpl->protocolHandler().getActiveSubscriptions();
320 }
321
322 1744 void iProcess::PendingCallbackQueue::flush()
323 {
324
2/2
✓ Branch 2 taken 76 times.
✓ Branch 3 taken 1668 times.
1744 while (!empty()) {
325 152 const auto p = front();
326 76 pop_front();
327
1/2
✓ Branch 1 taken 76 times.
✗ Branch 2 not taken.
76 p.execute();
328 }
329 1668 }
330
331 8 PdCom::Variable::PollFuture PdCom::Variable::poll() const
332 {
333
2/4
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
8 if (auto impl = pimpl_.lock()) {
334
2/4
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
8 if (const auto process = impl->process.lock()) {
335 8 auto ans = createFuture<
336 PdCom::Exception const &, VariablePollResult,
337
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
16 std::chrono::nanoseconds>();
338
2/4
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
✓ Branch 13 taken 8 times.
✗ Branch 14 not taken.
24 process->protocolHandler().pollVariable(
339 16 *impl, std::move(ans.second));
340 16 return std::move(ans.first);
341 }
342 throw ProcessGoneAway();
343 }
344 throw EmptyVariable();
345 }
346