GCC Code Coverage Report


Directory: ./
File: src/msrproto/PeriodicSubscriptions.cpp
Date: 2024-11-05 15:23:15
Exec Total Coverage
Lines: 192 246 78.0%
Branches: 177 392 45.2%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #include "PeriodicSubscriptions.h"
23
24 #include "Channel.h"
25 #include "Subscription.h"
26
27 #include <cassert>
28 #include <cstring>
29 #include <pdcom5/Exception.h>
30 #include <sstream>
31
32 using PdCom::impl::MsrProto::DataDecoder;
33 using PdCom::impl::MsrProto::PeriodicSubscription;
34 using PdCom::impl::MsrProto::PeriodicSubscriptionsBase;
35 using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup;
36 using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup;
37
38 using PdCom::impl::MsrProto::Variable;
39
40
41 52 PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add(
42 PdCom::impl::Process::PendingCallbackQueue &queue,
43 bool notify_pending,
44 const std::shared_ptr<PeriodicSubscription> &s)
45 {
46 104 const auto var_ptr = s->variable_.lock();
47
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 52 times.
52 if (!var_ptr)
48 throw EmptyVariable();
49 52 const auto &var = static_cast<Channel const &>(*var_ptr);
50
1/2
✓ Branch 11 taken 52 times.
✗ Branch 12 not taken.
52 const auto group_id = map_.find(s->subscriber_.getTransmission());
51
2/4
✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 52 times.
✗ Branch 8 not taken.
104 auto map_it = map_.at(group_id).find(var.index_);
52
3/4
✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 41 times.
✓ Branch 11 taken 11 times.
52 if (map_it == map_.at(group_id).end()) {
53 // not yet subscribed
54 41 unsigned decimation = s->subscriber_.getTransmission().getInterval()
55 41 / var.sample_time_.count()
56 41 + 0.5;
57 82 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
58
3/4
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 39 times.
41 and !decimation;
59 41 unsigned blocksize = 0;
60
61
1/2
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
41 if (decimation > 0) {
62 41 blocksize =
63 41 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
64
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
41 blocksize = std::min(var.bufsize / decimation, blocksize);
65 41 blocksize = std::min(blocksize, 1000U);
66 }
67
68
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 3 times.
41 if (!blocksize)
69 38 blocksize = 1;
70
1/2
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
82 time_vector_.resize(
71 82 std::max<size_t>(time_vector_.size(), blocksize + 2));
72
73
6/12
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 41 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 41 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 41 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 41 times.
✓ Branch 34 taken 41 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
41 map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}});
74
75
2/2
✓ Branch 0 taken 25 times.
✓ Branch 1 taken 16 times.
41 if (notify_pending)
76
1/2
✓ Branch 8 taken 25 times.
✗ Branch 9 not taken.
25 queue.push_back({s, PdCom::Subscription::State::Pending});
77 41 return {true, group_id, decimation, blocksize};
78 }
79 else {
80
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 1 times.
11 if (notify_pending
81
4/4
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 4 times.
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 1 times.
11 or map_it->second.subscriptions_.currentState
82 != PdCom::Subscription::State::Pending)
83
1/2
✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
10 queue.push_back({s, map_it->second.subscriptions_.currentState});
84
1/2
✓ Branch 6 taken 11 times.
✗ Branch 7 not taken.
11 map_it->second.subscriptions_.insert(s);
85 11 return {false, group_id, map_it->second.decimation_,
86 22 map_it->second.blocksize_};
87 }
88 }
89
90 PeriodicSubscriptionsWithGroup::transaction
91 27 PeriodicSubscriptionsWithGroup::remove(
92 const Variable &var,
93 PdCom::Transmission tm)
94 {
95 // remove'ing is okay with locked ps,
96 // because the subscription map is _not_ deleted
97 // and the forward map _not_ altered. So pointers stay valid.
98 27 const auto group_id = map_.erase(tm, var.index_);
99 27 return {group_id != 0, group_id, 0, 0};
100 }
101
102 51 PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle::
103 DataReceiveHandle(
104 PeriodicSubscriptionsBase &ps,
105 ChannelSubscriptionMap &subscription_map,
106 VariableCache::Lock variable_cache_lock,
107 std::chrono::nanoseconds ts,
108 51 bool consistent_blocksize) :
109 ps_(&ps),
110 51 variable_cache_lock_(std::move(variable_cache_lock)),
111 subscription_map_(&subscription_map),
112 ts_(ts),
113 102 consistent_blocksize_(consistent_blocksize)
114 {
115
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 51 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 51 times.
51 if (ps.locked_)
116 throw PdCom::InternalError(
117 "Receiving periodic data already in progress");
118 51 ps.locked_ = true;
119 51 }
120
121 53 void PeriodicSubscriptionsBase::DataReceiveHandle::readData(
122 const PdCom::impl::MsrProto::Channel &c,
123 const char *data)
124 {
125
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 53 times.
53 if (!ps_) {
126 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
127 }
128
1/2
✓ Branch 2 taken 53 times.
✗ Branch 3 not taken.
53 const Base64Info bi(data);
129
130
3/4
✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 45 times.
✓ Branch 9 taken 8 times.
98 const auto sub_info_it = subscription_map_->find(c.index_);
131
2/2
✓ Branch 7 taken 8 times.
✓ Branch 8 taken 45 times.
53 if (sub_info_it == subscription_map_->end())
132 8 return;
133
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
90 if (sub_info_it->second.subscriptions_.currentState
134 45 != PdCom::Subscription::State::Active)
135 return;
136
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 40 times.
45 if (!consistent_blocksize_) {
137 5 current_blocksize_ = std::min<int>(
138 10 sub_info_it->second.blocksize_,
139
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 5 times.
15 bi.encodedDataLength_ / c.type_info->element_size);
140 5 max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_);
141 }
142
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
40 else if (current_blocksize_ == -1)
143 current_blocksize_ = sub_info_it->second.blocksize_;
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 else if (
145 40 current_blocksize_
146 40 != static_cast<int>(sub_info_it->second.blocksize_))
147 throw ProtocolError("Blocksize mismatch");
148
4/6
✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 45 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 45 times.
✓ Branch 14 taken 8 times.
135 variable_cache_lock_[c.index_].readFromBase64(
149 90 data, bi.base64Length_, c, current_blocksize_);
150 }
151
152
153 45 void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps(
154 const char *data)
155 {
156 static_assert(
157 sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t),
158 "time_vector_ is not size compatible to time tag");
159
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
45 if (!ps_) {
160 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
161 }
162
2/4
✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 45 times.
45 if (!data || !*data)
163 throw PdCom::ProtocolError("Tag does not contain data");
164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
45 const auto b64len = ::strlen(data);
165
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 if (current_blocksize_ == -1) {
166 45 current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t);
167 45 max_read_blocks_ = current_blocksize_;
168 }
169 45 consistent_blocksize_ = true;
170
171
1/2
✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
90 ps_->time_vector_.resize(
172 90 std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size()));
173
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
90 if (!readFromBase64(
174 45 reinterpret_cast<char *>(ps_->time_vector_.data()), data,
175 45 b64len, sizeof(int64_t) * current_blocksize_)) {
176 throw ProtocolError("Invalid <time> child of <data>");
177 }
178 45 timestamps_read_ = true;
179 45 }
180
181 1 void PeriodicSubscriptionsWithGroup::dump(
182 std::vector<PdCom::Process::SubscriptionInfo> &ans,
183 const std::unordered_map<unsigned, Channel *> &channels) const
184 {
185 1 map_.dump(ans, channels);
186 1 }
187
188 49 void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving()
189 {
190
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
49 if (!ps_) {
191 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
192 }
193 // make sure to release the variable cache lock
194
1/2
✓ Branch 7 taken 49 times.
✗ Branch 8 not taken.
98 auto cache = std::move(variable_cache_lock_);
195 // as this is the final call on receiving a new dataset,
196 // make sure to erase the ps pointer to mark us invalid afterwards
197 const struct PsDeleter
198 {
199 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_;
200 49 ~PsDeleter() { ps_.reset(); }
201 49 PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) :
202 49 ps_(ps)
203 49 {}
204
1/2
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
98 } psdeleter_ {ps_};
205 // subscriptions can dissappear during <data> tag parsing, so don't
206 // assume that subscriptions_ is not empty
207
3/6
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 49 times.
49 if (!cache or current_blocksize_ == -1)
208 return;
209
210
2/2
✓ Branch 1 taken 62 times.
✓ Branch 2 taken 49 times.
111 for (int block = 0; block < max_read_blocks_; ++block) {
211
2/2
✓ Branch 7 taken 65 times.
✓ Branch 8 taken 62 times.
127 for (const auto &sub_map : *subscription_map_) {
212
1/2
✓ Branch 2 taken 65 times.
✗ Branch 3 not taken.
65 const DataDecoder *var = cache.filled_cache(sub_map.first);
213 // no data recieved for given variable
214
2/2
✓ Branch 0 taken 64 times.
✓ Branch 1 taken 1 times.
66 if (!var
215
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
64 || sub_map.second.subscriptions_.currentState
216 != PdCom::Subscription::State::Active)
217 1 continue;
218
1/2
✓ Branch 4 taken 64 times.
✗ Branch 5 not taken.
64 sendDataToSubscribers(*var, sub_map.second, block);
219 }
220
1/2
✓ Branch 4 taken 62 times.
✗ Branch 5 not taken.
62 ps_->subscriber_notifier_.notify();
221 62 ps_->subscriber_notifier_.clear();
222 }
223
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 current_blocksize_ = -1;
224 }
225
226 64 void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers(
227 const DataDecoder &var,
228 const ChannelSubscripion &sub_map,
229 const int block)
230 {
231 64 bool size_check_done = false;
232
8/8
✓ Branch 6 taken 71 times.
✓ Branch 7 taken 62 times.
✓ Branch 12 taken 62 times.
✓ Branch 13 taken 2 times.
✓ Branch 15 taken 62 times.
✓ Branch 16 taken 2 times.
✓ Branch 19 taken 62 times.
✓ Branch 20 taken 2 times.
133 for (const auto &weak_sub : sub_map.subscriptions_) {
233
3/4
✓ Branch 3 taken 71 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 69 times.
✓ Branch 7 taken 2 times.
140 if (const auto sub = weak_sub.lock()) {
234
4/4
✓ Branch 5 taken 69 times.
✓ Branch 6 taken 2 times.
✓ Branch 8 taken 69 times.
✓ Branch 9 taken 2 times.
140 const auto var_ptr = sub->variable_.lock();
235
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 71 times.
71 if (!var_ptr)
236 throw EmptyVariable();
237
2/2
✓ Branch 0 taken 64 times.
✓ Branch 1 taken 7 times.
71 if (!size_check_done) {
238
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 64 times.
128 if (consistent_blocksize_
239
3/4
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 15 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 49 times.
113 && var.size() < sub_map.blocksize_
240
1/2
✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
49 * var_ptr->totalSizeInBytes()) {
241 std::ostringstream os;
242 os << "size mismatch in variable cache, "
243 "expected "
244 << sub_map.blocksize_ * var_ptr->totalSizeInBytes()
245 << ", got " << var.size();
246 throw ProtocolError(os.str());
247 }
248
3/4
✓ Branch 7 taken 64 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 62 times.
64 else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes())
249 2 return;
250 62 size_check_done = true;
251 }
252
2/4
✓ Branch 11 taken 69 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 69 times.
✗ Branch 15 not taken.
69 sub->readData(var.data() + block * var_ptr->totalSizeInBytes());
253 69 std::chrono::nanoseconds ts;
254
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 69 times.
✓ Branch 3 taken 56 times.
✓ Branch 4 taken 13 times.
69 if (timestamps_read_)
255
1/2
✓ Branch 4 taken 56 times.
✗ Branch 5 not taken.
56 ts = ps_->time_vector_.at(block);
256 else
257 26 ts = ts_
258 52 + (block - max_read_blocks_ + 1)
259
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
26 * std::chrono::duration_cast<
260
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
26 std::chrono::nanoseconds>(
261
1/2
✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
52 var_ptr->getSampleTime())
262 39 * sub_map.decimation_;
263
3/4
✓ Branch 8 taken 69 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 69 times.
✓ Branch 12 taken 2 times.
69 ps_->subscriber_notifier_.insert(sub->subscriber_, ts);
264 }
265 }
266 }
267
268 PeriodicSubscriptionsBase::GroupId
269 52 PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t)
270 {
271 // does a group already exists for given transmission?
272
1/2
✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
104 const auto it = backward_.find(t);
273
2/2
✓ Branch 6 taken 19 times.
✓ Branch 7 taken 33 times.
52 if (it != backward_.end()) {
274
3/6
✓ Branch 4 taken 19 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 19 times.
✓ Branch 9 taken 19 times.
✗ Branch 10 not taken.
38 if (forward_.at(it->second - 1).state
275 19 == ChannelSubscriptionMapImpl::Taken)
276 19 return it->second;
277 }
278 // find a free one;
279
2/2
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 32 times.
40 for (unsigned int i = 0; i < forward_.size(); ++i) {
280
3/4
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 7 times.
8 if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) {
281 1 const auto group_id = i + 1;
282
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 backward_[t] = group_id;
283 1 forward_[i].state = ChannelSubscriptionMapImpl::Taken;
284 1 return group_id;
285 }
286 }
287 // allocate one at the back;
288
1/2
✓ Branch 2 taken 32 times.
✗ Branch 3 not taken.
32 forward_.emplace_back();
289 32 forward_.back().state = ChannelSubscriptionMapImpl::Taken;
290
1/2
✓ Branch 4 taken 32 times.
✗ Branch 5 not taken.
32 const auto group_id = backward_[t] = forward_.size();
291 32 return group_id;
292 }
293
294 PeriodicSubscriptionsBase::GroupId
295 27 PeriodicSubscriptionsWithGroup::GroupMap::erase(
296 PdCom::Transmission t,
297 PeriodicSubscriptionsBase::ChannelId channel_idx)
298 {
299
1/2
✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
54 const auto backward_it = backward_.find(t);
300
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
27 if (backward_it == backward_.end()) {
301 assert(backward_it != backward_.end()
302 and "Inconsistent state, backward_ and forward_ out of sync");
303 return 0;
304 }
305
1/2
✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
27 auto &channel_map = at(backward_it->second);
306
1/2
✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
54 const auto map_it = channel_map.find(channel_idx);
307 // already cleaned? if so, nothing to do.
308
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
27 if (map_it == channel_map.end())
309 return 0;
310 // clean subscription map
311 const bool no_channel_subs_left =
312 27 map_it->second.subscriptions_.remove_expired();
313 // if channels left, also nothing to do.
314
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 22 times.
27 if (!no_channel_subs_left)
315 5 return 0;
316 // no subscriptions for this channel is left, we have to remember the
317 // group id
318 22 const auto ans = backward_it->second;
319 // now check whether any subscriptions are left for this transmission
320
2/2
✓ Branch 2 taken 14 times.
✓ Branch 3 taken 8 times.
22 if (channel_map.guaranteed_empty()) {
321 // group id no longer in use, free up channel map
322 14 channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted;
323
1/2
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
14 backward_.erase(backward_it);
324 }
325 22 return ans;
326 }
327
328 41 void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed(
329 PdCom::impl::Process::PendingCallbackQueue &q,
330 PeriodicSubscriptionsWithGroup::GroupId group_id,
331 PeriodicSubscriptionsWithGroup::ChannelId channel_id)
332 {
333
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 auto &sub_map = map_.at(group_id);
334
2/4
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
82 const auto it = sub_map.find(channel_id);
335
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 41 times.
41 if (it == sub_map.end())
336 return;
337
338
2/4
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 41 times.
✗ Branch 7 not taken.
41 it->second.subscriptions_.broadcastState(
339 PdCom::Subscription::State::Active, q);
340 }
341
342 1 void PeriodicSubscriptionsWithGroup::GroupMap::dump(
343 std::vector<PdCom::Process::SubscriptionInfo> &ans,
344 const std::unordered_map<unsigned, Channel *> &channels) const
345 {
346
2/2
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
2 for (const auto &all_groups : forward_) {
347
2/2
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 1 times.
2 for (const auto &sub_map : all_groups) {
348 1 const auto var = impl::Variable::toUApi(
349 3 std::static_pointer_cast<const MsrProto::Variable>(
350
2/4
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
5 channels.at(sub_map.first)->shared_from_this()));
351
2/2
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
2 for (const auto &weak_sub : sub_map.second.subscriptions_) {
352
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
2 if (const auto sub = weak_sub.lock()) {
353
1/2
✓ Branch 10 taken 1 times.
✗ Branch 11 not taken.
1 ans.emplace_back(sub->This_, &sub->subscriber_, var);
354 }
355 }
356 }
357 }
358 1 }
359
360 PeriodicSubscriptionWithoutGroup::transaction
361 3 PeriodicSubscriptionWithoutGroup::add(
362 Process::PendingCallbackQueue &q,
363 bool notify_pending,
364 const std::shared_ptr<PeriodicSubscription> &s)
365 {
366
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
3 if (locked_)
367 throw PdCom::InternalError("PeriodicSubscription is locked!");
368 6 const auto var_ptr = s->variable_.lock();
369
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (!var_ptr)
370 throw EmptyVariable();
371 3 const auto &var = static_cast<Channel const &>(*var_ptr);
372
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
6 auto map_it = map_.find(var.index_);
373
374
375 3 unsigned decimation = s->subscriber_.getTransmission().getInterval()
376 3 / var.sample_time_.count()
377 3 + 0.5;
378 6 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
379
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 and !decimation;
380 3 unsigned blocksize = 0;
381
382
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (decimation > 0) {
383 3 blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
384
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 blocksize = std::min(var.bufsize / decimation, blocksize);
385 }
386
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!blocksize)
388 blocksize = 1;
389
1/2
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
3 time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2));
390
391
1/2
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
3 if (map_it == map_.end()) {
392 // not yet subscribed
393
5/10
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
3 map_.insert({var.index_, {blocksize, decimation, {{s}}}});
394
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (notify_pending)
396 q.push_back({s, PdCom::Subscription::State::Pending});
397 3 return {true, decimation, blocksize};
398 }
399 else if (decimation < map_it->second.decimation_) {
400 // we have to resubscribe
401 map_it->second.subscriptions_.currentState =
402 PdCom::Subscription::State::Pending;
403
404 if (notify_pending)
405 q.push_back({s, PdCom::Subscription::State::Pending});
406 auto subs = std::move(map_it->second.subscriptions_);
407 subs.insert(s);
408 map_.erase(map_it);
409 map_.insert({var.index_, {blocksize, decimation, std::move(subs)}});
410 return {true, decimation, blocksize};
411 }
412 else {
413 if (notify_pending
414 or map_it->second.subscriptions_.currentState
415 != PdCom::Subscription::State::Pending)
416 q.push_back({s, map_it->second.subscriptions_.currentState});
417 map_it->second.subscriptions_.insert(s);
418 return {false, map_it->second.decimation_, map_it->second.blocksize_};
419 }
420 }
421
422 PeriodicSubscriptionWithoutGroup::transaction
423 PeriodicSubscriptionWithoutGroup::remove(const Variable &_var)
424 {
425 const auto it = map_.find(_var.index_);
426 if (it == map_.end() || !it->second.subscriptions_.remove_expired())
427 // not found or still subscribed by others
428 return {false, 0, 0};
429 else {
430 // now empty
431 map_.erase(it);
432 return {true, 0, 0};
433 }
434 }
435
436 3 void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed(
437 Process::PendingCallbackQueue &q,
438 ChannelId channel_id)
439 {
440
2/4
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
6 const auto it = map_.find(channel_id);
441
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 3 times.
3 if (it == map_.end())
442 return;
443
444
2/4
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
3 it->second.subscriptions_.broadcastState(
445 PdCom::Subscription::State::Active, q);
446 }
447
448 void PeriodicSubscriptionWithoutGroup::dump(
449 std::vector<PdCom::Process::SubscriptionInfo> &ans,
450 const std::unordered_map<unsigned, Channel *> &channels) const
451 {
452 for (const auto &sub_map : map_) {
453 const auto var = impl::Variable::toUApi(
454 std::static_pointer_cast<const MsrProto::Variable>(
455 channels.at(sub_map.first)->shared_from_this()));
456 for (const auto &weak_sub : sub_map.second.subscriptions_) {
457 if (const auto sub = weak_sub.lock()) {
458 ans.emplace_back(sub->This_, &sub->subscriber_, var);
459 }
460 }
461 }
462 }
463