GCC Code Coverage Report


Directory: ./
File: src/Process.cpp
Date: 2024-11-05 15:23:15
Exec Total Coverage
Lines: 132 145 91.0%
Branches: 87 144 60.4%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2007-2016 Richard Hacker (lerichi at gmx dot net)
4 * Florian Pose <fp@igh.de>
5 *
6 * This file is part of the PdCom library.
7 *
8 * The PdCom library is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or (at your
11 * option) any later version.
12 *
13 * The PdCom library is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
16 * License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
20 *
21 *****************************************************************************/
22
23 #include "Process.h"
24
25 #include "Debug.h"
26 #include "Future.h"
27 #include "IOLayer.h"
28 #include "ProtocolHandler.h"
29 #include "Subscription.h"
30 #include "Variable.h"
31 #include "msrproto/ProtocolHandler.h"
32 #include "pdcom5.h"
33
34 #include <algorithm>
35 #include <git_revision_hash.h>
36 #include <mutex>
37
38 #define STR(x) #x
39 #define QUOTE(x) STR(x)
40
41 const char *const PdCom::pdcom_version_code =
42 QUOTE(PDCOM_MAJOR) "." QUOTE(PDCOM_MINOR) "." QUOTE(PDCOM_RELEASE);
43
44 const char *const PdCom::pdcom_full_version = GIT_REV;
45
46 using iProcess = PdCom::impl::Process;
47
48 namespace PdCom { namespace impl {
49 22 class PendingSubscription : public PdCom::impl::Subscription
50 {
51 public:
52 22 PendingSubscription(
53 std::shared_ptr<const PdCom::impl::Variable> variable,
54 PdCom::Subscription *This,
55 PdCom::Subscriber &subscriber,
56 22 std::shared_ptr<iProcess> process) :
57 22 Subscription(std::move(variable), This, subscriber, process)
58 22 {}
59
60 11 static std::shared_ptr<PendingSubscription> make_and_notify(
61 std::shared_ptr<const PdCom::impl::Variable> variable,
62 PdCom::Subscription *This,
63 PdCom::Subscriber &subscriber,
64 std::shared_ptr<iProcess> process)
65 {
66 auto ans = std::make_shared<PendingSubscription>(
67 11 std::move(variable), This, subscriber, process);
68
69
1/2
✓ Branch 9 taken 11 times.
✗ Branch 10 not taken.
11 process->pending_callbacks_.push_back(
70 {ans, PdCom::Subscription::State::Pending});
71 11 return ans;
72 }
73
74
75
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 void poll() override { throw PdCom::InvalidSubscription(); }
76
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 const void *getData() const override { throw PdCom::InvalidSubscription(); }
77 };
78 }} // namespace PdCom::impl
79
80 using PdCom::impl::PendingSubscription;
81
82 ///////////////////////////////////////////////////////////////////////////
83 61 iProcess::Process(PdCom::Process *This) :
84 std::enable_shared_from_this<iProcess>(),
85 PdCom::impl::IOLayer(nullptr),
86 This(This),
87
1/2
✓ Branch 4 taken 61 times.
✗ Branch 5 not taken.
61 io(this)
88 61 {}
89
90 ///////////////////////////////////////////////////////////////////////////
91 5 void iProcess::reset()
92 {
93
2/2
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
9 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
94
95 4 io->rxbytes = 0;
96 4 io->txbytes = 0;
97
98 4 protocolHandler_.reset();
99 4 }
100
101 ///////////////////////////////////////////////////////////////////////////
102 370 void iProcess::asyncData()
103 {
104
2/2
✓ Branch 3 taken 369 times.
✓ Branch 4 taken 1 times.
739 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
105
1/2
✓ Branch 2 taken 369 times.
✗ Branch 3 not taken.
369 pending_callbacks_.flush();
106
2/2
✓ Branch 2 taken 67 times.
✓ Branch 3 taken 302 times.
369 if (!protocolHandler_)
107
2/4
✓ Branch 2 taken 67 times.
✗ Branch 3 not taken.
✓ Branch 9 taken 67 times.
✗ Branch 10 not taken.
67 protocolHandler_.reset(new MsrProto::ProtocolHandler(this, io));
108
109
1/2
✓ Branch 8 taken 369 times.
✗ Branch 9 not taken.
369 protocolHandler_->cancelSubscriptions();
110
4/4
✓ Branch 8 taken 366 times.
✓ Branch 9 taken 3 times.
✓ Branch 10 taken 11 times.
✓ Branch 11 taken 355 times.
369 if (!protocolHandler_->asyncData()) {
111 11 protocolHandler_.reset();
112 // first, make all subs invalid
113 22 auto old_subs = std::move(all_subscriptions_);
114
2/2
✓ Branch 5 taken 11 times.
✓ Branch 6 taken 11 times.
22 for (const auto &registered_s : old_subs) {
115
1/2
✓ Branch 3 taken 11 times.
✗ Branch 4 not taken.
22 if (const auto sub = registered_s.subscription_.lock()) {
116
1/2
✓ Branch 5 taken 11 times.
✗ Branch 6 not taken.
11 pending_callbacks_.push_back(
117 {sub, PdCom::Subscription::State::Invalid});
118 }
119 }
120 // give control flow back
121
1/2
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
11 pending_callbacks_.flush();
122 // replace all sub implementations as they are invalid now
123
2/2
✓ Branch 5 taken 11 times.
✓ Branch 6 taken 11 times.
22 for (const auto &registered_s : old_subs) {
124
2/4
✓ Branch 3 taken 11 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 11 times.
✗ Branch 7 not taken.
22 if (const auto sub = registered_s.subscription_.lock()) {
125
1/2
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 if (!sub->This_)
126 continue;
127 11 auto new_sub = std::make_shared<PendingSubscription>(
128 22 nullptr, sub->This_, sub->subscriber_,
129
4/8
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✓ Branch 10 taken 11 times.
✗ Branch 11 not taken.
✓ Branch 17 taken 11 times.
✗ Branch 18 not taken.
✓ Branch 20 taken 11 times.
✗ Branch 21 not taken.
44 shared_from_this());
130
1/2
✓ Branch 7 taken 11 times.
✗ Branch 8 not taken.
11 sub->replaceImpl(new_sub);
131
132
2/4
✓ Branch 5 taken 11 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 11 times.
✗ Branch 12 not taken.
11 all_subscriptions_.insert(
133 {new_sub, registered_s.path_, registered_s.selector_});
134 }
135 }
136 }
137 else {
138
1/2
✓ Branch 8 taken 355 times.
✗ Branch 9 not taken.
355 protocolHandler_->cancelSubscriptions();
139 }
140
1/2
✓ Branch 2 taken 366 times.
✗ Branch 3 not taken.
366 pending_callbacks_.flush();
141 366 }
142
143
144 50 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
145 PdCom::Subscription *subscription,
146 const std::string &path,
147 PdCom::Subscriber &subscriber,
148 const PdCom::Selector &selector)
149 {
150 50 std::shared_ptr<PdCom::impl::Subscription> ans;
151
2/2
✓ Branch 2 taken 39 times.
✓ Branch 3 taken 11 times.
50 if (protocolHandler_)
152
1/2
✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
117 ans = protocolHandler_->subscribe(
153 117 subscription, path, subscriber, selector);
154 else
155
1/2
✓ Branch 4 taken 11 times.
✗ Branch 5 not taken.
33 ans = PendingSubscription::make_and_notify(
156
1/2
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
44 nullptr, subscription, subscriber, shared_from_this());
157
2/4
✓ Branch 5 taken 50 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 50 times.
✗ Branch 12 not taken.
50 all_subscriptions_.insert({ans, path, selector.impl_});
158 50 return ans;
159 }
160 55 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
161 PdCom::Subscription *subscription,
162 std::shared_ptr<const PdCom::impl::Variable> var,
163 PdCom::Subscriber &subscriber,
164 const PdCom::Selector &selector)
165 {
166 55 std::shared_ptr<PdCom::impl::Subscription> ans;
167
1/2
✓ Branch 2 taken 55 times.
✗ Branch 3 not taken.
55 if (protocolHandler_)
168
2/2
✓ Branch 5 taken 51 times.
✓ Branch 6 taken 4 times.
165 ans = protocolHandler_->subscribe(
169 161 subscription, var, subscriber, selector);
170 else
171 ans = PendingSubscription::make_and_notify(
172 nullptr, subscription, subscriber, shared_from_this());
173
2/4
✓ Branch 7 taken 51 times.
✗ Branch 8 not taken.
✓ Branch 13 taken 51 times.
✗ Branch 14 not taken.
51 all_subscriptions_.insert({ans, var->getPath(), selector.impl_});
174 51 return ans;
175 }
176
177 67 void iProcess::connected()
178 {
179 134 auto old_subs = std::move(all_subscriptions_);
180
2/2
✓ Branch 5 taken 13 times.
✓ Branch 6 taken 67 times.
80 for (const auto &registered_sub : old_subs)
181
2/2
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 8 times.
26 if (const auto sub = registered_sub.subscription_.lock()) {
182 5 auto new_sub = protocolHandler_->subscribe(
183 10 sub->This_, registered_sub.path_, sub->subscriber_,
184
1/2
✓ Branch 12 taken 5 times.
✗ Branch 13 not taken.
20 {registered_sub.selector_});
185
1/2
✓ Branch 7 taken 5 times.
✗ Branch 8 not taken.
5 sub->replaceImpl(new_sub);
186
2/4
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 5 times.
✗ Branch 12 not taken.
5 all_subscriptions_.insert(
187 {new_sub, registered_sub.path_, registered_sub.selector_});
188 }
189
2/2
✓ Branch 24 taken 66 times.
✓ Branch 25 taken 1 times.
67 This->connected();
190 66 }
191
192 using PdCom::impl::Subscription;
193
194 26 void iProcess::replace(
195 std::shared_ptr<Subscription> const &old_s,
196 std::shared_ptr<Subscription> const &new_s)
197 {
198 26 const auto it = std::find_if(
199 78 all_subscriptions_.begin(), all_subscriptions_.end(),
200 136 [&old_s](const RegisteredSubscription &rs) {
201 84 return !rs.subscription_.owner_before(old_s)
202
3/4
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 2 times.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
80 and !old_s.owner_before(rs.subscription_);
203
1/2
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
132 });
204
1/2
✓ Branch 6 taken 26 times.
✗ Branch 7 not taken.
26 if (it != all_subscriptions_.end()) {
205
2/4
✓ Branch 6 taken 26 times.
✗ Branch 7 not taken.
✓ Branch 13 taken 26 times.
✗ Branch 14 not taken.
26 all_subscriptions_.insert({new_s, it->path_, it->selector_});
206
1/2
✓ Branch 4 taken 26 times.
✗ Branch 5 not taken.
26 all_subscriptions_.erase(it);
207 }
208 26 }
209
210
211 using PdCom::Process;
212
213 Process::~Process() = default;
214
215
1/2
✓ Branch 9 taken 61 times.
✗ Branch 10 not taken.
61 Process::Process() : pimpl(std::make_shared<PdCom::impl::Process>(this))
216 61 {}
217
218 Process::Process(std::shared_ptr<iProcess> impl) : pimpl(impl)
219 {}
220
221 3 std::string Process::name() const
222 {
223 3 return pimpl->protocolHandler().name();
224 }
225
226 3 std::string Process::version() const
227 {
228 3 return pimpl->protocolHandler().version();
229 }
230
231 void Process::setAuthManager(PdCom::Sasl *am)
232 {
233 pimpl->setAuthManager(am);
234 }
235
236 PdCom::Sasl *Process::getAuthManager() const
237 {
238 return pimpl->auth_manager_;
239 }
240
241 4 void Process::setMessageManager(PdCom::MessageManagerBase *mm)
242 {
243 4 pimpl->setMessageManager(mm);
244 4 }
245
246 5 void Process::reset()
247 {
248 5 pimpl->reset();
249 4 }
250
251 370 void Process::asyncData()
252 {
253 370 pimpl->asyncData();
254 366 }
255
256 2 bool Process::list(const std::string &path)
257 {
258 2 return pimpl->protocolHandler().list(path);
259 }
260
261 50 bool Process::find(const std::string &path)
262 {
263 50 return pimpl->protocolHandler().find(path);
264 }
265
266 1 void PdCom::Process::getClientStatistics()
267 {
268 1 pimpl->protocolHandler().getClientStatistics();
269 1 }
270
271 void PdCom::Process::clientStatisticsReply(std::vector<PdCom::ClientStatistics>)
272 {}
273
274 3 void Process::ping()
275 {
276 3 pimpl->protocolHandler().ping();
277 1 }
278
279 2 void Process::broadcast(const std::string &message, const std::string &attr)
280 {
281 2 pimpl->protocolHandler().broadcast(message, attr);
282 }
283
284 void Process::broadcastReply(
285 const std::string &,
286 const std::string &,
287 std::chrono::nanoseconds,
288 const std::string &)
289 {}
290 void Process::listReply(std::vector<PdCom::Variable>, std::vector<std::string>)
291 {}
292
293 void Process::findReply(const PdCom::Variable &)
294 {}
295
296 54 void Process::callPendingCallbacks()
297 {
298 54 pimpl->pending_callbacks_.flush();
299
2/2
✓ Branch 6 taken 43 times.
✓ Branch 7 taken 11 times.
54 if (const auto p = pimpl->castProtocolHandler())
300
1/2
✓ Branch 4 taken 43 times.
✗ Branch 5 not taken.
43 if (!pimpl->async_data_mutex_.isLocked())
301 43 p->cancelSubscriptions();
302 52 }
303
304 1 std::vector<Process::SubscriptionInfo> Process::getActiveSubscriptions() const
305 {
306 1 return pimpl->protocolHandler().getActiveSubscriptions();
307 }
308
309 972 void iProcess::PendingCallbackQueue::flush()
310 {
311
2/2
✓ Branch 2 taken 172 times.
✓ Branch 3 taken 800 times.
972 while (!empty()) {
312 344 const auto p = front();
313 172 pop_front();
314
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 p.execute();
315 }
316 800 }
317
318 4 PdCom::Variable::PollFuture PdCom::Variable::poll() const
319 {
320
4/4
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
✓ Branch 6 taken 2 times.
✓ Branch 7 taken 2 times.
6 if (auto impl = pimpl_.lock()) {
321
2/4
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 2 times.
2 if (const auto process = impl->process.lock()) {
322 2 auto ans = createFuture<
323 PdCom::Exception const &, VariablePollResult,
324
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2 times.
4 std::chrono::nanoseconds>();
325
2/4
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 13 taken 2 times.
✗ Branch 14 not taken.
6 process->protocolHandler().pollVariable(
326 4 *impl, std::move(ans.second));
327 4 return std::move(ans.first);
328 }
329 throw ProcessGoneAway();
330 }
331
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 throw EmptyVariable();
332 }
333