GCC Code Coverage Report


Directory: ./
File: pdcom5/src/msrproto/PeriodicSubscriptions.h
Date: 2025-02-23 04:08:29
Exec Total Coverage
Lines: 48 53 90.6%
Branches: 26 48 54.2%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #ifndef MSRPROTO_PERIODICSUBSCRIPTIONS_H
23 #define MSRPROTO_PERIODICSUBSCRIPTIONS_H
24
25 #include "../Process.h"
26 #include "../Subscription.h"
27 #include "DataConverter.h"
28
29 #include <chrono>
30 #include <memory>
31 #include <pdcom5/Subscriber.h>
32 #include <set>
33 #include <unordered_map>
34 #include <vector>
35
36 namespace PdCom { namespace impl { namespace MsrProto {
37 class Channel;
38 class PollSubscription;
39 class PeriodicSubscription;
40 class Variable;
41
42 template <class S = PollSubscription>
43 418 struct SubscriptionSet :
44 std::set<std::weak_ptr<S>, std::owner_less<std::weak_ptr<S>>>
45 {
46 84 using std::set<std::weak_ptr<S>, std::owner_less<std::weak_ptr<S>>>::set;
47 48 void readData(const char *data, std::chrono::nanoseconds ts) const
48 {
49
2/2
✓ Branch 6 taken 72 times.
✓ Branch 7 taken 48 times.
120 for (const auto &weak_sub : *this) {
50
1/2
✓ Branch 3 taken 72 times.
✗ Branch 4 not taken.
144 if (const auto sub = weak_sub.lock()) {
51
1/2
✓ Branch 5 taken 72 times.
✗ Branch 6 not taken.
72 sub->readData(data);
52
1/2
✓ Branch 5 taken 72 times.
✗ Branch 6 not taken.
72 sub->newValues(ts);
53 }
54 }
55 48 }
56 3 bool remove_expired() noexcept
57 {
58
2/2
✓ Branch 8 taken 4 times.
✓ Branch 9 taken 3 times.
7 for (auto it = this->begin(); it != this->end();) {
59
2/2
✓ Branch 3 taken 3 times.
✓ Branch 4 taken 1 times.
4 if (it->expired())
60 3 it = this->erase(it);
61 else
62 1 ++it;
63 }
64 3 return this->empty();
65 }
66
67 PdCom::Subscription::State currentState =
68 PdCom::Subscription::State::Pending;
69
70 48 void broadcastState(
71 PdCom::Subscription::State state,
72 Process::PendingCallbackQueue &q)
73 {
74 48 currentState = state;
75
2/2
✓ Branch 6 taken 48 times.
✓ Branch 7 taken 48 times.
96 for (const auto &weak_sub : *this)
76
1/2
✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
96 if (const auto sub = weak_sub.lock())
77
1/2
✓ Branch 8 taken 48 times.
✗ Branch 9 not taken.
48 q.push_back({sub, state});
78 48 }
79 };
80
81 333 class PeriodicSubscriptionsBase
82 {
83 public:
84 using GroupId = unsigned int;
85 using ChannelId = unsigned int;
86
87 230 struct ChannelSubscripion
88 {
89 unsigned int const blocksize_, decimation_;
90 SubscriptionSet<PeriodicSubscription> subscriptions_;
91 };
92
93 88 struct ChannelSubscriptionMap :
94 std::unordered_map<ChannelId, ChannelSubscripion>
95 {
96 using std::unordered_map<ChannelId, ChannelSubscripion>::unordered_map;
97
98 1 bool guaranteed_empty() noexcept
99 {
100
2/2
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 1 times.
2 for (auto it = begin(); it != end();) {
101
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 if (it->second.subscriptions_.empty())
102 1 it = erase(it);
103 else
104 ++it;
105 }
106 1 return empty();
107 }
108 };
109
110
111
2/4
✗ Branch 16 not taken.
✓ Branch 17 taken 770 times.
✗ Branch 20 not taken.
✓ Branch 21 taken 770 times.
1696 class DataReceiveHandle
112 {
113 struct Unlocker
114 {
115 385 void operator()(PeriodicSubscriptionsBase *ps) const
116 {
117
1/2
✓ Branch 0 taken 385 times.
✗ Branch 1 not taken.
385 if (ps)
118 385 ps->locked_ = false;
119 385 }
120 };
121
122 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> ps_ = nullptr;
123 VariableCache::Lock variable_cache_lock_;
124 ChannelSubscriptionMap *subscription_map_ = nullptr;
125
126 int current_blocksize_ = -1;
127 int max_read_blocks_ = -1;
128 std::chrono::nanoseconds ts_ = {};
129 bool timestamps_read_ = false;
130 // true if all arriving F tags have the same blocksize
131 bool consistent_blocksize_ = false;
132
133 public:
134 542 DataReceiveHandle() {}
135 DataReceiveHandle(
136 PeriodicSubscriptionsBase &ps,
137 ChannelSubscriptionMap &subscription_map,
138 VariableCache::Lock variable_cache_lock,
139 std::chrono::nanoseconds ts,
140 bool consistent_blocksize);
141
142 1269 operator bool() const { return static_cast<bool>(ps_); }
143
144
145 void readData(const Channel &c, const char *data);
146 void readTimestamps(const char *data);
147 void endNewDataRecieving();
148
149 private:
150 void sendDataToSubscribers(
151 const DataDecoder &var,
152 const ChannelSubscripion &subs,
153 int block);
154 };
155
156 protected:
157 std::vector<std::chrono::nanoseconds> time_vector_;
158 impl::Subscription::SubscriberNotifier subscriber_notifier_;
159 bool locked_ = false;
160 };
161
162 313 class PeriodicSubscriptionsWithGroup : public PeriodicSubscriptionsBase
163 {
164 public:
165 struct transaction
166 {
167 bool needs_server_action;
168 // These fields are valid only if needs_server_action is true;
169 // std::optional would express that more cleanly.
170 unsigned int group_id, decimation_, blocksize_;
171 };
172
173
174 transaction
175 add(Process::PendingCallbackQueue &q,
176 bool notify_pending,
177 const std::shared_ptr<PeriodicSubscription> &s);
178 transaction remove(const Variable &_var, PdCom::Transmission tm);
179
180 339 DataReceiveHandle startNewDataRecieving(
181 GroupId index,
182 VariableCache::Lock lock,
183 std::chrono::nanoseconds ts)
184 {
185 return DataReceiveHandle {
186
1/2
✓ Branch 9 taken 339 times.
✗ Branch 10 not taken.
339 *this, map_.at(index), std::move(lock), ts, true};
187 }
188
189
190 const ChannelSubscriptionMap &at(GroupId group_id) const
191 {
192 return map_.at(group_id);
193 }
194
195 void unsubscribeDone(GroupId group_id)
196 {
197 auto &sub_map = map_.at(group_id);
198 if (sub_map.state == ChannelSubscriptionMapImpl::AboutToBeDeleted)
199 sub_map.state = ChannelSubscriptionMapImpl::Empty;
200 }
201
202 void subscribeWasConfirmed(
203 Process::PendingCallbackQueue &q,
204 GroupId group_id,
205 ChannelId channel_id);
206
207 void
208 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
209 const std::unordered_map<unsigned, Channel *> &channels) const;
210
211 private:
212
0/2
✗ Branch 5 not taken.
✗ Branch 6 not taken.
68 struct ChannelSubscriptionMapImpl : ChannelSubscriptionMap
213 {
214 using ChannelSubscriptionMap::ChannelSubscriptionMap;
215
216 enum State {
217 Empty,
218 Taken,
219 AboutToBeDeleted, /* last <xsod> sent but no <ack> received yet */
220 } state = Empty;
221 };
222
223 313 class GroupMap
224 {
225 std::vector<ChannelSubscriptionMapImpl> forward_;
226 std::unordered_map<Transmission, GroupId> backward_;
227
228 public:
229 bool valid(GroupId idx) const noexcept
230 {
231 return idx > 0 and idx <= forward_.size();
232 }
233 508 ChannelSubscriptionMapImpl &at(GroupId group_id)
234 {
235 508 return forward_.at(group_id - 1);
236 }
237 const ChannelSubscriptionMapImpl &at(GroupId group_id) const
238 {
239 return forward_.at(group_id - 1);
240 }
241 GroupId find(Transmission t);
242 // Returns the group id (> 0) if the last subscription is gone, otherwise 0.
243 GroupId erase(Transmission t, ChannelId channel_idx);
244 void
245 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
246 const std::unordered_map<unsigned, Channel *> &channels) const;
247 } map_;
248 };
249
250
251 20 class PeriodicSubscriptionWithoutGroup : public PeriodicSubscriptionsBase
252 {
253 public:
254 struct transaction
255 {
256 bool needs_server_action;
256 // These fields are valid only if needs_server_action is true;
257 // std::optional would express that more cleanly.
259 unsigned int decimation_, blocksize_;
260 };
261
262
263 transaction
264 add(Process::PendingCallbackQueue &q,
265 bool notify_pending,
266 const std::shared_ptr<PeriodicSubscription> &s);
267 transaction remove(const Variable &_var);
268
269 DataReceiveHandle
270 46 startNewDataRecieving(VariableCache::Lock lock, std::chrono::nanoseconds ts)
271 {
272
1/2
✓ Branch 7 taken 46 times.
✗ Branch 8 not taken.
46 return DataReceiveHandle {*this, map_, std::move(lock), ts, false};
273 }
274
275 1 bool hasChannel(ChannelId id) const
276 {
277
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 const auto it = map_.find(id);
278
4/8
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 11 taken 1 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 1 times.
✗ Branch 15 not taken.
2 return it != map_.end() && !it->second.subscriptions_.empty();
279 }
280
281
282 void subscribeWasConfirmed(
283 Process::PendingCallbackQueue &q,
284 ChannelId channel_id);
285
286 void
287 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
288 const std::unordered_map<unsigned, Channel *> &channels) const;
289
290 private:
291 ChannelSubscriptionMap map_;
292 };
293
294 }}} // namespace PdCom::impl::MsrProto
295
296 #endif // MSRPROTO_PERIODICSUBSCRIPTIONS_H
297