GCC Code Coverage Report


Directory: ./
File: pdcom5/src/Process.cpp
Date: 2025-01-19 04:08:20
Exec Total Coverage
Lines: 109 147 74.1%
Branches: 57 144 39.6%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2007-2016 Richard Hacker (lerichi at gmx dot net)
4 * Florian Pose <fp@igh.de>
5 *
6 * This file is part of the PdCom library.
7 *
8 * The PdCom library is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or (at your
11 * option) any later version.
12 *
13 * The PdCom library is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
16 * License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
20 *
21 *****************************************************************************/
22
23 #include "Process.h"
24
25 #include "Debug.h"
26 #include "Future.h"
27 #include "IOLayer.h"
28 #include "ProtocolHandler.h"
29 #include "Subscription.h"
30 #include "Variable.h"
31 #include "msrproto/ProtocolHandler.h"
32 #include "pdcom5.h"
33
34 #include <algorithm>
35 #include <git_revision_hash.h>
36 #include <mutex>
37
38 #define STR(x) #x
39 #define QUOTE(x) STR(x)
40
41 const char *const PdCom::pdcom_version_code =
42 QUOTE(PDCOM_MAJOR) "." QUOTE(PDCOM_MINOR) "." QUOTE(PDCOM_RELEASE);
43
44 const char *const PdCom::pdcom_full_version = GIT_REV;
45
46 using iProcess = PdCom::impl::Process;
47
48 namespace PdCom { namespace impl {
49 class PendingSubscription : public PdCom::impl::Subscription
50 {
51 public:
52 PendingSubscription(
53 std::shared_ptr<const PdCom::impl::Variable> variable,
54 PdCom::Subscription *This,
55 PdCom::Subscriber &subscriber,
56 std::shared_ptr<iProcess> process) :
57 Subscription(std::move(variable), This, subscriber, process)
58 {}
59
60 static std::shared_ptr<PendingSubscription> make_and_notify(
61 std::shared_ptr<const PdCom::impl::Variable> variable,
62 PdCom::Subscription *This,
63 PdCom::Subscriber &subscriber,
64 std::shared_ptr<iProcess> process)
65 {
66 auto ans = std::make_shared<PendingSubscription>(
67 std::move(variable), This, subscriber, process);
68
69 process->pending_callbacks_.push_back(
70 {ans, PdCom::Subscription::State::Pending});
71 return ans;
72 }
73
74
75 void poll() override { throw PdCom::InvalidSubscription(); }
76 const void *getData() const override { throw PdCom::InvalidSubscription(); }
77 };
78 }} // namespace PdCom::impl
79
80 using PdCom::impl::PendingSubscription;
81
82 ///////////////////////////////////////////////////////////////////////////
83 159 iProcess::Process(PdCom::Process *This) :
84 std::enable_shared_from_this<iProcess>(),
85 PdCom::impl::IOLayer(nullptr),
86 This(This),
87
1/2
✓ Branch 4 taken 159 times.
✗ Branch 5 not taken.
159 io(this)
88 159 {}
89
90 ///////////////////////////////////////////////////////////////////////////
91 119 void iProcess::reset()
92 {
93
1/2
✓ Branch 3 taken 119 times.
✗ Branch 4 not taken.
238 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
94
95 119 io->rxbytes = 0;
96 119 io->txbytes = 0;
97
98 119 protocolHandler_.reset();
99 119 }
100
101 ///////////////////////////////////////////////////////////////////////////
102 1140 void iProcess::asyncData()
103 {
104
1/2
✓ Branch 3 taken 1140 times.
✗ Branch 4 not taken.
2280 std::unique_lock<AsyncDataMutex> lck(async_data_mutex_);
105
1/2
✓ Branch 2 taken 1140 times.
✗ Branch 3 not taken.
1140 pending_callbacks_.flush();
106
2/2
✓ Branch 2 taken 157 times.
✓ Branch 3 taken 983 times.
1140 if (!protocolHandler_)
107
2/4
✓ Branch 2 taken 157 times.
✗ Branch 3 not taken.
✓ Branch 9 taken 157 times.
✗ Branch 10 not taken.
157 protocolHandler_.reset(new MsrProto::ProtocolHandler(this, io));
108
109
1/2
✓ Branch 8 taken 1140 times.
✗ Branch 9 not taken.
1140 protocolHandler_->cancelSubscriptions();
110
4/4
✓ Branch 8 taken 1139 times.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 17 times.
✓ Branch 11 taken 1122 times.
1140 if (!protocolHandler_->asyncData()) {
111 17 protocolHandler_.reset();
112 // first, make all subs invalid
113 34 auto old_subs = std::move(all_subscriptions_);
114
2/2
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 17 times.
19 for (const auto &registered_s : old_subs) {
115
1/2
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
4 if (const auto sub = registered_s.subscription_.lock()) {
116 pending_callbacks_.push_back(
117 {sub, PdCom::Subscription::State::Invalid});
118 }
119 }
120 // give control flow back
121
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 pending_callbacks_.flush();
122 // replace all sub implementations as they are invalid now
123
2/2
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 17 times.
19 for (const auto &registered_s : old_subs) {
124
2/4
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
4 if (const auto sub = registered_s.subscription_.lock()) {
125 if (!sub->This_)
126 continue;
127 auto new_sub = std::make_shared<PendingSubscription>(
128 nullptr, sub->This_, sub->subscriber_,
129 shared_from_this());
130 sub->replaceImpl(new_sub);
131
132 all_subscriptions_.insert(
133 {new_sub, registered_s.path_, registered_s.selector_});
134 }
135 }
136 }
137 else {
138
1/2
✓ Branch 8 taken 1122 times.
✗ Branch 9 not taken.
1122 protocolHandler_->cancelSubscriptions();
139 }
140
1/2
✓ Branch 2 taken 1139 times.
✗ Branch 3 not taken.
1139 pending_callbacks_.flush();
141 1139 }
142
143
144 28 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
145 PdCom::Subscription *subscription,
146 const std::string &path,
147 PdCom::Subscriber &subscriber,
148 const PdCom::Selector &selector)
149 {
150 28 std::shared_ptr<PdCom::impl::Subscription> ans;
151
1/2
✓ Branch 2 taken 28 times.
✗ Branch 3 not taken.
28 if (protocolHandler_)
152
1/2
✓ Branch 3 taken 28 times.
✗ Branch 4 not taken.
84 ans = protocolHandler_->subscribe(
153 84 subscription, path, subscriber, selector);
154 else
155 ans = PendingSubscription::make_and_notify(
156 nullptr, subscription, subscriber, shared_from_this());
157
2/4
✓ Branch 5 taken 28 times.
✗ Branch 6 not taken.
✓ Branch 11 taken 28 times.
✗ Branch 12 not taken.
28 all_subscriptions_.insert({ans, path, selector.impl_});
158 28 return ans;
159 }
160 56 std::shared_ptr<PdCom::impl::Subscription> iProcess::subscribe(
161 PdCom::Subscription *subscription,
162 std::shared_ptr<const PdCom::impl::Variable> var,
163 PdCom::Subscriber &subscriber,
164 const PdCom::Selector &selector)
165 {
166 56 std::shared_ptr<PdCom::impl::Subscription> ans;
167
1/2
✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
56 if (protocolHandler_)
168
1/2
✓ Branch 5 taken 56 times.
✗ Branch 6 not taken.
168 ans = protocolHandler_->subscribe(
169 168 subscription, var, subscriber, selector);
170 else
171 ans = PendingSubscription::make_and_notify(
172 nullptr, subscription, subscriber, shared_from_this());
173
2/4
✓ Branch 7 taken 56 times.
✗ Branch 8 not taken.
✓ Branch 13 taken 56 times.
✗ Branch 14 not taken.
56 all_subscriptions_.insert({ans, var->getPath(), selector.impl_});
174 56 return ans;
175 }
176
177 149 void iProcess::connected()
178 {
179 298 auto old_subs = std::move(all_subscriptions_);
180
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 149 times.
149 for (const auto &registered_sub : old_subs)
181 if (const auto sub = registered_sub.subscription_.lock()) {
182 auto new_sub = protocolHandler_->subscribe(
183 sub->This_, registered_sub.path_, sub->subscriber_,
184 {registered_sub.selector_});
185 sub->replaceImpl(new_sub);
186 all_subscriptions_.insert(
187 {new_sub, registered_sub.path_, registered_sub.selector_});
188 }
189
1/2
✓ Branch 24 taken 149 times.
✗ Branch 25 not taken.
149 This->connected();
190 149 }
191
192 using PdCom::impl::Subscription;
193
194 26 void iProcess::replace(
195 std::shared_ptr<Subscription> const &old_s,
196 std::shared_ptr<Subscription> const &new_s)
197 {
198 26 const auto it = std::find_if(
199 78 all_subscriptions_.begin(), all_subscriptions_.end(),
200 130 [&old_s](const RegisteredSubscription &rs) {
201 78 return !rs.subscription_.owner_before(old_s)
202
2/4
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
78 and !old_s.owner_before(rs.subscription_);
203
1/2
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
130 });
204
1/2
✓ Branch 6 taken 26 times.
✗ Branch 7 not taken.
26 if (it != all_subscriptions_.end()) {
205
2/4
✓ Branch 6 taken 26 times.
✗ Branch 7 not taken.
✓ Branch 13 taken 26 times.
✗ Branch 14 not taken.
26 all_subscriptions_.insert({new_s, it->path_, it->selector_});
206
1/2
✓ Branch 4 taken 26 times.
✗ Branch 5 not taken.
26 all_subscriptions_.erase(it);
207 }
208 26 }
209
210
211 using PdCom::Process;
212
213 Process::~Process() = default;
214
215
1/2
✓ Branch 9 taken 147 times.
✗ Branch 10 not taken.
147 Process::Process() : pimpl(std::make_shared<PdCom::impl::Process>(this))
216 147 {}
217
218 12 Process::Process(std::shared_ptr<iProcess> impl) : pimpl(impl)
219 12 {}
220
221 14 std::string Process::name() const
222 {
223 14 return pimpl->protocolHandler().name();
224 }
225
226 11 std::string Process::version() const
227 {
228 11 return pimpl->protocolHandler().version();
229 }
230
231 29 void Process::setAuthManager(PdCom::Sasl *am)
232 {
233 29 pimpl->setAuthManager(am);
234 29 }
235
236 PdCom::Sasl *Process::getAuthManager() const
237 {
238 return pimpl->auth_manager_;
239 }
240
241 127 void Process::setMessageManager(PdCom::MessageManagerBase *mm)
242 {
243 127 pimpl->setMessageManager(mm);
244 127 }
245
246 119 void Process::reset()
247 {
248 119 pimpl->reset();
249 119 }
250
251 1140 void Process::asyncData()
252 {
253 1140 pimpl->asyncData();
254 1139 }
255
256 19 bool Process::list(const std::string &path)
257 {
258 19 return pimpl->protocolHandler().list(path);
259 }
260
261 84 bool Process::find(const std::string &path)
262 {
263 84 return pimpl->protocolHandler().find(path);
264 }
265
266 6 void PdCom::Process::getClientStatistics()
267 {
268 6 pimpl->protocolHandler().getClientStatistics();
269 6 }
270
271 void PdCom::Process::clientStatisticsReply(std::vector<PdCom::ClientStatistics>)
272 {}
273
274 4 void Process::ping()
275 {
276 4 pimpl->protocolHandler().ping();
277 4 }
278
279 29 void Process::broadcast(const std::string &message, const std::string &attr)
280 {
281 29 pimpl->protocolHandler().broadcast(message, attr);
282 20 }
283
284 void Process::broadcastReply(
285 const std::string &,
286 const std::string &,
287 std::chrono::nanoseconds,
288 const std::string &)
289 {}
290 void Process::listReply(std::vector<PdCom::Variable>, std::vector<std::string>)
291 {}
292
293 void Process::findReply(const PdCom::Variable &)
294 {}
295
296 241 void Process::callPendingCallbacks()
297 {
298 241 pimpl->pending_callbacks_.flush();
299
2/2
✓ Branch 6 taken 184 times.
✓ Branch 7 taken 57 times.
241 if (const auto p = pimpl->castProtocolHandler())
300
1/2
✓ Branch 4 taken 184 times.
✗ Branch 5 not taken.
184 if (!pimpl->async_data_mutex_.isLocked())
301 184 p->cancelSubscriptions();
302 241 }
303
304 std::vector<Process::SubscriptionInfo> Process::getActiveSubscriptions() const
305 {
306 return pimpl->protocolHandler().getActiveSubscriptions();
307 }
308
309 2674 void iProcess::PendingCallbackQueue::flush()
310 {
311
2/2
✓ Branch 2 taken 137 times.
✓ Branch 3 taken 2537 times.
2674 while (!empty()) {
312 274 const auto p = front();
313 137 pop_front();
314
1/2
✓ Branch 1 taken 137 times.
✗ Branch 2 not taken.
137 p.execute();
315 }
316 2537 }
317
318 8 PdCom::Variable::PollFuture PdCom::Variable::poll() const
319 {
320
2/4
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
8 if (auto impl = pimpl_.lock()) {
321
2/4
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
8 if (const auto process = impl->process.lock()) {
322 8 auto ans = createFuture<
323 PdCom::Exception const &, VariablePollResult,
324
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
16 std::chrono::nanoseconds>();
325
2/4
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
✓ Branch 13 taken 8 times.
✗ Branch 14 not taken.
24 process->protocolHandler().pollVariable(
326 16 *impl, std::move(ans.second));
327 16 return std::move(ans.first);
328 }
329 throw ProcessGoneAway();
330 }
331 throw EmptyVariable();
332 }
333