GCC Code Coverage Report


Directory: ./
File: pdcom5/src/msrproto/ProtocolHandler.h
Date: 2025-01-19 04:08:20
Exec Total Coverage
Lines: 29 33 87.9%
Branches: 21 103 20.4%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2015-2016 Richard Hacker (lerichi at gmx dot net)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #ifndef PD_MSRPROTOCOLHANDLER_H
23 #define PD_MSRPROTOCOLHANDLER_H
24
25 #include "../Future.h"
26 #include "../Process.h"
27 #include "../ProtocolHandler.h"
28 #include "../StreambufLayer.h"
29 #include "../Subscription.h"
30 #include "DataConverter.h"
31 #include "PeriodicSubscriptions.h"
32 #include "Variable.h"
33 #include "expat_wrapper.h"
34
35 #include <list>
36 #include <memory>
37 #include <ostream>
38 #include <pdcom5/MessageManagerBase.h>
39 #include <queue>
40 #include <stddef.h>
41 #include <string>
42 #include <unordered_map>
43
44 namespace PdCom { namespace impl { namespace MsrProto {
45 class Channel;
46 class Parameter;
47 class PollSubscription;
48 class PeriodicSubscription;
49 class PendingSubscription;
50 class EventSubscription;
51 class Variable;
52
53 156 class XmlStream
54 {
55 std::ostream os;
56
57 public:
58 XmlStream(PdCom::impl::Process *process, std::streambuf *buf);
59 void lock();
60 void unlock();
61 PdCom::impl::Process *process;
62
63 template <typename T>
64 15678 XmlStream &operator<<(T &&t)
65 {
66 try {
67
13/31
✓ Branch 4 taken 52 times.
✓ Branch 5 taken 5915 times.
✗ Branch 6 not taken.
✓ Branch 13 taken 66 times.
✗ Branch 14 not taken.
✓ Branch 22 taken 243 times.
✗ Branch 23 not taken.
✗ Branch 30 not taken.
✗ Branch 31 not taken.
✗ Branch 33 not taken.
✗ Branch 34 not taken.
✓ Branch 42 taken 142 times.
✗ Branch 43 not taken.
✓ Branch 49 taken 59 times.
✗ Branch 50 not taken.
✗ Branch 56 not taken.
✗ Branch 57 not taken.
✓ Branch 62 taken 613 times.
✗ Branch 63 not taken.
✓ Branch 68 taken 18 times.
✗ Branch 69 not taken.
✓ Branch 74 taken 66 times.
✗ Branch 75 not taken.
✓ Branch 80 taken 662 times.
✗ Branch 81 not taken.
✓ Branch 86 taken 803 times.
✗ Branch 87 not taken.
✓ Branch 93 taken 2395 times.
✗ Branch 94 not taken.
✓ Branch 100 taken 4644 times.
✗ Branch 101 not taken.
15678 os << std::forward<T>(t);
68 }
69 catch (std::ios_base::failure const &ex) {
70 throw PdCom::WriteFailure(ex.what());
71 }
72
0/28
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 13 not taken.
✗ Branch 14 not taken.
✗ Branch 17 not taken.
✗ Branch 18 not taken.
✗ Branch 21 not taken.
✗ Branch 22 not taken.
✗ Branch 25 not taken.
✗ Branch 26 not taken.
✗ Branch 29 not taken.
✗ Branch 30 not taken.
✗ Branch 33 not taken.
✗ Branch 34 not taken.
✗ Branch 37 not taken.
✗ Branch 38 not taken.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
✗ Branch 49 not taken.
✗ Branch 50 not taken.
✗ Branch 53 not taken.
✗ Branch 54 not taken.
15678 return *this;
73 }
74
75 554 XmlStream &flush()
76 {
77 554 os.flush();
78 554 return *this;
79 }
80 };
81
82 167 class ChannelSubscriptions
83 {
84 public:
85 using GroupId = unsigned int;
86 using ChannelId = unsigned int;
87 using SubscriptionMap = std::unordered_map<ChannelId, SubscriptionSet<>>;
88
89 static constexpr const char *xsadId = "xsadQ";
90 static constexpr const char *xsodId = "xsodQ";
91
92 166 virtual ~ChannelSubscriptions() = default;
93
94
95 virtual std::shared_ptr<PdCom::impl::Subscription> subscribeEvent(
96 PdCom::Subscription *subscription,
97 std::shared_ptr<const Variable> _var,
98 PdCom::Subscriber &subscriber,
99 const PdCom::Selector &selector,
100 bool notify_pending,
101 XmlStream &cout) = 0;
102 virtual std::shared_ptr<PdCom::impl::Subscription> subscribePeriodic(
103 PdCom::Subscription *subscription,
104 std::shared_ptr<const Variable> _var,
105 PdCom::Subscriber &subscriber,
106 const PdCom::Selector &selector,
107 bool notify_pending,
108 XmlStream &cout) = 0;
109
110 virtual void unsubscribeEvent(const Variable &var, XmlStream &cout) = 0;
111 virtual void unsubscribePeriodic(
112 PdCom::Transmission tm,
113 const Variable &var,
114 XmlStream &cout) = 0;
115
116 virtual void xsadAck(Process *) {}
117 virtual void xsodAck() {}
118
119
120 virtual PeriodicSubscriptionsBase::DataReceiveHandle startNewDataRecieving(
121 GroupId index,
122 VariableCache::Lock lock,
123 std::chrono::nanoseconds ts) = 0;
124
125 virtual void
126 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
127 const std::unordered_map<unsigned, Channel *> &channels) const;
128
129
130 4 const SubscriptionMap &getEventMap() const { return event_; }
131
132 protected:
133 SubscriptionMap event_;
134 };
135
136
2/4
✓ Branch 19 taken 157 times.
✗ Branch 20 not taken.
✓ Branch 23 taken 157 times.
✗ Branch 24 not taken.
469 class ChannelSubscriptionsWithGroup final : public ChannelSubscriptions
137 {
138 public:
139 std::shared_ptr<PdCom::impl::Subscription> subscribeEvent(
140 PdCom::Subscription *subscription,
141 std::shared_ptr<const Variable> _var,
142 PdCom::Subscriber &subscriber,
143 const PdCom::Selector &selector,
144 bool notify_pending,
145 XmlStream &cout) override;
146 std::shared_ptr<PdCom::impl::Subscription> subscribePeriodic(
147 PdCom::Subscription *subscription,
148 std::shared_ptr<const Variable> _var,
149 PdCom::Subscriber &subscriber,
150 const PdCom::Selector &selector,
151 bool notify_pending,
152 XmlStream &cout) override;
153
154 void unsubscribeEvent(const Variable &var, XmlStream &cout) override;
155 void unsubscribePeriodic(
156 PdCom::Transmission tm,
157 const Variable &var,
158 XmlStream &cout) override;
159
160 void xsadAck(Process *process) override;
161 void xsodAck() override;
162
163
164 339 PeriodicSubscriptionsBase::DataReceiveHandle startNewDataRecieving(
165 GroupId index,
166 VariableCache::Lock lock,
167 std::chrono::nanoseconds ts) override
168 {
169 return periodic_subscriptions_with_group_.startNewDataRecieving(
170
1/2
✓ Branch 6 taken 339 times.
✗ Branch 7 not taken.
339 index, std::move(lock), ts);
171 }
172
173 void
174 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
175 const std::unordered_map<unsigned, Channel *> &channels)
176 const override;
177
178 private:
179 struct xsad_details
180 {
181 unsigned int group_id, channel_id;
182 };
183 std::queue<xsad_details> xsadQ;
184 std::queue<GroupId> xsodQ;
185
186 PeriodicSubscriptionsWithGroup periodic_subscriptions_with_group_;
187 };
188
189
190
1/2
✓ Branch 17 taken 10 times.
✗ Branch 18 not taken.
30 class ChannelSubscriptionsWithoutGroup final : public ChannelSubscriptions
191 {
192 public:
193 std::shared_ptr<PdCom::impl::Subscription> subscribeEvent(
194 PdCom::Subscription *subscription,
195 std::shared_ptr<const Variable> _var,
196 PdCom::Subscriber &subscriber,
197 const PdCom::Selector &selector,
198 bool notify_pending,
199 XmlStream &cout) override;
200 std::shared_ptr<PdCom::impl::Subscription> subscribePeriodic(
201 PdCom::Subscription *subscription,
202 std::shared_ptr<const Variable> _var,
203 PdCom::Subscriber &subscriber,
204 const PdCom::Selector &selector,
205 bool notify_pending,
206 XmlStream &cout) override;
207
208 void unsubscribeEvent(const Variable &var, XmlStream &cout) override;
209 void unsubscribePeriodic(
210 PdCom::Transmission tm,
211 const Variable &var,
212 XmlStream &cout) override;
213
214 void xsadAck(Process *process) override;
215
216 42 PeriodicSubscriptionsBase::DataReceiveHandle startNewDataRecieving(
217 GroupId /* index */,
218 VariableCache::Lock lock,
219 std::chrono::nanoseconds ts) override
220 {
221 return periodic_subscriptions_without_group_.startNewDataRecieving(
222
1/2
✓ Branch 6 taken 42 times.
✗ Branch 7 not taken.
42 std::move(lock), ts);
223 }
224
225 void
226 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
227 const std::unordered_map<unsigned, Channel *> &channels)
228 const override;
229
230 private:
231 std::queue<std::pair<bool /*event*/, GroupId>> xsadQ;
232 PeriodicSubscriptionWithoutGroup periodic_subscriptions_without_group_;
233 };
234
235
236 class ProtocolHandler :
237 public PdCom::impl::ProtocolHandler,
238 public PdCom::impl::StreambufLayer,
239 protected ExpatWrapper<ProtocolHandler, true>
240 {
241 public:
242 ProtocolHandler(PdCom::impl::Process *process, IOLayer *io);
243 ~ProtocolHandler();
244
245 using ExpatWrapper<ProtocolHandler, true>::parse;
246
247 static constexpr int ReadBufferSize = 8192;
248
249
250 void getMessage(uint32_t seqNo) override;
251 void getActiveMessages() override;
252
253 std::shared_ptr<impl::Subscription> subscribe(
254 PdCom::Subscription *subscription,
255 const std::string &path,
256 PdCom::Subscriber &subscriber,
257 const PdCom::Selector &selector) override;
258 std::shared_ptr<impl::Subscription> subscribe(
259 PdCom::Subscription *subscription,
260 std::shared_ptr<const impl::Variable> var,
261 PdCom::Subscriber &subscriber,
262 const PdCom::Selector &selector,
263 bool notify_pending);
264 56 std::shared_ptr<impl::Subscription> subscribe(
265 PdCom::Subscription *subscription,
266 std::shared_ptr<const impl::Variable> var,
267 PdCom::Subscriber &subscriber,
268 const PdCom::Selector &selector) override
269 {
270
1/2
✓ Branch 4 taken 56 times.
✗ Branch 5 not taken.
56 return subscribe(subscription, var, subscriber, selector, true);
271 }
272 void unsubscribe(EventSubscription &) noexcept;
273 void unsubscribe(PeriodicSubscription &) noexcept;
274 void cancelSubscriptions() override;
275
276 void pollChannel(PollSubscription &sub);
277 void pollParameter(PollSubscription &sub);
278 void pollVariable(const impl::Variable &var, VariablePollPromise &&promise)
279 override;
280 PdCom::Variable::SetValueFuture writeParameter(
281 const Parameter &p,
282 const void *src,
283 TypeInfo::DataType src_type,
284 size_t idx,
285 size_t n);
286
287 std::vector<PdCom::Process::SubscriptionInfo>
288 getActiveSubscriptions() const override;
289
290 private:
291 bool polite = false;
292 bool protocolError = false;
293 std::chrono::nanoseconds dataTime = {};
294
295 std::string m_name;
296 std::string m_version;
297
298 XmlStream cout;
299
300 const std::shared_ptr<DirNode> root;
301
302 // Structures required for list command
303 std::queue<std::string> listQ;
304 bool processListRequest();
305 enum { Uncached, InProgress, Cached } fullListing = Uncached;
306
307 std::queue<std::string> findQ;
308 bool processFindRequest();
309
310 // Structures required for login (return true on successful login)
311 bool processLogin(const char **atts);
312
313 struct Feature
314 {
315 bool pushparameters, binparameters, eventchannels, statistics, pmtime,
316 group, history, aic, messages, quiet, list, exclusive, polite,
317 xsap, login;
318 unsigned eventid;
319 void operator=(const char *list);
320 };
321 Feature feature = {};
322
323
324 using SubscriptionMap =
325 std::unordered_map<unsigned int /* id */, SubscriptionSet<>>;
326
327 65 struct SubscriptionPendingPoll
328 {
329 const unsigned int id_;
330 SubscriptionSet<> subscriptions_;
331 };
332 626 struct SubscriptionPendingPollList : std::list<SubscriptionPendingPoll>
333 {
334 bool
335 append(unsigned int id_, const std::shared_ptr<PollSubscription> &sub);
336 } subscription_pending_channel_polls_,
337 subscription_pending_parameter_polls_;
338
339
1/2
✗ Branch 10 not taken.
✓ Branch 11 taken 3 times.
14 struct VariablePendingPoll
340 {
341 std::vector<VariablePollPromise> promises_;
342 const unsigned int id_;
343 const bool is_parameter_;
344
345 8 VariablePendingPoll(
346 VariablePollPromise p,
347 unsigned int id,
348 8 bool is_parameter) :
349 8 promises_(), id_(id), is_parameter_(is_parameter)
350 {
351
1/2
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 promises_.push_back(std::move(p));
352 8 }
353 };
354 class VariablePendingPollList
355 {
356 std::list<VariablePendingPoll> polls_;
357
358 public:
359 157 VariablePendingPollList() = default;
360 bool append(const Variable &var, VariablePollPromise promise);
361
362 void processResult(
363 Variable const &var,
364 const char *data,
365 std::chrono::nanoseconds ts);
366
367 ~VariablePendingPollList();
368 } variable_pending_polls_;
369
370 std::unique_ptr<ChannelSubscriptions> channel_subscriptions_;
371 SubscriptionMap parameter_event_;
372
373
374 VariableCache variable_cache_;
375 impl::Subscription::SubscriberNotifier subscriber_notifier_;
376
377 DataEncoder encoder_;
378
379 PeriodicSubscriptionsBase::DataReceiveHandle periodic_receive_handle_;
380
381 std::unordered_map<
382 PdCom::Transmission,
383 std::set<
384 std::shared_ptr<const Variable>,
385 std::owner_less<std::shared_ptr<const Variable>>>>
386 cancelations_;
387
388 78 struct PendingRequest
389 {
390 const std::string path_;
391 SubscriptionSet<PendingSubscription> subscriptions_;
392 };
393 std::list<PendingRequest> pending_queue_;
394 std::queue<unsigned int /* parameter id */> xsapQ;
395
396 using ChannelMap = std::unordered_map<unsigned, Channel *>;
397 ChannelMap channel;
398 using ParameterMap = std::unordered_map<unsigned, Parameter *>;
399 ParameterMap parameter;
400
401 class PendingSetValues
402 {
403 std::queue<PdCom::Promise<PdCom::Exception const &>> queue_;
404
405 public:
406 157 PendingSetValues() = default;
407
408 PdCom::Variable::SetValueFuture push();
409 void pop();
410
411 ~PendingSetValues();
412
413 } pending_set_values_;
414
415
416 enum {
417 StartUp,
418 Idle,
419 GetListing,
420 WaitForConnected,
421 WaitForLogin,
422 GetVariableFields,
423 ReadData,
424 ReadEvent,
425 ReadDataOrEvent,
426 GetActiveMessages,
427 ReadClientStatistics,
428 } state = StartUp;
429
430
431 std::queue<uint32_t> seqNoQ;
432 uint32_t messageSeqNo;
433 std::vector<PdCom::Message> messageList;
434 std::vector<PdCom::ClientStatistics> client_statistics;
435
436 void setPolite(bool state);
437
438 /** Read a variable tag */
439 static const TypeInfo *getDataType(const char **atts, SizeInfo &size_info);
440 Parameter *getParameter(const char **atts);
441 Channel *getChannel(const char **atts);
442
443 // callbacks for xml parser
444 friend class ExpatWrapper<ProtocolHandler, true>;
445 void startElement(const char *name, const char **atts);
446 void endElement(const char *name);
447 void xmlError(const char *);
448
449 void xsapAck();
450 void pendingAck(Variable *);
451 void pollParameter(size_t index, const char *id);
452 void pollChannel(size_t index, const char *id);
453
454 // Reimplemented from PdCom::ProtocolHandler
455 bool asyncData() override;
456 bool find(const std::string &path) override;
457 bool list(const std::string &directory) override;
458 void getClientStatistics() override;
459 void logout() override;
460 bool login(const char *mech, const char *clientData) override;
461
462 void ping() override;
463 std::string name() const override;
464 std::string version() const override;
465
466 void
467 broadcast(const std::string &message, const std::string &attr) override;
468 void processBroadcast(const char **atts);
469
470 void readEventData(const char *name, const char **atts);
471 void readPeriodicData(const char *name, const char **atts);
472 };
473
474
475 }}} // namespace PdCom::impl::MsrProto
476
477 #endif // PD_MSRPROTOCOLHANDLER_H
478