GCC Code Coverage Report


Directory: ./
File: pdcom5/src/msrproto/PeriodicSubscriptions.h
Date: 2024-12-15 04:08:34
Exec Total Coverage
Lines: 48 53 90.6%
Branches: 26 48 54.2%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #ifndef MSRPROTO_PERIODICSUBSCRIPTIONS_H
23 #define MSRPROTO_PERIODICSUBSCRIPTIONS_H
24
25 #include "../Process.h"
26 #include "../Subscription.h"
27 #include "DataConverter.h"
28
29 #include <chrono>
30 #include <memory>
31 #include <pdcom5/Subscriber.h>
32 #include <set>
33 #include <unordered_map>
34 #include <vector>
35
36 namespace PdCom { namespace impl { namespace MsrProto {
37 class Channel;
38 class PollSubscription;
39 class PeriodicSubscription;
40 class Variable;
41
42 template <class S = PollSubscription>
43 418 struct SubscriptionSet :
44 std::set<std::weak_ptr<S>, std::owner_less<std::weak_ptr<S>>>
45 {
46 84 using std::set<std::weak_ptr<S>, std::owner_less<std::weak_ptr<S>>>::set;
47 48 void readData(const char *data, std::chrono::nanoseconds ts) const
48 {
49
2/2
✓ Branch 6 taken 72 times.
✓ Branch 7 taken 48 times.
120 for (const auto &weak_sub : *this) {
50
1/2
✓ Branch 3 taken 72 times.
✗ Branch 4 not taken.
144 if (const auto sub = weak_sub.lock()) {
51
1/2
✓ Branch 5 taken 72 times.
✗ Branch 6 not taken.
72 sub->readData(data);
52
1/2
✓ Branch 5 taken 72 times.
✗ Branch 6 not taken.
72 sub->newValues(ts);
53 }
54 }
55 48 }
56 3 bool remove_expired() noexcept
57 {
58
2/2
✓ Branch 8 taken 4 times.
✓ Branch 9 taken 3 times.
7 for (auto it = this->begin(); it != this->end();) {
59
2/2
✓ Branch 3 taken 3 times.
✓ Branch 4 taken 1 times.
4 if (it->expired())
60 3 it = this->erase(it);
61 else
62 1 ++it;
63 }
64 3 return this->empty();
65 }
66
67 PdCom::Subscription::State currentState =
68 PdCom::Subscription::State::Pending;
69
70 48 void broadcastState(
71 PdCom::Subscription::State state,
72 Process::PendingCallbackQueue &q)
73 {
74 48 currentState = state;
75
2/2
✓ Branch 6 taken 48 times.
✓ Branch 7 taken 48 times.
96 for (const auto &weak_sub : *this)
76
1/2
✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
96 if (const auto sub = weak_sub.lock())
77
1/2
✓ Branch 8 taken 48 times.
✗ Branch 9 not taken.
48 q.push_back({sub, state});
78 48 }
79 };
80
81 333 class PeriodicSubscriptionsBase
82 {
83 public:
84 using GroupId = unsigned int;
85 using ChannelId = unsigned int;
86
87 230 struct ChannelSubscripion
88 {
89 unsigned int const blocksize_, decimation_;
90 SubscriptionSet<PeriodicSubscription> subscriptions_;
91 };
92
93 88 struct ChannelSubscriptionMap :
94 std::unordered_map<ChannelId, ChannelSubscripion>
95 {
96 using std::unordered_map<ChannelId, ChannelSubscripion>::unordered_map;
97
98 1 bool guaranteed_empty() noexcept
99 {
100
2/2
✓ Branch 8 taken 1 times.
✓ Branch 9 taken 1 times.
2 for (auto it = begin(); it != end();) {
101
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 if (it->second.subscriptions_.empty())
102 1 it = erase(it);
103 else
104 ++it;
105 }
106 1 return empty();
107 }
108 };
109
110
111
2/4
✗ Branch 16 not taken.
✓ Branch 17 taken 766 times.
✗ Branch 20 not taken.
✓ Branch 21 taken 766 times.
1688 class DataReceiveHandle
112 {
113 struct Unlocker
114 {
115 383 void operator()(PeriodicSubscriptionsBase *ps) const
116 {
117
1/2
✓ Branch 0 taken 383 times.
✗ Branch 1 not taken.
383 if (ps)
118 383 ps->locked_ = false;
119 383 }
120 };
121
122 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> ps_ = nullptr;
123 VariableCache::Lock variable_cache_lock_;
124 ChannelSubscriptionMap *subscription_map_;
125 int current_blocksize_ = -1;
126 int max_read_blocks_ = -1;
127 std::chrono::nanoseconds ts_ = {};
128 bool timestamps_read_ = false;
129 // true if all arriving F tags have the same blocksize
130 bool consistent_blocksize_ = false;
131
132 public:
133 540 DataReceiveHandle() {}
134 DataReceiveHandle(
135 PeriodicSubscriptionsBase &ps,
136 ChannelSubscriptionMap &subscription_map,
137 VariableCache::Lock variable_cache_lock,
138 std::chrono::nanoseconds ts,
139 bool consistent_blocksize);
140
141 1271 operator bool() const { return static_cast<bool>(ps_); }
142
143
144 void readData(const Channel &c, const char *data);
145 void readTimestamps(const char *data);
146 void endNewDataRecieving();
147
148 private:
149 void sendDataToSubscribers(
150 const DataDecoder &var,
151 const ChannelSubscripion &subs,
152 int block);
153 };
154
155 protected:
156 std::vector<std::chrono::nanoseconds> time_vector_;
157 impl::Subscription::SubscriberNotifier subscriber_notifier_;
158 bool locked_ = false;
159 };
160
161 313 class PeriodicSubscriptionsWithGroup : public PeriodicSubscriptionsBase
162 {
163 public:
164 struct transaction
165 {
166 bool needs_server_action;
167 // these are valid iff needs_server_action is true.
168 // std::optional would be a way better alternative tbh
169 unsigned int group_id, decimation_, blocksize_;
170 };
171
172
173 transaction
174 add(Process::PendingCallbackQueue &q,
175 bool notify_pending,
176 const std::shared_ptr<PeriodicSubscription> &s);
177 transaction remove(const Variable &_var, PdCom::Transmission tm);
178
179 341 DataReceiveHandle startNewDataRecieving(
180 GroupId index,
181 VariableCache::Lock lock,
182 std::chrono::nanoseconds ts)
183 {
184 return DataReceiveHandle {
185
1/2
✓ Branch 9 taken 341 times.
✗ Branch 10 not taken.
341 *this, map_.at(index), std::move(lock), ts, true};
186 }
187
188
189 const ChannelSubscriptionMap &at(GroupId group_id) const
190 {
191 return map_.at(group_id);
192 }
193
194 void unsubscribeDone(GroupId group_id)
195 {
196 auto &sub_map = map_.at(group_id);
197 if (sub_map.state == ChannelSubscriptionMapImpl::AboutToBeDeleted)
198 sub_map.state = ChannelSubscriptionMapImpl::Empty;
199 }
200
201 void subscribeWasConfirmed(
202 Process::PendingCallbackQueue &q,
203 GroupId group_id,
204 ChannelId channel_id);
205
206 void
207 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
208 const std::unordered_map<unsigned, Channel *> &channels) const;
209
210 private:
211
0/2
✗ Branch 5 not taken.
✗ Branch 6 not taken.
68 struct ChannelSubscriptionMapImpl : ChannelSubscriptionMap
212 {
213 using ChannelSubscriptionMap::ChannelSubscriptionMap;
214
215 enum State {
216 Empty,
217 Taken,
218 AboutToBeDeleted, /* last <xsod> sent but no <ack> received yet */
219 } state = Empty;
220 };
221
222 313 class GroupMap
223 {
224 std::vector<ChannelSubscriptionMapImpl> forward_;
225 std::unordered_map<Transmission, GroupId> backward_;
226
227 public:
228 bool valid(GroupId idx) const noexcept
229 {
230 return idx > 0 and idx <= forward_.size();
231 }
232 510 ChannelSubscriptionMapImpl &at(GroupId group_id)
233 {
234 510 return forward_.at(group_id - 1);
235 }
236 const ChannelSubscriptionMapImpl &at(GroupId group_id) const
237 {
238 return forward_.at(group_id - 1);
239 }
240 GroupId find(Transmission t);
241 // returns group id > 0 in case the last subscription has gone, else 0
242 GroupId erase(Transmission t, ChannelId channel_idx);
243 void
244 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
245 const std::unordered_map<unsigned, Channel *> &channels) const;
246 } map_;
247 };
248
249
250 20 class PeriodicSubscriptionWithoutGroup : public PeriodicSubscriptionsBase
251 {
252 public:
253 struct transaction
254 {
255 bool needs_server_action;
256 // these are valid iff needs_server_action is true.
257 // std::optional would be a way better alternative tbh
258 unsigned int decimation_, blocksize_;
259 };
260
261
262 transaction
263 add(Process::PendingCallbackQueue &q,
264 bool notify_pending,
265 const std::shared_ptr<PeriodicSubscription> &s);
266 transaction remove(const Variable &_var);
267
268 DataReceiveHandle
269 42 startNewDataRecieving(VariableCache::Lock lock, std::chrono::nanoseconds ts)
270 {
271
1/2
✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
42 return DataReceiveHandle {*this, map_, std::move(lock), ts, false};
272 }
273
274 1 bool hasChannel(ChannelId id) const
275 {
276
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 const auto it = map_.find(id);
277
4/8
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 11 taken 1 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 1 times.
✗ Branch 15 not taken.
2 return it != map_.end() && !it->second.subscriptions_.empty();
278 }
279
280
281 void subscribeWasConfirmed(
282 Process::PendingCallbackQueue &q,
283 ChannelId channel_id);
284
285 void
286 dump(std::vector<PdCom::Process::SubscriptionInfo> &ans,
287 const std::unordered_map<unsigned, Channel *> &channels) const;
288
289 private:
290 ChannelSubscriptionMap map_;
291 };
292
293 }}} // namespace PdCom::impl::MsrProto
294
295 #endif // MSRPROTO_PERIODICSUBSCRIPTIONS_H
296