GCC Code Coverage Report


Directory: ./
File: src/msrproto/PeriodicSubscriptions.cpp
Date: 2024-03-27 13:09:52
Exec Total Coverage
Lines: 193 248 77.8%
Branches: 179 400 44.8%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #include "PeriodicSubscriptions.h"
23
24 #include "Channel.h"
25 #include "Subscription.h"
26
27 #include <cassert>
28 #include <cstring>
29 #include <pdcom5/Exception.h>
30 #include <sstream>
31
32 using PdCom::impl::MsrProto::DataDecoder;
33 using PdCom::impl::MsrProto::PeriodicSubscription;
34 using PdCom::impl::MsrProto::PeriodicSubscriptionsBase;
35 using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup;
36 using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup;
37
38 using PdCom::impl::MsrProto::Variable;
39
40
41 48 PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add(
42 PdCom::impl::Process::PendingCallbackQueue &queue,
43 bool notify_pending,
44 const std::shared_ptr<PeriodicSubscription> &s)
45 {
46
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 48 times.
48 if (locked_)
47 throw PdCom::InternalError("PeriodicSubscription is locked!");
48 96 const auto var_ptr = s->variable_.lock();
49
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
48 if (!var_ptr)
50 throw EmptyVariable();
51 48 const auto &var = static_cast<Channel const &>(*var_ptr);
52
1/2
✓ Branch 11 taken 48 times.
✗ Branch 12 not taken.
48 const auto group_id = map_.find(s->subscriber_.getTransmission());
53
2/4
✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 48 times.
✗ Branch 8 not taken.
96 auto map_it = map_.at(group_id).find(var.index_);
54
3/4
✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 39 times.
✓ Branch 11 taken 9 times.
48 if (map_it == map_.at(group_id).end()) {
55 // not yet subscribed
56 39 unsigned decimation = s->subscriber_.getTransmission().getInterval()
57 39 / var.sample_time_.count()
58 39 + 0.5;
59 78 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
60
3/4
✓ Branch 0 taken 39 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 37 times.
39 and !decimation;
61 39 unsigned blocksize = 0;
62
63
1/2
✓ Branch 0 taken 39 times.
✗ Branch 1 not taken.
39 if (decimation > 0) {
64 39 blocksize =
65 39 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
66
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 39 times.
39 blocksize = std::min(var.bufsize / decimation, blocksize);
67 39 blocksize = std::min(blocksize, 1000U);
68 }
69
70
2/2
✓ Branch 0 taken 36 times.
✓ Branch 1 taken 3 times.
39 if (!blocksize)
71 36 blocksize = 1;
72
1/2
✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
78 time_vector_.resize(
73 78 std::max<size_t>(time_vector_.size(), blocksize + 2));
74
75
6/12
✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 39 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 39 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 39 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 39 times.
✓ Branch 34 taken 39 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
39 map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}});
76
77
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 16 times.
39 if (notify_pending)
78
1/2
✓ Branch 8 taken 23 times.
✗ Branch 9 not taken.
23 queue.push_back({s, PdCom::Subscription::State::Pending});
79 39 return {true, group_id, decimation, blocksize};
80 }
81 else {
82
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
9 if (notify_pending
83
4/4
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 2 times.
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 1 times.
9 or map_it->second.subscriptions_.currentState
84 != PdCom::Subscription::State::Pending)
85
1/2
✓ Branch 10 taken 8 times.
✗ Branch 11 not taken.
8 queue.push_back({s, map_it->second.subscriptions_.currentState});
86
1/2
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
9 map_it->second.subscriptions_.insert(s);
87 9 return {false, group_id, map_it->second.decimation_,
88 18 map_it->second.blocksize_};
89 }
90 }
91
92 PeriodicSubscriptionsWithGroup::transaction
93 27 PeriodicSubscriptionsWithGroup::remove(
94 const Variable &var,
95 PdCom::Transmission tm)
96 {
97 // remove'ing is okay with locked ps,
98 // because the subscription map is _not_ deleted
99 // and the forward map _not_ altered. So pointers stay valid.
100 27 const auto group_id = map_.erase(tm, var.index_);
101 27 return {group_id != 0, group_id, 0, 0};
102 }
103
104 49 PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle::
105 DataReceiveHandle(
106 PeriodicSubscriptionsBase &ps,
107 ChannelSubscriptionMap &subscription_map,
108 VariableCache::Lock variable_cache_lock,
109 std::chrono::nanoseconds ts,
110 49 bool consistent_blocksize) :
111 ps_(&ps),
112 49 variable_cache_lock_(std::move(variable_cache_lock)),
113 subscription_map_(&subscription_map),
114 ts_(ts),
115 98 consistent_blocksize_(consistent_blocksize)
116 {
117
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
49 if (ps.locked_)
118 throw PdCom::InternalError(
119 "Receiving periodic data already in progress");
120 49 ps.locked_ = true;
121 49 }
122
123 51 void PeriodicSubscriptionsBase::DataReceiveHandle::readData(
124 const PdCom::impl::MsrProto::Channel &c,
125 const char *data)
126 {
127
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 51 times.
51 if (!ps_) {
128 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
129 }
130
1/2
✓ Branch 2 taken 51 times.
✗ Branch 3 not taken.
51 const Base64Info bi(data);
131
132
3/4
✓ Branch 4 taken 51 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 43 times.
✓ Branch 9 taken 8 times.
94 const auto sub_info_it = subscription_map_->find(c.index_);
133
2/2
✓ Branch 7 taken 8 times.
✓ Branch 8 taken 43 times.
51 if (sub_info_it == subscription_map_->end())
134 8 return;
135
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
86 if (sub_info_it->second.subscriptions_.currentState
136 43 != PdCom::Subscription::State::Active)
137 return;
138
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 38 times.
43 if (!consistent_blocksize_) {
139 5 current_blocksize_ = std::min<int>(
140 10 sub_info_it->second.blocksize_,
141
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 5 times.
15 bi.encodedDataLength_ / c.type_info->element_size);
142 5 max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_);
143 }
144
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 38 times.
38 else if (current_blocksize_ == -1)
145 current_blocksize_ = sub_info_it->second.blocksize_;
146
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 else if (
147 38 current_blocksize_
148 38 != static_cast<int>(sub_info_it->second.blocksize_))
149 throw ProtocolError("Blocksize mismatch");
150
4/6
✓ Branch 5 taken 43 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 43 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 43 times.
✓ Branch 14 taken 8 times.
129 variable_cache_lock_[c.index_].readFromBase64(
151 86 data, bi.base64Length_, c, current_blocksize_);
152 }
153
154
155 43 void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps(
156 const char *data)
157 {
158 static_assert(
159 sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t),
160 "time_vector_ is not size compatible to time tag");
161
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
43 if (!ps_) {
162 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
163 }
164
2/4
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
43 if (!data || !*data)
165 throw PdCom::ProtocolError("Tag does not contain data");
166
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 43 times.
43 const auto b64len = ::strlen(data);
167
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 if (current_blocksize_ == -1) {
168 43 current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t);
169 43 max_read_blocks_ = current_blocksize_;
170 }
171 43 consistent_blocksize_ = true;
172
173
1/2
✓ Branch 5 taken 43 times.
✗ Branch 6 not taken.
86 ps_->time_vector_.resize(
174 86 std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size()));
175
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
86 if (!readFromBase64(
176 43 reinterpret_cast<char *>(ps_->time_vector_.data()), data,
177 43 b64len, sizeof(int64_t) * current_blocksize_)) {
178 throw ProtocolError("Invalid <time> child of <data>");
179 }
180 43 timestamps_read_ = true;
181 43 }
182
183 1 void PeriodicSubscriptionsWithGroup::dump(
184 std::vector<PdCom::Process::SubscriptionInfo> &ans,
185 const std::unordered_map<unsigned, Channel *> &channels) const
186 {
187 1 map_.dump(ans, channels);
188 1 }
189
190 47 void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving()
191 {
192
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
47 if (!ps_) {
193 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
194 }
195 // make sure to release the variable cache lock
196
1/2
✓ Branch 7 taken 47 times.
✗ Branch 8 not taken.
94 auto cache = std::move(variable_cache_lock_);
197 // as this is the final call on receiving a new dataset,
198 // make sure to erase the ps pointer to mark us invalid afterwards
199 const struct PsDeleter
200 {
201 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_;
202 47 ~PsDeleter() { ps_.reset(); }
203 47 PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) :
204 47 ps_(ps)
205 47 {}
206
1/2
✓ Branch 4 taken 47 times.
✗ Branch 5 not taken.
94 } psdeleter_ {ps_};
207 // subscriptions can dissappear during <data> tag parsing, so don't
208 // assume that subscriptions_ is not empty
209
3/6
✓ Branch 1 taken 47 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 47 times.
47 if (!cache or current_blocksize_ == -1)
210 return;
211
212
2/2
✓ Branch 1 taken 60 times.
✓ Branch 2 taken 47 times.
107 for (int block = 0; block < max_read_blocks_; ++block) {
213
2/2
✓ Branch 7 taken 63 times.
✓ Branch 8 taken 60 times.
123 for (const auto &sub_map : *subscription_map_) {
214
1/2
✓ Branch 2 taken 63 times.
✗ Branch 3 not taken.
63 const DataDecoder *var = cache.filled_cache(sub_map.first);
215 // no data recieved for given variable
216
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 1 times.
64 if (!var
217
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 || sub_map.second.subscriptions_.currentState
218 != PdCom::Subscription::State::Active)
219 1 continue;
220
1/2
✓ Branch 4 taken 62 times.
✗ Branch 5 not taken.
62 sendDataToSubscribers(*var, sub_map.second, block);
221 }
222
1/2
✓ Branch 4 taken 60 times.
✗ Branch 5 not taken.
60 ps_->subscriber_notifier_.notify();
223 60 ps_->subscriber_notifier_.clear();
224 }
225
1/2
✓ Branch 2 taken 47 times.
✗ Branch 3 not taken.
47 current_blocksize_ = -1;
226 }
227
228 62 void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers(
229 const DataDecoder &var,
230 const ChannelSubscripion &sub_map,
231 const int block)
232 {
233 62 bool size_check_done = false;
234
8/8
✓ Branch 6 taken 68 times.
✓ Branch 7 taken 60 times.
✓ Branch 12 taken 60 times.
✓ Branch 13 taken 2 times.
✓ Branch 15 taken 60 times.
✓ Branch 16 taken 2 times.
✓ Branch 19 taken 60 times.
✓ Branch 20 taken 2 times.
128 for (const auto &weak_sub : sub_map.subscriptions_) {
235
3/4
✓ Branch 3 taken 68 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 66 times.
✓ Branch 7 taken 2 times.
134 if (const auto sub = weak_sub.lock()) {
236
4/4
✓ Branch 5 taken 66 times.
✓ Branch 6 taken 2 times.
✓ Branch 8 taken 66 times.
✓ Branch 9 taken 2 times.
134 const auto var_ptr = sub->variable_.lock();
237
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
68 if (!var_ptr)
238 throw EmptyVariable();
239
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 6 times.
68 if (!size_check_done) {
240
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 62 times.
124 if (consistent_blocksize_
241
3/4
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 15 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 47 times.
109 && var.size() < sub_map.blocksize_
242
1/2
✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
47 * var_ptr->totalSizeInBytes()) {
243 std::ostringstream os;
244 os << "size mismatch in variable cache, "
245 "expected "
246 << sub_map.blocksize_ * var_ptr->totalSizeInBytes()
247 << ", got " << var.size();
248 throw ProtocolError(os.str());
249 }
250
3/4
✓ Branch 7 taken 62 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 60 times.
62 else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes())
251 2 return;
252 60 size_check_done = true;
253 }
254
2/4
✓ Branch 11 taken 66 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 66 times.
✗ Branch 15 not taken.
66 sub->readData(var.data() + block * var_ptr->totalSizeInBytes());
255 66 std::chrono::nanoseconds ts;
256
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 66 times.
✓ Branch 3 taken 53 times.
✓ Branch 4 taken 13 times.
66 if (timestamps_read_)
257
1/2
✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
53 ts = ps_->time_vector_.at(block);
258 else
259 26 ts = ts_
260 52 + (block - max_read_blocks_ + 1)
261
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
26 * std::chrono::duration_cast<
262
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
26 std::chrono::nanoseconds>(
263
1/2
✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
52 var_ptr->getSampleTime())
264 39 * sub_map.decimation_;
265
3/4
✓ Branch 8 taken 66 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 66 times.
✓ Branch 12 taken 2 times.
66 ps_->subscriber_notifier_.insert(sub->subscriber_, ts);
266 }
267 }
268 }
269
270 PeriodicSubscriptionsBase::GroupId
271 48 PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t)
272 {
273 // does a group already exists for given transmission?
274
1/2
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
96 const auto it = backward_.find(t);
275
2/2
✓ Branch 6 taken 17 times.
✓ Branch 7 taken 31 times.
48 if (it != backward_.end()) {
276
3/6
✓ Branch 4 taken 17 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 17 times.
✓ Branch 9 taken 17 times.
✗ Branch 10 not taken.
34 if (forward_.at(it->second - 1).state
277 17 == ChannelSubscriptionMapImpl::Taken)
278 17 return it->second;
279 }
280 // find a free one;
281
2/2
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 30 times.
38 for (unsigned int i = 0; i < forward_.size(); ++i) {
282
3/4
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 7 times.
8 if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) {
283 1 const auto group_id = i + 1;
284
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 backward_[t] = group_id;
285 1 forward_[i].state = ChannelSubscriptionMapImpl::Taken;
286 1 return group_id;
287 }
288 }
289 // allocate one at the back;
290
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 forward_.emplace_back();
291 30 forward_.back().state = ChannelSubscriptionMapImpl::Taken;
292
1/2
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
30 const auto group_id = backward_[t] = forward_.size();
293 30 return group_id;
294 }
295
296 PeriodicSubscriptionsBase::GroupId
297 27 PeriodicSubscriptionsWithGroup::GroupMap::erase(
298 PdCom::Transmission t,
299 PeriodicSubscriptionsBase::ChannelId channel_idx)
300 {
301
1/2
✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
54 const auto backward_it = backward_.find(t);
302
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
27 if (backward_it == backward_.end()) {
303 assert(backward_it != backward_.end()
304 and "Inconsistent state, backward_ and forward_ out of sync");
305 return 0;
306 }
307
1/2
✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
27 auto &channel_map = at(backward_it->second);
308
1/2
✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
54 const auto map_it = channel_map.find(channel_idx);
309 // already cleaned? if so, nothing to do.
310
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
27 if (map_it == channel_map.end())
311 return 0;
312 // clean subscription map
313 const bool no_channel_subs_left =
314 27 map_it->second.subscriptions_.remove_expired();
315 // if channels left, also nothing to do.
316
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 22 times.
27 if (!no_channel_subs_left)
317 5 return 0;
318 // no subscriptions for this channel is left, we have to remember the
319 // group id
320 22 const auto ans = backward_it->second;
321 // now check whether any subscriptions are left for this transmission
322
2/2
✓ Branch 2 taken 14 times.
✓ Branch 3 taken 8 times.
22 if (channel_map.guaranteed_empty()) {
323 // group id no longer in use, free up channel map
324 14 channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted;
325
1/2
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
14 backward_.erase(backward_it);
326 }
327 22 return ans;
328 }
329
330 39 void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed(
331 PdCom::impl::Process::PendingCallbackQueue &q,
332 PeriodicSubscriptionsWithGroup::GroupId group_id,
333 PeriodicSubscriptionsWithGroup::ChannelId channel_id)
334 {
335
1/2
✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
39 auto &sub_map = map_.at(group_id);
336
2/4
✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 39 times.
✗ Branch 6 not taken.
78 const auto it = sub_map.find(channel_id);
337
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 39 times.
39 if (it == sub_map.end())
338 return;
339
340
2/4
✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 39 times.
✗ Branch 7 not taken.
39 it->second.subscriptions_.broadcastState(
341 PdCom::Subscription::State::Active, q);
342 }
343
344 1 void PeriodicSubscriptionsWithGroup::GroupMap::dump(
345 std::vector<PdCom::Process::SubscriptionInfo> &ans,
346 const std::unordered_map<unsigned, Channel *> &channels) const
347 {
348
2/2
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
2 for (const auto &all_groups : forward_) {
349
2/2
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 1 times.
2 for (const auto &sub_map : all_groups) {
350 1 const auto var = impl::Variable::toUApi(
351 3 std::static_pointer_cast<const MsrProto::Variable>(
352
2/4
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
5 channels.at(sub_map.first)->shared_from_this()));
353
2/2
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
2 for (const auto &weak_sub : sub_map.second.subscriptions_) {
354
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
2 if (const auto sub = weak_sub.lock()) {
355
1/2
✓ Branch 10 taken 1 times.
✗ Branch 11 not taken.
1 ans.emplace_back(sub->This_, &sub->subscriber_, var);
356 }
357 }
358 }
359 }
360 1 }
361
362 PeriodicSubscriptionWithoutGroup::transaction
363 3 PeriodicSubscriptionWithoutGroup::add(
364 Process::PendingCallbackQueue &q,
365 bool notify_pending,
366 const std::shared_ptr<PeriodicSubscription> &s)
367 {
368
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
3 if (locked_)
369 throw PdCom::InternalError("PeriodicSubscription is locked!");
370 6 const auto var_ptr = s->variable_.lock();
371
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (!var_ptr)
372 throw EmptyVariable();
373 3 const auto &var = static_cast<Channel const &>(*var_ptr);
374
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
6 auto map_it = map_.find(var.index_);
375
376
377 3 unsigned decimation = s->subscriber_.getTransmission().getInterval()
378 3 / var.sample_time_.count()
379 3 + 0.5;
380 6 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
381
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 and !decimation;
382 3 unsigned blocksize = 0;
383
384
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (decimation > 0) {
385 3 blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
386
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 blocksize = std::min(var.bufsize / decimation, blocksize);
387 }
388
389
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!blocksize)
390 blocksize = 1;
391
1/2
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
3 time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2));
392
393
1/2
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
3 if (map_it == map_.end()) {
394 // not yet subscribed
395
5/10
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
3 map_.insert({var.index_, {blocksize, decimation, {{s}}}});
396
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (notify_pending)
398 q.push_back({s, PdCom::Subscription::State::Pending});
399 3 return {true, decimation, blocksize};
400 }
401 else if (decimation < map_it->second.decimation_) {
402 // we have to resubscribe
403 map_it->second.subscriptions_.currentState =
404 PdCom::Subscription::State::Pending;
405
406 if (notify_pending)
407 q.push_back({s, PdCom::Subscription::State::Pending});
408 auto subs = std::move(map_it->second.subscriptions_);
409 subs.insert(s);
410 map_.erase(map_it);
411 map_.insert({var.index_, {blocksize, decimation, std::move(subs)}});
412 return {true, decimation, blocksize};
413 }
414 else {
415 if (notify_pending
416 or map_it->second.subscriptions_.currentState
417 != PdCom::Subscription::State::Pending)
418 q.push_back({s, map_it->second.subscriptions_.currentState});
419 map_it->second.subscriptions_.insert(s);
420 return {false, map_it->second.decimation_, map_it->second.blocksize_};
421 }
422 }
423
424 PeriodicSubscriptionWithoutGroup::transaction
425 PeriodicSubscriptionWithoutGroup::remove(const Variable &_var)
426 {
427 const auto it = map_.find(_var.index_);
428 if (it == map_.end() || !it->second.subscriptions_.remove_expired())
429 // not found or still subscribed by others
430 return {false, 0, 0};
431 else {
432 // now empty
433 map_.erase(it);
434 return {true, 0, 0};
435 }
436 }
437
438 3 void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed(
439 Process::PendingCallbackQueue &q,
440 ChannelId channel_id)
441 {
442
2/4
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
6 const auto it = map_.find(channel_id);
443
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 3 times.
3 if (it == map_.end())
444 return;
445
446
2/4
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
3 it->second.subscriptions_.broadcastState(
447 PdCom::Subscription::State::Active, q);
448 }
449
450 void PeriodicSubscriptionWithoutGroup::dump(
451 std::vector<PdCom::Process::SubscriptionInfo> &ans,
452 const std::unordered_map<unsigned, Channel *> &channels) const
453 {
454 for (const auto &sub_map : map_) {
455 const auto var = impl::Variable::toUApi(
456 std::static_pointer_cast<const MsrProto::Variable>(
457 channels.at(sub_map.first)->shared_from_this()));
458 for (const auto &weak_sub : sub_map.second.subscriptions_) {
459 if (const auto sub = weak_sub.lock()) {
460 ans.emplace_back(sub->This_, &sub->subscriber_, var);
461 }
462 }
463 }
464 }
465