GCC Code Coverage Report


Directory: ./
File: pdcom5/src/msrproto/PeriodicSubscriptions.cpp
Date: 2023-11-12 04:06:57
Exec Total Coverage
Lines: 163 245 66.5%
Branches: 151 400 37.8%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de)
4 *
5 * This file is part of the PdCom library.
6 *
7 * The PdCom library is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or (at your
10 * option) any later version.
11 *
12 * The PdCom library is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
19 *
20 *****************************************************************************/
21
22 #include "PeriodicSubscriptions.h"
23
24 #include "Channel.h"
25 #include "Subscription.h"
26
27 #include <cassert>
28 #include <cstring>
29 #include <pdcom5/Exception.h>
30 #include <sstream>
31
32 using PdCom::impl::MsrProto::DataDecoder;
33 using PdCom::impl::MsrProto::PeriodicSubscription;
34 using PdCom::impl::MsrProto::PeriodicSubscriptionsBase;
35 using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup;
36 using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup;
37
38 using PdCom::impl::MsrProto::Variable;
39
40
41 10 PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add(
42 PdCom::impl::Process::PendingCallbackQueue &queue,
43 bool notify_pending,
44 const std::shared_ptr<PeriodicSubscription> &s)
45 {
46
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 10 times.
10 if (locked_)
47 throw PdCom::InternalError("PeriodicSubscription is locked!");
48 20 const auto var_ptr = s->variable_.lock();
49
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 if (!var_ptr)
50 throw EmptyVariable();
51 10 const auto &var = static_cast<Channel const &>(*var_ptr);
52
1/2
✓ Branch 11 taken 10 times.
✗ Branch 12 not taken.
10 const auto group_id = map_.find(s->subscriber_.getTransmission());
53
2/4
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 10 times.
✗ Branch 8 not taken.
20 auto map_it = map_.at(group_id).find(var.index_);
54
2/4
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
10 if (map_it == map_.at(group_id).end()) {
55 // not yet subscribed
56 10 unsigned decimation = s->subscriber_.getTransmission().getInterval()
57 10 / var.sample_time_.count()
58 10 + 0.5;
59 20 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
60
2/4
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
10 and !decimation;
61 10 unsigned blocksize = 0;
62
63
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 if (decimation > 0) {
64 10 blocksize =
65 10 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
66
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 blocksize = std::min(var.bufsize / decimation, blocksize);
67 }
68
69
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 1 times.
10 if (!blocksize)
70 9 blocksize = 1;
71
1/2
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
20 time_vector_.resize(
72 20 std::max<size_t>(time_vector_.size(), blocksize + 2));
73
74
6/12
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 10 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 10 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 10 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 10 times.
✓ Branch 34 taken 10 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
10 map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}});
75
76
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 if (notify_pending)
77
1/2
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
10 queue.push_back({s, PdCom::Subscription::State::Pending});
78 10 return {true, group_id, decimation, blocksize};
79 }
80 else {
81 if (notify_pending
82 or map_it->second.subscriptions_.currentState
83 != PdCom::Subscription::State::Pending)
84 queue.push_back({s, map_it->second.subscriptions_.currentState});
85 map_it->second.subscriptions_.insert(s);
86 return {false, group_id, map_it->second.decimation_,
87 map_it->second.blocksize_};
88 }
89 }
90
91 PeriodicSubscriptionsWithGroup::transaction
92 PeriodicSubscriptionsWithGroup::remove(
93 const Variable &var,
94 PdCom::Transmission tm)
95 {
96 // removing is okay with a locked ps,
97 // because the subscription map is _not_ deleted
98 // and the forward map is _not_ altered, so pointers stay valid.
99 const auto group_id = map_.erase(tm, var.index_);
100 return {group_id != 0, group_id, 0, 0};
101 }
102
103 89 PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle::
104 DataReceiveHandle(
105 PeriodicSubscriptionsBase &ps,
106 ChannelSubscriptionMap &subscription_map,
107 VariableCache::Lock variable_cache_lock,
108 std::chrono::nanoseconds ts,
109 89 bool consistent_blocksize) :
110 ps_(&ps),
111 89 variable_cache_lock_(std::move(variable_cache_lock)),
112 subscription_map_(&subscription_map),
113 ts_(ts),
114 178 consistent_blocksize_(consistent_blocksize)
115 {
116
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 89 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 89 times.
89 if (ps.locked_)
117 throw PdCom::InternalError(
118 "Receiving periodic data already in progress");
119 89 ps.locked_ = true;
120 89 }
121
122 93 void PeriodicSubscriptionsBase::DataReceiveHandle::readData(
123 const PdCom::impl::MsrProto::Channel &c,
124 const char *data)
125 {
126
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 93 times.
93 if (!ps_) {
127 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
128 }
129
1/2
✓ Branch 2 taken 93 times.
✗ Branch 3 not taken.
93 const Base64Info bi(data);
130
131
3/4
✓ Branch 4 taken 93 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 92 times.
✓ Branch 9 taken 1 times.
185 const auto sub_info_it = subscription_map_->find(c.index_);
132
2/2
✓ Branch 7 taken 1 times.
✓ Branch 8 taken 92 times.
93 if (sub_info_it == subscription_map_->end())
133 1 return;
134
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 92 times.
184 if (sub_info_it->second.subscriptions_.currentState
135 92 != PdCom::Subscription::State::Active)
136 return;
137
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 92 times.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 47 times.
92 if (!consistent_blocksize_) {
138 45 current_blocksize_ = std::min<int>(
139 90 sub_info_it->second.blocksize_,
140
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 45 times.
135 bi.encodedDataLength_ / c.type_info->element_size);
141 45 max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_);
142 }
143
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
47 else if (current_blocksize_ == -1)
144 current_blocksize_ = sub_info_it->second.blocksize_;
145
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
47 else if (
146 47 current_blocksize_
147 47 != static_cast<int>(sub_info_it->second.blocksize_))
148 throw ProtocolError("Blocksize mismatch");
149
4/6
✓ Branch 5 taken 92 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 92 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 92 times.
✓ Branch 14 taken 1 times.
276 variable_cache_lock_[c.index_].readFromBase64(
150 184 data, bi.base64Length_, c, current_blocksize_);
151 }
152
153
154 47 void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps(
155 const char *data)
156 {
157 static_assert(
158 sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t),
159 "time_vector_ is not size compatible to time tag");
160
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
47 if (!ps_) {
161 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
162 }
163
2/4
✓ Branch 0 taken 47 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 47 times.
47 if (!data || !*data)
164 throw PdCom::ProtocolError("Tag does not contain data");
165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
47 const auto b64len = ::strlen(data);
166
1/2
✓ Branch 1 taken 47 times.
✗ Branch 2 not taken.
47 if (current_blocksize_ == -1) {
167 47 current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t);
168 47 max_read_blocks_ = current_blocksize_;
169 }
170 47 consistent_blocksize_ = true;
171
172
1/2
✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
94 ps_->time_vector_.resize(
173 94 std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size()));
174
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
94 if (!readFromBase64(
175 47 reinterpret_cast<char *>(ps_->time_vector_.data()), data,
176 47 b64len, sizeof(int64_t) * current_blocksize_)) {
177 throw ProtocolError("Invalid <time> child of <data>");
178 }
179 47 timestamps_read_ = true;
180 47 }
181
182 void PeriodicSubscriptionsWithGroup::dump(
183 std::vector<PdCom::Process::SubscriptionInfo> &ans,
184 const std::unordered_map<unsigned, Channel *> &channels) const
185 {
186 map_.dump(ans, channels);
187 }
188
189 89 void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving()
190 {
191
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 89 times.
89 if (!ps_) {
192 throw PdCom::InternalError("Operating on invalid DataReceiveHandle");
193 }
194 // make sure to release the variable cache lock
195
2/2
✓ Branch 7 taken 88 times.
✓ Branch 8 taken 1 times.
177 auto cache = std::move(variable_cache_lock_);
196 // as this is the final call on receiving a new dataset,
197 // make sure to erase the ps pointer to mark us invalid afterwards
198 const struct PsDeleter
199 {
200 std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_;
201 89 ~PsDeleter() { ps_.reset(); }
202 89 PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) :
203 89 ps_(ps)
204 89 {}
205
2/2
✓ Branch 4 taken 88 times.
✓ Branch 5 taken 1 times.
177 } psdeleter_ {ps_};
206 // subscriptions can disappear during <data> tag parsing, so don't
207 // assume that subscriptions_ is not empty
208
5/6
✓ Branch 1 taken 89 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 88 times.
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 88 times.
89 if (!cache or current_blocksize_ == -1)
209 1 return;
210
211
2/2
✓ Branch 1 taken 94 times.
✓ Branch 2 taken 88 times.
182 for (int block = 0; block < max_read_blocks_; ++block) {
212
2/2
✓ Branch 7 taken 129 times.
✓ Branch 8 taken 94 times.
223 for (const auto &sub_map : *subscription_map_) {
213
1/2
✓ Branch 2 taken 129 times.
✗ Branch 3 not taken.
129 const DataDecoder *var = cache.filled_cache(sub_map.first);
214 // no data received for the given variable
215
2/2
✓ Branch 0 taken 98 times.
✓ Branch 1 taken 31 times.
160 if (!var
216
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
98 || sub_map.second.subscriptions_.currentState
217 != PdCom::Subscription::State::Active)
218 31 continue;
219
1/2
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
98 sendDataToSubscribers(*var, sub_map.second, block);
220 }
221
1/2
✓ Branch 4 taken 94 times.
✗ Branch 5 not taken.
94 ps_->subscriber_notifier_.notify();
222 94 ps_->subscriber_notifier_.clear();
223 }
224
2/2
✓ Branch 2 taken 88 times.
✓ Branch 3 taken 1 times.
88 current_blocksize_ = -1;
225 }
226
227 98 void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers(
228 const DataDecoder &var,
229 const ChannelSubscripion &sub_map,
230 const int block)
231 {
232 98 bool size_check_done = false;
233
5/8
✓ Branch 6 taken 98 times.
✓ Branch 7 taken 98 times.
✓ Branch 12 taken 98 times.
✗ Branch 13 not taken.
✓ Branch 15 taken 98 times.
✗ Branch 16 not taken.
✓ Branch 19 taken 98 times.
✗ Branch 20 not taken.
196 for (const auto &weak_sub : sub_map.subscriptions_) {
234
2/4
✓ Branch 3 taken 98 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
196 if (const auto sub = weak_sub.lock()) {
235
2/4
✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
196 const auto var_ptr = sub->variable_.lock();
236
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
98 if (!var_ptr)
237 throw EmptyVariable();
238
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (!size_check_done) {
239
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
196 if (consistent_blocksize_
240
3/4
✓ Branch 0 taken 53 times.
✓ Branch 1 taken 45 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 53 times.
151 && var.size() < sub_map.blocksize_
241
1/2
✓ Branch 5 taken 53 times.
✗ Branch 6 not taken.
53 * var_ptr->totalSizeInBytes()) {
242 std::ostringstream os;
243 os << "size mismatch in variable cache, "
244 "expected "
245 << sub_map.blocksize_ * var_ptr->totalSizeInBytes()
246 << ", got " << var.size();
247 throw ProtocolError(os.str());
248 }
249
2/4
✓ Branch 7 taken 98 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 98 times.
98 else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes())
250 return;
251 98 size_check_done = true;
252 }
253
2/4
✓ Branch 11 taken 98 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 98 times.
✗ Branch 15 not taken.
98 sub->readData(var.data() + block * var_ptr->totalSizeInBytes());
254 98 std::chrono::nanoseconds ts;
255
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
✓ Branch 3 taken 53 times.
✓ Branch 4 taken 45 times.
98 if (timestamps_read_)
256
1/2
✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
53 ts = ps_->time_vector_.at(block);
257 else
258 90 ts = ts_
259 90 + block
260
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
90 * std::chrono::duration_cast<
261
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
90 std::chrono::nanoseconds>(
262
1/2
✓ Branch 9 taken 45 times.
✗ Branch 10 not taken.
135 var_ptr->getSampleTime())
263 135 / sub_map.decimation_;
264
2/4
✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 98 times.
✗ Branch 12 not taken.
98 ps_->subscriber_notifier_.insert(sub->subscriber_, ts);
265 }
266 }
267 }
268
269 PeriodicSubscriptionsBase::GroupId
270 10 PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t)
271 {
272 // does a group already exist for the given transmission?
273
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
20 const auto it = backward_.find(t);
274
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 10 times.
10 if (it != backward_.end()) {
275 if (forward_.at(it->second - 1).state
276 == ChannelSubscriptionMapImpl::Taken)
277 return it->second;
278 }
279 // find a free one;
280
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
10 for (unsigned int i = 0; i < forward_.size(); ++i) {
281 if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) {
282 const auto group_id = i + 1;
283 backward_[t] = group_id;
284 forward_[i].state = ChannelSubscriptionMapImpl::Taken;
285 return group_id;
286 }
287 }
288 // allocate one at the back;
289
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 forward_.emplace_back();
290 10 forward_.back().state = ChannelSubscriptionMapImpl::Taken;
291
1/2
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
10 const auto group_id = backward_[t] = forward_.size();
292 10 return group_id;
293 }
294
295 PeriodicSubscriptionsBase::GroupId
296 PeriodicSubscriptionsWithGroup::GroupMap::erase(
297 PdCom::Transmission t,
298 PeriodicSubscriptionsBase::ChannelId channel_idx)
299 {
300 const auto backward_it = backward_.find(t);
301 if (backward_it == backward_.end()) {
302 assert(backward_it != backward_.end()
303 and "Inconsistent state, backward_ and forward_ out of sync");
304 return 0;
305 }
306 auto &channel_map = at(backward_it->second);
307 const auto map_it = channel_map.find(channel_idx);
308 // already cleaned? if so, nothing to do.
309 if (map_it == channel_map.end())
310 return 0;
311 // clean subscription map
312 const bool no_channel_subs_left =
313 map_it->second.subscriptions_.remove_expired();
314 // if subscriptions for this channel are left, also nothing to do.
315 if (!no_channel_subs_left)
316 return 0;
317 // no subscriptions for this channel are left, so we have to remember the
318 // group id
319 const auto ans = backward_it->second;
320 // now check whether any subscriptions are left for this transmission
321 if (channel_map.guaranteed_empty()) {
322 // group id no longer in use, free up channel map
323 channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted;
324 backward_.erase(backward_it);
325 }
326 return ans;
327 }
328
329 10 void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed(
330 PdCom::impl::Process::PendingCallbackQueue &q,
331 PeriodicSubscriptionsWithGroup::GroupId group_id,
332 PeriodicSubscriptionsWithGroup::ChannelId channel_id)
333 {
334
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 auto &sub_map = map_.at(group_id);
335
2/4
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
20 const auto it = sub_map.find(channel_id);
336
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 10 times.
10 if (it == sub_map.end())
337 return;
338
339
2/4
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
10 it->second.subscriptions_.broadcastState(
340 PdCom::Subscription::State::Active, q);
341 }
342
343 void PeriodicSubscriptionsWithGroup::GroupMap::dump(
344 std::vector<PdCom::Process::SubscriptionInfo> &ans,
345 const std::unordered_map<unsigned, Channel *> &channels) const
346 {
347 for (const auto &all_groups : forward_) {
348 for (const auto &sub_map : all_groups) {
349 const auto var = impl::Variable::toUApi(
350 std::static_pointer_cast<const MsrProto::Variable>(
351 channels.at(sub_map.first)->shared_from_this()));
352 for (const auto &weak_sub : sub_map.second.subscriptions_) {
353 if (const auto sub = weak_sub.lock()) {
354 ans.emplace_back(sub->This_, &sub->subscriber_, var);
355 }
356 }
357 }
358 }
359 }
360
361 PeriodicSubscriptionWithoutGroup::transaction
362 4 PeriodicSubscriptionWithoutGroup::add(
363 Process::PendingCallbackQueue &q,
364 bool notify_pending,
365 const std::shared_ptr<PeriodicSubscription> &s)
366 {
367
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
4 if (locked_)
368 throw PdCom::InternalError("PeriodicSubscription is locked!");
369 8 const auto var_ptr = s->variable_.lock();
370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (!var_ptr)
371 throw EmptyVariable();
372 4 const auto &var = static_cast<Channel const &>(*var_ptr);
373
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
8 auto map_it = map_.find(var.index_);
374
375
376 4 unsigned decimation = s->subscriber_.getTransmission().getInterval()
377 4 / var.sample_time_.count()
378 4 + 0.5;
379 8 decimation += s->subscriber_.getTransmission().getInterval() > 0.0
380
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 and !decimation;
381 4 unsigned blocksize = 0;
382
383
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (decimation > 0) {
384 4 blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5;
385
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 blocksize = std::min(var.bufsize / decimation, blocksize);
386 }
387
388
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (!blocksize)
389 4 blocksize = 1;
390
1/2
✓ Branch 10 taken 4 times.
✗ Branch 11 not taken.
4 time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2));
391
392
2/2
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 1 times.
4 if (map_it == map_.end()) {
393 // not yet subscribed
394
5/10
✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
3 map_.insert({var.index_, {blocksize, decimation, {{s}}}});
395
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (notify_pending)
397 q.push_back({s, PdCom::Subscription::State::Pending});
398 3 return {true, decimation, blocksize};
399 }
400
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 else if (decimation < map_it->second.decimation_) {
401 // we have to resubscribe
402 1 map_it->second.subscriptions_.currentState =
403 PdCom::Subscription::State::Pending;
404
405
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (notify_pending)
406 q.push_back({s, PdCom::Subscription::State::Pending});
407 2 auto subs = std::move(map_it->second.subscriptions_);
408
1/2
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 subs.insert(s);
409
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 map_.erase(map_it);
410
2/4
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 13 taken 1 times.
✗ Branch 14 not taken.
1 map_.insert({var.index_, {blocksize, decimation, std::move(subs)}});
411 1 return {true, decimation, blocksize};
412 }
413 else {
414 if (notify_pending
415 or map_it->second.subscriptions_.currentState
416 != PdCom::Subscription::State::Pending)
417 q.push_back({s, map_it->second.subscriptions_.currentState});
418 map_it->second.subscriptions_.insert(s);
419 return {false, map_it->second.decimation_, map_it->second.blocksize_};
420 }
421 }
422
423 PeriodicSubscriptionWithoutGroup::transaction
424 2 PeriodicSubscriptionWithoutGroup::remove(const Variable &_var)
425 {
426
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
4 const auto it = map_.find(_var.index_);
427
7/10
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 2 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 1 times.
✓ Branch 18 taken 1 times.
2 if (it == map_.end() || !it->second.subscriptions_.remove_expired())
428 // not found or still subscribed by others
429 1 return {false, 0, 0};
430 else {
431 // now empty
432
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 map_.erase(it);
433 1 return {true, 0, 0};
434 }
435 }
436
437 4 void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed(
438 Process::PendingCallbackQueue &q,
439 ChannelId channel_id)
440 {
441
2/4
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
8 const auto it = map_.find(channel_id);
442
1/2
✗ Branch 6 not taken.
✓ Branch 7 taken 4 times.
4 if (it == map_.end())
443 return;
444
445
2/4
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
4 it->second.subscriptions_.broadcastState(
446 PdCom::Subscription::State::Active, q);
447 }
448
449 void PeriodicSubscriptionWithoutGroup::dump(
450 std::vector<PdCom::Process::SubscriptionInfo> &ans,
451 const std::unordered_map<unsigned, Channel *> &channels) const
452 {
453 for (const auto &sub_map : map_) {
454 const auto var = impl::Variable::toUApi(
455 std::static_pointer_cast<const MsrProto::Variable>(
456 channels.at(sub_map.first)->shared_from_this()));
457 for (const auto &weak_sub : sub_map.second.subscriptions_) {
458 if (const auto sub = weak_sub.lock()) {
459 ans.emplace_back(sub->This_, &sub->subscriber_, var);
460 }
461 }
462 }
463 }
464