GCC Code Coverage Report


Directory: ./
File: pdcom5/src/msrproto/PeriodicSubscriptions.cpp
Date: 2024-12-29 04:08:32
Exec Total Coverage
Lines: 182 244 74.6%
Branches: 162 392 41.3%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #include "PeriodicSubscriptions.h"
23
24 #include "Channel.h"
25 #include "Subscription.h"
26
27 #include <cassert>
28 #include <cstring>
29 #include <pdcom5/Exception.h>
30 #include <sstream>
31
32 using PdCom::impl::MsrProto::DataDecoder;
33 using PdCom::impl::MsrProto::PeriodicSubscription;
34 using PdCom::impl::MsrProto::PeriodicSubscriptionsBase;
35 using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup;
36 using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup;
37
38 using PdCom::impl::MsrProto::Variable;
39
40
41 42 PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add(
42 PdCom::impl::Process::PendingCallbackQueue &queue,
43 bool notify_pending,
44 const std::shared_ptr<PeriodicSubscription> &s)
45 {
46 84 const auto var_ptr = s->variable_.lock();
47
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (!var_ptr)
48 throw EmptyVariable();
49 42 const auto &var = static_cast<Channel const &>(*var_ptr);
50
1/2
✓ Branch 11 taken 42 times.
✗ Branch 12 not taken.
42 const auto group_id = map_.find(s->subscriber_.getTransmission());
51
2/4
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
84 auto map_it = map_.at(group_id).find(var.index_);
52
2/4
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 42 times.
✗ Branch 11 not taken.
42 if (map_it == map_.at(group_id).end()) {
53 // not yet subscribed
54 42 unsigned decimation = s->subscriber_.getTransmission().getInterval()
55 42 / var.sample_time_.count()
56 42 + 0.5;
57 84 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
58
2/4
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 42 times.
42 and !decimation;
59 42 unsigned blocksize = 0;
60
61
1/2
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
42 if (decimation > 0) {
62 42 blocksize =
63 42 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
64
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 blocksize = std::min(var.bufsize / decimation, blocksize);
65 42 blocksize = std::min(blocksize, 1000U);
66 }
67
68
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 29 times.
42 if (!blocksize)
69 13 blocksize = 1;
70
1/2
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
84 time_vector_.resize(
71 84 std::max<size_t>(time_vector_.size(), blocksize + 2));
72
73
6/12
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 42 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 42 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 42 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 42 times.
✓ Branch 34 taken 42 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
42 map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}});
74
75
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 16 times.
42 if (notify_pending)
76
1/2
✓ Branch 8 taken 26 times.
✗ Branch 9 not taken.
26 queue.push_back({s, PdCom::Subscription::State::Pending});
77 42 return {true, group_id, decimation, blocksize};
78 }
79 else {
80 if (notify_pending
81 or map_it->second.subscriptions_.currentState
82 != PdCom::Subscription::State::Pending)
83 queue.push_back({s, map_it->second.subscriptions_.currentState});
84 map_it->second.subscriptions_.insert(s);
85 return {false, group_id, map_it->second.decimation_,
86 map_it->second.blocksize_};
87 }
88 }
89
90 PeriodicSubscriptionsWithGroup::transaction
91 1 PeriodicSubscriptionsWithGroup::remove(
92 const Variable &var,
93 PdCom::Transmission tm)
94 {
95 // remove'ing is okay with locked ps,
96 // because the subscription map is _not_ deleted
97 // and the forward map _not_ altered. So pointers stay valid.
98 1 const auto group_id = map_.erase(tm, var.index_);
99 1 return {group_id != 0, group_id, 0, 0};
100 }
101
102 387 PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle::
103 DataReceiveHandle(
104 PeriodicSubscriptionsBase &ps,
105 ChannelSubscriptionMap &subscription_map,
106 VariableCache::Lock variable_cache_lock,
107 std::chrono::nanoseconds ts,
108 387 bool consistent_blocksize) :
109 ps_(&ps),
110 387 variable_cache_lock_(std::move(variable_cache_lock)),
111 subscription_map_(&subscription_map),
112 ts_(ts),
113 774 consistent_blocksize_(consistent_blocksize)
114 {
115
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 387 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 387 times.
387 if (ps.locked_)
116 throw PdCom::InternalError(
117 "Receiving periodic data already in progress");
118 387 ps.locked_ = true;
119 387 }
120
121 547 void PeriodicSubscriptionsBase::DataReceiveHandle::readData(
122 const PdCom::impl::MsrProto::Channel &c,
123 const char *data)
124 {
125
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 547 times.
547 if (!ps_) {
126 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
127 }
128
1/2
✓ Branch 2 taken 547 times.
✗ Branch 3 not taken.
547 const Base64Info bi(data);
129
130
3/4
✓ Branch 4 taken 547 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 546 times.
✓ Branch 9 taken 1 times.
1093 const auto sub_info_it = subscription_map_->find(c.index_);
131
2/2
✓ Branch 7 taken 1 times.
✓ Branch 8 taken 546 times.
547 if (sub_info_it == subscription_map_->end())
132 1 return;
133
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 546 times.
1092 if (sub_info_it->second.subscriptions_.currentState
134 546 != PdCom::Subscription::State::Active)
135 return;
136
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 546 times.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 501 times.
546 if (!consistent_blocksize_) {
137 45 current_blocksize_ = std::min<int>(
138 90 sub_info_it->second.blocksize_,
139
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 45 times.
135 bi.encodedDataLength_ / c.type_info->element_size);
140 45 max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_);
141 }
142
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 501 times.
501 else if (current_blocksize_ == -1)
143 current_blocksize_ = sub_info_it->second.blocksize_;
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 501 times.
501 else if (
145 501 current_blocksize_
146 501 != static_cast<int>(sub_info_it->second.blocksize_))
147 throw ProtocolError("Blocksize mismatch");
148
4/6
✓ Branch 5 taken 546 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 546 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 546 times.
✓ Branch 14 taken 1 times.
1638 variable_cache_lock_[c.index_].readFromBase64(
149 1092 data, bi.base64Length_, c, current_blocksize_);
150 }
151
152
153 341 void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps(
154 const char *data)
155 {
156 static_assert(
157 sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t),
158 "time_vector_ is not size compatible to time tag");
159
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 341 times.
341 if (!ps_) {
160 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
161 }
162
2/4
✓ Branch 0 taken 341 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 341 times.
341 if (!data || !*data)
163 throw PdCom::ProtocolError("Tag does not contain data");
164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 341 times.
341 const auto b64len = ::strlen(data);
165
1/2
✓ Branch 1 taken 341 times.
✗ Branch 2 not taken.
341 if (current_blocksize_ == -1) {
166 341 current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t);
167 341 max_read_blocks_ = current_blocksize_;
168 }
169 341 consistent_blocksize_ = true;
170
171
1/2
✓ Branch 5 taken 341 times.
✗ Branch 6 not taken.
682 ps_->time_vector_.resize(
172 682 std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size()));
173
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 341 times.
682 if (!readFromBase64(
174 341 reinterpret_cast<char *>(ps_->time_vector_.data()), data,
175 341 b64len, sizeof(int64_t) * current_blocksize_)) {
176 throw ProtocolError("Invalid <time> child of <data>");
177 }
178 341 timestamps_read_ = true;
179 341 }
180
181 void PeriodicSubscriptionsWithGroup::dump(
182 std::vector<PdCom::Process::SubscriptionInfo> &ans,
183 const std::unordered_map<unsigned, Channel *> &channels) const
184 {
185 map_.dump(ans, channels);
186 }
187
188 387 void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving()
189 {
190
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 387 times.
387 if (!ps_) {
191 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
192 }
193 // make sure to release the variable cache lock
194
2/2
✓ Branch 7 taken 386 times.
✓ Branch 8 taken 1 times.
773 auto cache = std::move(variable_cache_lock_);
195 // as this is the final call on receiving a new dataset,
196 // make sure to erase the ps pointer to mark us invalid afterwards
197 const struct PsDeleter
198 {
199 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_;
200 387 ~PsDeleter() { ps_.reset(); }
201 387 PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) :
202 387 ps_(ps)
203 387 {}
204
2/2
✓ Branch 4 taken 386 times.
✓ Branch 5 taken 1 times.
773 } psdeleter_ {ps_};
205 // subscriptions can dissappear during <data> tag parsing, so don't
206 // assume that subscriptions_ is not empty
207
5/6
✓ Branch 1 taken 387 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 386 times.
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 386 times.
387 if (!cache or current_blocksize_ == -1)
208 1 return;
209
210
2/2
✓ Branch 1 taken 890 times.
✓ Branch 2 taken 386 times.
1276 for (int block = 0; block < max_read_blocks_; ++block) {
211
2/2
✓ Branch 7 taken 1253 times.
✓ Branch 8 taken 890 times.
2143 for (const auto &sub_map : *subscription_map_) {
212
1/2
✓ Branch 2 taken 1253 times.
✗ Branch 3 not taken.
1253 const DataDecoder *var = cache.filled_cache(sub_map.first);
213 // no data recieved for given variable
214
2/2
✓ Branch 0 taken 1210 times.
✓ Branch 1 taken 43 times.
1296 if (!var
215
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
1210 || sub_map.second.subscriptions_.currentState
216 != PdCom::Subscription::State::Active)
217 43 continue;
218
1/2
✓ Branch 4 taken 1210 times.
✗ Branch 5 not taken.
1210 sendDataToSubscribers(*var, sub_map.second, block);
219 }
220
1/2
✓ Branch 4 taken 890 times.
✗ Branch 5 not taken.
890 ps_->subscriber_notifier_.notify();
221 890 ps_->subscriber_notifier_.clear();
222 }
223
2/2
✓ Branch 2 taken 386 times.
✓ Branch 3 taken 1 times.
386 current_blocksize_ = -1;
224 }
225
226 1210 void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers(
227 const DataDecoder &var,
228 const ChannelSubscripion &sub_map,
229 const int block)
230 {
231 1210 bool size_check_done = false;
232
5/8
✓ Branch 6 taken 1210 times.
✓ Branch 7 taken 1210 times.
✓ Branch 12 taken 1210 times.
✗ Branch 13 not taken.
✓ Branch 15 taken 1210 times.
✗ Branch 16 not taken.
✓ Branch 19 taken 1210 times.
✗ Branch 20 not taken.
2420 for (const auto &weak_sub : sub_map.subscriptions_) {
233
2/4
✓ Branch 3 taken 1210 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 1210 times.
✗ Branch 7 not taken.
2420 if (const auto sub = weak_sub.lock()) {
234
2/4
✓ Branch 5 taken 1210 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1210 times.
✗ Branch 9 not taken.
2420 const auto var_ptr = sub->variable_.lock();
235
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
1210 if (!var_ptr)
236 throw EmptyVariable();
237
1/2
✓ Branch 0 taken 1210 times.
✗ Branch 1 not taken.
1210 if (!size_check_done) {
238
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1210 times.
2420 if (consistent_blocksize_
239
3/4
✓ Branch 0 taken 1165 times.
✓ Branch 1 taken 45 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1165 times.
2375 && var.size() < sub_map.blocksize_
240
1/2
✓ Branch 5 taken 1165 times.
✗ Branch 6 not taken.
1165 * var_ptr->totalSizeInBytes()) {
241 std::ostringstream os;
242 os << "size mismatch in variable cache, "
243 "expected "
244 << sub_map.blocksize_ * var_ptr->totalSizeInBytes()
245 << ", got " << var.size();
246 throw ProtocolError(os.str());
247 }
248
2/4
✓ Branch 7 taken 1210 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 1210 times.
1210 else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes())
249 return;
250 1210 size_check_done = true;
251 }
252
2/4
✓ Branch 11 taken 1210 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 1210 times.
✗ Branch 15 not taken.
1210 sub->readData(var.data() + block * var_ptr->totalSizeInBytes());
253 1210 std::chrono::nanoseconds ts;
254
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
✓ Branch 3 taken 1165 times.
✓ Branch 4 taken 45 times.
1210 if (timestamps_read_)
255
1/2
✓ Branch 4 taken 1165 times.
✗ Branch 5 not taken.
1165 ts = ps_->time_vector_.at(block);
256 else
257 90 ts = ts_
258 180 + (block - max_read_blocks_ + 1)
259
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
90 * std::chrono::duration_cast<
260
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
90 std::chrono::nanoseconds>(
261
1/2
✓ Branch 9 taken 45 times.
✗ Branch 10 not taken.
180 var_ptr->getSampleTime())
262 135 * sub_map.decimation_;
263
2/4
✓ Branch 8 taken 1210 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1210 times.
✗ Branch 12 not taken.
1210 ps_->subscriber_notifier_.insert(sub->subscriber_, ts);
264 }
265 }
266 }
267
268 PeriodicSubscriptionsBase::GroupId
269 42 PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t)
270 {
271 // does a group already exists for given transmission?
272
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
84 const auto it = backward_.find(t);
273
2/2
✓ Branch 6 taken 8 times.
✓ Branch 7 taken 34 times.
42 if (it != backward_.end()) {
274
3/6
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✓ Branch 9 taken 8 times.
✗ Branch 10 not taken.
16 if (forward_.at(it->second - 1).state
275 8 == ChannelSubscriptionMapImpl::Taken)
276 8 return it->second;
277 }
278 // find a free one;
279
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 34 times.
34 for (unsigned int i = 0; i < forward_.size(); ++i) {
280 if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) {
281 const auto group_id = i + 1;
282 backward_[t] = group_id;
283 forward_[i].state = ChannelSubscriptionMapImpl::Taken;
284 return group_id;
285 }
286 }
287 // allocate one at the back;
288
1/2
✓ Branch 2 taken 34 times.
✗ Branch 3 not taken.
34 forward_.emplace_back();
289 34 forward_.back().state = ChannelSubscriptionMapImpl::Taken;
290
1/2
✓ Branch 4 taken 34 times.
✗ Branch 5 not taken.
34 const auto group_id = backward_[t] = forward_.size();
291 34 return group_id;
292 }
293
294 PeriodicSubscriptionsBase::GroupId
295 1 PeriodicSubscriptionsWithGroup::GroupMap::erase(
296 PdCom::Transmission t,
297 PeriodicSubscriptionsBase::ChannelId channel_idx)
298 {
299
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 const auto backward_it = backward_.find(t);
300
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
1 if (backward_it == backward_.end()) {
301 assert(backward_it != backward_.end()
302 and "Inconsistent state, backward_ and forward_ out of sync");
303 return 0;
304 }
305
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 auto &channel_map = at(backward_it->second);
306
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 const auto map_it = channel_map.find(channel_idx);
307 // already cleaned? if so, nothing to do.
308
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
1 if (map_it == channel_map.end())
309 return 0;
310 // clean subscription map
311 const bool no_channel_subs_left =
312 1 map_it->second.subscriptions_.remove_expired();
313 // if channels left, also nothing to do.
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!no_channel_subs_left)
315 return 0;
316 // no subscriptions for this channel is left, we have to remember the
317 // group id
318 1 const auto ans = backward_it->second;
319 // now check whether any subscriptions are left for this transmission
320
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (channel_map.guaranteed_empty()) {
321 // group id no longer in use, free up channel map
322 1 channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted;
323
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 backward_.erase(backward_it);
324 }
325 1 return ans;
326 }
327
328 42 void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed(
329 PdCom::impl::Process::PendingCallbackQueue &q,
330 PeriodicSubscriptionsWithGroup::GroupId group_id,
331 PeriodicSubscriptionsWithGroup::ChannelId channel_id)
332 {
333
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
42 auto &sub_map = map_.at(group_id);
334
2/4
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 42 times.
✗ Branch 6 not taken.
84 const auto it = sub_map.find(channel_id);
335
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 42 times.
42 if (it == sub_map.end())
336 return;
337
338
2/4
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 42 times.
✗ Branch 7 not taken.
42 it->second.subscriptions_.broadcastState(
339 PdCom::Subscription::State::Active, q);
340 }
341
342 void PeriodicSubscriptionsWithGroup::GroupMap::dump(
343 std::vector<PdCom::Process::SubscriptionInfo> &ans,
344 const std::unordered_map<unsigned, Channel *> &channels) const
345 {
346 for (const auto &all_groups : forward_) {
347 for (const auto &sub_map : all_groups) {
348 const auto var = impl::Variable::toUApi(
349 std::static_pointer_cast<const MsrProto::Variable>(
350 channels.at(sub_map.first)->shared_from_this()));
351 for (const auto &weak_sub : sub_map.second.subscriptions_) {
352 if (const auto sub = weak_sub.lock()) {
353 ans.emplace_back(sub->This_, &sub->subscriber_, var);
354 }
355 }
356 }
357 }
358 }
359
360 PeriodicSubscriptionWithoutGroup::transaction
361 4 PeriodicSubscriptionWithoutGroup::add(
362 Process::PendingCallbackQueue &q,
363 bool notify_pending,
364 const std::shared_ptr<PeriodicSubscription> &s)
365 {
366
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
4 if (locked_)
367 throw PdCom::InternalError("PeriodicSubscription is locked!");
368 8 const auto var_ptr = s->variable_.lock();
369
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (!var_ptr)
370 throw EmptyVariable();
371 4 const auto &var = static_cast<Channel const &>(*var_ptr);
372
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
8 auto map_it = map_.find(var.index_);
373
374
375 4 unsigned decimation = s->subscriber_.getTransmission().getInterval()
376 4 / var.sample_time_.count()
377 4 + 0.5;
378 8 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
379
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 and !decimation;
380 4 unsigned blocksize = 0;
381
382
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (decimation > 0) {
383 4 blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
384
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 blocksize = std::min(var.bufsize / decimation, blocksize);
385 }
386
387
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (!blocksize)
388 4 blocksize = 1;
389
1/2
✓ Branch 10 taken 4 times.
✗ Branch 11 not taken.
4 time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2));
390
391
2/2
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 1 times.
4 if (map_it == map_.end()) {
392 // not yet subscribed
393
5/10
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
3 map_.insert({var.index_, {blocksize, decimation, {{s}}}});
394
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (notify_pending)
396 q.push_back({s, PdCom::Subscription::State::Pending});
397 3 return {true, decimation, blocksize};
398 }
399
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 else if (decimation < map_it->second.decimation_) {
400 // we have to resubscribe
401 1 map_it->second.subscriptions_.currentState =
402 PdCom::Subscription::State::Pending;
403
404
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (notify_pending)
405 q.push_back({s, PdCom::Subscription::State::Pending});
406 2 auto subs = std::move(map_it->second.subscriptions_);
407
1/2
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 subs.insert(s);
408
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 map_.erase(map_it);
409
2/4
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 13 taken 1 times.
✗ Branch 14 not taken.
1 map_.insert({var.index_, {blocksize, decimation, std::move(subs)}});
410 1 return {true, decimation, blocksize};
411 }
412 else {
413 if (notify_pending
414 or map_it->second.subscriptions_.currentState
415 != PdCom::Subscription::State::Pending)
416 q.push_back({s, map_it->second.subscriptions_.currentState});
417 map_it->second.subscriptions_.insert(s);
418 return {false, map_it->second.decimation_, map_it->second.blocksize_};
419 }
420 }
421
422 PeriodicSubscriptionWithoutGroup::transaction
423 2 PeriodicSubscriptionWithoutGroup::remove(const Variable &_var)
424 {
425
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
4 const auto it = map_.find(_var.index_);
426
7/10
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 2 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 1 times.
✓ Branch 18 taken 1 times.
2 if (it == map_.end() || !it->second.subscriptions_.remove_expired())
427 // not found or still subscribed by others
428 1 return {false, 0, 0};
429 else {
430 // now empty
431
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 map_.erase(it);
432 1 return {true, 0, 0};
433 }
434 }
435
436 4 void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed(
437 Process::PendingCallbackQueue &q,
438 ChannelId channel_id)
439 {
440
2/4
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
8 const auto it = map_.find(channel_id);
441
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 4 times.
4 if (it == map_.end())
442 return;
443
444
2/4
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
4 it->second.subscriptions_.broadcastState(
445 PdCom::Subscription::State::Active, q);
446 }
447
448 void PeriodicSubscriptionWithoutGroup::dump(
449 std::vector<PdCom::Process::SubscriptionInfo> &ans,
450 const std::unordered_map<unsigned, Channel *> &channels) const
451 {
452 for (const auto &sub_map : map_) {
453 const auto var = impl::Variable::toUApi(
454 std::static_pointer_cast<const MsrProto::Variable>(
455 channels.at(sub_map.first)->shared_from_this()));
456 for (const auto &weak_sub : sub_map.second.subscriptions_) {
457 if (const auto sub = weak_sub.lock()) {
458 ans.emplace_back(sub->This_, &sub->subscriber_, var);
459 }
460 }
461 }
462 }
463