Directory: | ./ |
---|---|
File: | pdcom5/src/msrproto/PeriodicSubscriptions.cpp |
Date: | 2024-12-29 04:08:32 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 182 | 244 | 74.6% |
Branches: | 162 | 392 | 41.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de) | ||
4 | * | ||
5 | * This file is part of the PdCom library. | ||
6 | * | ||
7 | * The PdCom library is free software: you can redistribute it and/or modify | ||
8 | * it under the terms of the GNU Lesser General Public License as published by | ||
9 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
10 | * option) any later version. | ||
11 | * | ||
12 | * The PdCom library is distributed in the hope that it will be useful, but | ||
13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
15 | * License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | * | ||
20 | *****************************************************************************/ | ||
21 | |||
22 | #include "PeriodicSubscriptions.h" | ||
23 | |||
24 | #include "Channel.h" | ||
25 | #include "Subscription.h" | ||
26 | |||
27 | #include <cassert> | ||
28 | #include <cstring> | ||
29 | #include <pdcom5/Exception.h> | ||
30 | #include <sstream> | ||
31 | |||
32 | using PdCom::impl::MsrProto::DataDecoder; | ||
33 | using PdCom::impl::MsrProto::PeriodicSubscription; | ||
34 | using PdCom::impl::MsrProto::PeriodicSubscriptionsBase; | ||
35 | using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup; | ||
36 | using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup; | ||
37 | |||
38 | using PdCom::impl::MsrProto::Variable; | ||
39 | |||
40 | |||
41 | 42 | PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add( | |
42 | PdCom::impl::Process::PendingCallbackQueue &queue, | ||
43 | bool notify_pending, | ||
44 | const std::shared_ptr<PeriodicSubscription> &s) | ||
45 | { | ||
46 | 84 | const auto var_ptr = s->variable_.lock(); | |
47 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
|
42 | if (!var_ptr) |
48 | ✗ | throw EmptyVariable(); | |
49 | 42 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
50 |
1/2✓ Branch 11 taken 42 times.
✗ Branch 12 not taken.
|
42 | const auto group_id = map_.find(s->subscriber_.getTransmission()); |
51 |
2/4✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
|
84 | auto map_it = map_.at(group_id).find(var.index_); |
52 |
2/4✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 42 times.
✗ Branch 11 not taken.
|
42 | if (map_it == map_.at(group_id).end()) { |
53 | // not yet subscribed | ||
54 | 42 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
55 | 42 | / var.sample_time_.count() | |
56 | 42 | + 0.5; | |
57 | 84 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
58 |
2/4✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 42 times.
|
42 | and !decimation; |
59 | 42 | unsigned blocksize = 0; | |
60 | |||
61 |
1/2✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
|
42 | if (decimation > 0) { |
62 | 42 | blocksize = | |
63 | 42 | 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
64 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
|
42 | blocksize = std::min(var.bufsize / decimation, blocksize); |
65 | 42 | blocksize = std::min(blocksize, 1000U); | |
66 | } | ||
67 | |||
68 |
2/2✓ Branch 0 taken 13 times.
✓ Branch 1 taken 29 times.
|
42 | if (!blocksize) |
69 | 13 | blocksize = 1; | |
70 |
1/2✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
|
84 | time_vector_.resize( |
71 | 84 | std::max<size_t>(time_vector_.size(), blocksize + 2)); | |
72 | |||
73 |
6/12✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 42 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 42 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 42 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 42 times.
✓ Branch 34 taken 42 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
|
42 | map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}}); |
74 | |||
75 |
2/2✓ Branch 0 taken 26 times.
✓ Branch 1 taken 16 times.
|
42 | if (notify_pending) |
76 |
1/2✓ Branch 8 taken 26 times.
✗ Branch 9 not taken.
|
26 | queue.push_back({s, PdCom::Subscription::State::Pending}); |
77 | 42 | return {true, group_id, decimation, blocksize}; | |
78 | } | ||
79 | else { | ||
80 | ✗ | if (notify_pending | |
81 | ✗ | or map_it->second.subscriptions_.currentState | |
82 | != PdCom::Subscription::State::Pending) | ||
83 | ✗ | queue.push_back({s, map_it->second.subscriptions_.currentState}); | |
84 | ✗ | map_it->second.subscriptions_.insert(s); | |
85 | ✗ | return {false, group_id, map_it->second.decimation_, | |
86 | ✗ | map_it->second.blocksize_}; | |
87 | } | ||
88 | } | ||
89 | |||
90 | PeriodicSubscriptionsWithGroup::transaction | ||
91 | 1 | PeriodicSubscriptionsWithGroup::remove( | |
92 | const Variable &var, | ||
93 | PdCom::Transmission tm) | ||
94 | { | ||
95 | // remove'ing is okay with locked ps, | ||
96 | // because the subscription map is _not_ deleted | ||
97 | // and the forward map _not_ altered. So pointers stay valid. | ||
98 | 1 | const auto group_id = map_.erase(tm, var.index_); | |
99 | 1 | return {group_id != 0, group_id, 0, 0}; | |
100 | } | ||
101 | |||
102 | 387 | PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle:: | |
103 | DataReceiveHandle( | ||
104 | PeriodicSubscriptionsBase &ps, | ||
105 | ChannelSubscriptionMap &subscription_map, | ||
106 | VariableCache::Lock variable_cache_lock, | ||
107 | std::chrono::nanoseconds ts, | ||
108 | 387 | bool consistent_blocksize) : | |
109 | ps_(&ps), | ||
110 | 387 | variable_cache_lock_(std::move(variable_cache_lock)), | |
111 | subscription_map_(&subscription_map), | ||
112 | ts_(ts), | ||
113 | 774 | consistent_blocksize_(consistent_blocksize) | |
114 | { | ||
115 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 387 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 387 times.
|
387 | if (ps.locked_) |
116 | throw PdCom::InternalError( | ||
117 | ✗ | "Receiving periodic data already in progress"); | |
118 | 387 | ps.locked_ = true; | |
119 | 387 | } | |
120 | |||
121 | 547 | void PeriodicSubscriptionsBase::DataReceiveHandle::readData( | |
122 | const PdCom::impl::MsrProto::Channel &c, | ||
123 | const char *data) | ||
124 | { | ||
125 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 547 times.
|
547 | if (!ps_) { |
126 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
127 | } | ||
128 |
1/2✓ Branch 2 taken 547 times.
✗ Branch 3 not taken.
|
547 | const Base64Info bi(data); |
129 | |||
130 |
3/4✓ Branch 4 taken 547 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 546 times.
✓ Branch 9 taken 1 times.
|
1093 | const auto sub_info_it = subscription_map_->find(c.index_); |
131 |
2/2✓ Branch 7 taken 1 times.
✓ Branch 8 taken 546 times.
|
547 | if (sub_info_it == subscription_map_->end()) |
132 | 1 | return; | |
133 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 546 times.
|
1092 | if (sub_info_it->second.subscriptions_.currentState |
134 | 546 | != PdCom::Subscription::State::Active) | |
135 | ✗ | return; | |
136 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 546 times.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 501 times.
|
546 | if (!consistent_blocksize_) { |
137 | 45 | current_blocksize_ = std::min<int>( | |
138 | 90 | sub_info_it->second.blocksize_, | |
139 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 45 times.
|
135 | bi.encodedDataLength_ / c.type_info->element_size); |
140 | 45 | max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_); | |
141 | } | ||
142 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 501 times.
|
501 | else if (current_blocksize_ == -1) |
143 | ✗ | current_blocksize_ = sub_info_it->second.blocksize_; | |
144 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 501 times.
|
501 | else if ( |
145 | 501 | current_blocksize_ | |
146 | 501 | != static_cast<int>(sub_info_it->second.blocksize_)) | |
147 | ✗ | throw ProtocolError("Blocksize mismatch"); | |
148 |
4/6✓ Branch 5 taken 546 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 546 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 546 times.
✓ Branch 14 taken 1 times.
|
1638 | variable_cache_lock_[c.index_].readFromBase64( |
149 | 1092 | data, bi.base64Length_, c, current_blocksize_); | |
150 | } | ||
151 | |||
152 | |||
153 | 341 | void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps( | |
154 | const char *data) | ||
155 | { | ||
156 | static_assert( | ||
157 | sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t), | ||
158 | "time_vector_ is not size compatible to time tag"); | ||
159 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 341 times.
|
341 | if (!ps_) { |
160 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
161 | } | ||
162 |
2/4✓ Branch 0 taken 341 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 341 times.
|
341 | if (!data || !*data) |
163 | ✗ | throw PdCom::ProtocolError("Tag does not contain data"); | |
164 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 341 times.
|
341 | const auto b64len = ::strlen(data); |
165 |
1/2✓ Branch 1 taken 341 times.
✗ Branch 2 not taken.
|
341 | if (current_blocksize_ == -1) { |
166 | 341 | current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t); | |
167 | 341 | max_read_blocks_ = current_blocksize_; | |
168 | } | ||
169 | 341 | consistent_blocksize_ = true; | |
170 | |||
171 |
1/2✓ Branch 5 taken 341 times.
✗ Branch 6 not taken.
|
682 | ps_->time_vector_.resize( |
172 | 682 | std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size())); | |
173 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 341 times.
|
682 | if (!readFromBase64( |
174 | 341 | reinterpret_cast<char *>(ps_->time_vector_.data()), data, | |
175 | 341 | b64len, sizeof(int64_t) * current_blocksize_)) { | |
176 | ✗ | throw ProtocolError("Invalid <time> child of <data>"); | |
177 | } | ||
178 | 341 | timestamps_read_ = true; | |
179 | 341 | } | |
180 | |||
181 | ✗ | void PeriodicSubscriptionsWithGroup::dump( | |
182 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
183 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
184 | { | ||
185 | ✗ | map_.dump(ans, channels); | |
186 | } | ||
187 | |||
188 | 387 | void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving() | |
189 | { | ||
190 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 387 times.
|
387 | if (!ps_) { |
191 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
192 | } | ||
193 | // make sure to release the variable cache lock | ||
194 |
2/2✓ Branch 7 taken 386 times.
✓ Branch 8 taken 1 times.
|
773 | auto cache = std::move(variable_cache_lock_); |
195 | // as this is the final call on receiving a new dataset, | ||
196 | // make sure to erase the ps pointer to mark us invalid afterwards | ||
197 | const struct PsDeleter | ||
198 | { | ||
199 | std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_; | ||
200 | 387 | ~PsDeleter() { ps_.reset(); } | |
201 | 387 | PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) : | |
202 | 387 | ps_(ps) | |
203 | 387 | {} | |
204 |
2/2✓ Branch 4 taken 386 times.
✓ Branch 5 taken 1 times.
|
773 | } psdeleter_ {ps_}; |
205 | // subscriptions can dissappear during <data> tag parsing, so don't | ||
206 | // assume that subscriptions_ is not empty | ||
207 |
5/6✓ Branch 1 taken 387 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 386 times.
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 386 times.
|
387 | if (!cache or current_blocksize_ == -1) |
208 | 1 | return; | |
209 | |||
210 |
2/2✓ Branch 1 taken 890 times.
✓ Branch 2 taken 386 times.
|
1276 | for (int block = 0; block < max_read_blocks_; ++block) { |
211 |
2/2✓ Branch 7 taken 1253 times.
✓ Branch 8 taken 890 times.
|
2143 | for (const auto &sub_map : *subscription_map_) { |
212 |
1/2✓ Branch 2 taken 1253 times.
✗ Branch 3 not taken.
|
1253 | const DataDecoder *var = cache.filled_cache(sub_map.first); |
213 | // no data recieved for given variable | ||
214 |
2/2✓ Branch 0 taken 1210 times.
✓ Branch 1 taken 43 times.
|
1296 | if (!var |
215 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
|
1210 | || sub_map.second.subscriptions_.currentState |
216 | != PdCom::Subscription::State::Active) | ||
217 | 43 | continue; | |
218 |
1/2✓ Branch 4 taken 1210 times.
✗ Branch 5 not taken.
|
1210 | sendDataToSubscribers(*var, sub_map.second, block); |
219 | } | ||
220 |
1/2✓ Branch 4 taken 890 times.
✗ Branch 5 not taken.
|
890 | ps_->subscriber_notifier_.notify(); |
221 | 890 | ps_->subscriber_notifier_.clear(); | |
222 | } | ||
223 |
2/2✓ Branch 2 taken 386 times.
✓ Branch 3 taken 1 times.
|
386 | current_blocksize_ = -1; |
224 | } | ||
225 | |||
226 | 1210 | void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers( | |
227 | const DataDecoder &var, | ||
228 | const ChannelSubscripion &sub_map, | ||
229 | const int block) | ||
230 | { | ||
231 | 1210 | bool size_check_done = false; | |
232 |
5/8✓ Branch 6 taken 1210 times.
✓ Branch 7 taken 1210 times.
✓ Branch 12 taken 1210 times.
✗ Branch 13 not taken.
✓ Branch 15 taken 1210 times.
✗ Branch 16 not taken.
✓ Branch 19 taken 1210 times.
✗ Branch 20 not taken.
|
2420 | for (const auto &weak_sub : sub_map.subscriptions_) { |
233 |
2/4✓ Branch 3 taken 1210 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 1210 times.
✗ Branch 7 not taken.
|
2420 | if (const auto sub = weak_sub.lock()) { |
234 |
2/4✓ Branch 5 taken 1210 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1210 times.
✗ Branch 9 not taken.
|
2420 | const auto var_ptr = sub->variable_.lock(); |
235 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
|
1210 | if (!var_ptr) |
236 | ✗ | throw EmptyVariable(); | |
237 |
1/2✓ Branch 0 taken 1210 times.
✗ Branch 1 not taken.
|
1210 | if (!size_check_done) { |
238 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1210 times.
|
2420 | if (consistent_blocksize_ |
239 |
3/4✓ Branch 0 taken 1165 times.
✓ Branch 1 taken 45 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1165 times.
|
2375 | && var.size() < sub_map.blocksize_ |
240 |
1/2✓ Branch 5 taken 1165 times.
✗ Branch 6 not taken.
|
1165 | * var_ptr->totalSizeInBytes()) { |
241 | ✗ | std::ostringstream os; | |
242 | os << "size mismatch in variable cache, " | ||
243 | ✗ | "expected " | |
244 | ✗ | << sub_map.blocksize_ * var_ptr->totalSizeInBytes() | |
245 | ✗ | << ", got " << var.size(); | |
246 | ✗ | throw ProtocolError(os.str()); | |
247 | } | ||
248 |
2/4✓ Branch 7 taken 1210 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 1210 times.
|
1210 | else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes()) |
249 | ✗ | return; | |
250 | 1210 | size_check_done = true; | |
251 | } | ||
252 |
2/4✓ Branch 11 taken 1210 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 1210 times.
✗ Branch 15 not taken.
|
1210 | sub->readData(var.data() + block * var_ptr->totalSizeInBytes()); |
253 | 1210 | std::chrono::nanoseconds ts; | |
254 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 1210 times.
✓ Branch 3 taken 1165 times.
✓ Branch 4 taken 45 times.
|
1210 | if (timestamps_read_) |
255 |
1/2✓ Branch 4 taken 1165 times.
✗ Branch 5 not taken.
|
1165 | ts = ps_->time_vector_.at(block); |
256 | else | ||
257 | 90 | ts = ts_ | |
258 | 180 | + (block - max_read_blocks_ + 1) | |
259 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
90 | * std::chrono::duration_cast< |
260 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
90 | std::chrono::nanoseconds>( |
261 |
1/2✓ Branch 9 taken 45 times.
✗ Branch 10 not taken.
|
180 | var_ptr->getSampleTime()) |
262 | 135 | * sub_map.decimation_; | |
263 |
2/4✓ Branch 8 taken 1210 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1210 times.
✗ Branch 12 not taken.
|
1210 | ps_->subscriber_notifier_.insert(sub->subscriber_, ts); |
264 | } | ||
265 | } | ||
266 | } | ||
267 | |||
268 | PeriodicSubscriptionsBase::GroupId | ||
269 | 42 | PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t) | |
270 | { | ||
271 | // does a group already exists for given transmission? | ||
272 |
1/2✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
|
84 | const auto it = backward_.find(t); |
273 |
2/2✓ Branch 6 taken 8 times.
✓ Branch 7 taken 34 times.
|
42 | if (it != backward_.end()) { |
274 |
3/6✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✓ Branch 9 taken 8 times.
✗ Branch 10 not taken.
|
16 | if (forward_.at(it->second - 1).state |
275 | 8 | == ChannelSubscriptionMapImpl::Taken) | |
276 | 8 | return it->second; | |
277 | } | ||
278 | // find a free one; | ||
279 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 34 times.
|
34 | for (unsigned int i = 0; i < forward_.size(); ++i) { |
280 | ✗ | if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) { | |
281 | ✗ | const auto group_id = i + 1; | |
282 | ✗ | backward_[t] = group_id; | |
283 | ✗ | forward_[i].state = ChannelSubscriptionMapImpl::Taken; | |
284 | ✗ | return group_id; | |
285 | } | ||
286 | } | ||
287 | // allocate one at the back; | ||
288 |
1/2✓ Branch 2 taken 34 times.
✗ Branch 3 not taken.
|
34 | forward_.emplace_back(); |
289 | 34 | forward_.back().state = ChannelSubscriptionMapImpl::Taken; | |
290 |
1/2✓ Branch 4 taken 34 times.
✗ Branch 5 not taken.
|
34 | const auto group_id = backward_[t] = forward_.size(); |
291 | 34 | return group_id; | |
292 | } | ||
293 | |||
294 | PeriodicSubscriptionsBase::GroupId | ||
295 | 1 | PeriodicSubscriptionsWithGroup::GroupMap::erase( | |
296 | PdCom::Transmission t, | ||
297 | PeriodicSubscriptionsBase::ChannelId channel_idx) | ||
298 | { | ||
299 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
2 | const auto backward_it = backward_.find(t); |
300 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
|
1 | if (backward_it == backward_.end()) { |
301 | ✗ | assert(backward_it != backward_.end() | |
302 | and "Inconsistent state, backward_ and forward_ out of sync"); | ||
303 | ✗ | return 0; | |
304 | } | ||
305 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | auto &channel_map = at(backward_it->second); |
306 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
2 | const auto map_it = channel_map.find(channel_idx); |
307 | // already cleaned? if so, nothing to do. | ||
308 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
|
1 | if (map_it == channel_map.end()) |
309 | ✗ | return 0; | |
310 | // clean subscription map | ||
311 | const bool no_channel_subs_left = | ||
312 | 1 | map_it->second.subscriptions_.remove_expired(); | |
313 | // if channels left, also nothing to do. | ||
314 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!no_channel_subs_left) |
315 | ✗ | return 0; | |
316 | // no subscriptions for this channel is left, we have to remember the | ||
317 | // group id | ||
318 | 1 | const auto ans = backward_it->second; | |
319 | // now check whether any subscriptions are left for this transmission | ||
320 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | if (channel_map.guaranteed_empty()) { |
321 | // group id no longer in use, free up channel map | ||
322 | 1 | channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted; | |
323 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | backward_.erase(backward_it); |
324 | } | ||
325 | 1 | return ans; | |
326 | } | ||
327 | |||
328 | 42 | void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed( | |
329 | PdCom::impl::Process::PendingCallbackQueue &q, | ||
330 | PeriodicSubscriptionsWithGroup::GroupId group_id, | ||
331 | PeriodicSubscriptionsWithGroup::ChannelId channel_id) | ||
332 | { | ||
333 |
1/2✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
|
42 | auto &sub_map = map_.at(group_id); |
334 |
2/4✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 42 times.
✗ Branch 6 not taken.
|
84 | const auto it = sub_map.find(channel_id); |
335 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 42 times.
|
42 | if (it == sub_map.end()) |
336 | ✗ | return; | |
337 | |||
338 |
2/4✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 42 times.
✗ Branch 7 not taken.
|
42 | it->second.subscriptions_.broadcastState( |
339 | PdCom::Subscription::State::Active, q); | ||
340 | } | ||
341 | |||
342 | ✗ | void PeriodicSubscriptionsWithGroup::GroupMap::dump( | |
343 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
344 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
345 | { | ||
346 | ✗ | for (const auto &all_groups : forward_) { | |
347 | ✗ | for (const auto &sub_map : all_groups) { | |
348 | ✗ | const auto var = impl::Variable::toUApi( | |
349 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
350 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
351 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
352 | ✗ | if (const auto sub = weak_sub.lock()) { | |
353 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
354 | } | ||
355 | } | ||
356 | } | ||
357 | } | ||
358 | } | ||
359 | |||
360 | PeriodicSubscriptionWithoutGroup::transaction | ||
361 | 4 | PeriodicSubscriptionWithoutGroup::add( | |
362 | Process::PendingCallbackQueue &q, | ||
363 | bool notify_pending, | ||
364 | const std::shared_ptr<PeriodicSubscription> &s) | ||
365 | { | ||
366 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
|
4 | if (locked_) |
367 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
368 | 8 | const auto var_ptr = s->variable_.lock(); | |
369 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
|
4 | if (!var_ptr) |
370 | ✗ | throw EmptyVariable(); | |
371 | 4 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
372 |
1/2✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
|
8 | auto map_it = map_.find(var.index_); |
373 | |||
374 | |||
375 | 4 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
376 | 4 | / var.sample_time_.count() | |
377 | 4 | + 0.5; | |
378 | 8 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
379 |
2/4✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
|
4 | and !decimation; |
380 | 4 | unsigned blocksize = 0; | |
381 | |||
382 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (decimation > 0) { |
383 | 4 | blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
384 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
|
4 | blocksize = std::min(var.bufsize / decimation, blocksize); |
385 | } | ||
386 | |||
387 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (!blocksize) |
388 | 4 | blocksize = 1; | |
389 |
1/2✓ Branch 10 taken 4 times.
✗ Branch 11 not taken.
|
4 | time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2)); |
390 | |||
391 |
2/2✓ Branch 6 taken 3 times.
✓ Branch 7 taken 1 times.
|
4 | if (map_it == map_.end()) { |
392 | // not yet subscribed | ||
393 |
5/10✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
|
3 | map_.insert({var.index_, {blocksize, decimation, {{s}}}}); |
394 | |||
395 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (notify_pending) |
396 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
397 | 3 | return {true, decimation, blocksize}; | |
398 | } | ||
399 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | else if (decimation < map_it->second.decimation_) { |
400 | // we have to resubscribe | ||
401 | 1 | map_it->second.subscriptions_.currentState = | |
402 | PdCom::Subscription::State::Pending; | ||
403 | |||
404 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (notify_pending) |
405 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
406 | 2 | auto subs = std::move(map_it->second.subscriptions_); | |
407 |
1/2✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | subs.insert(s); |
408 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | map_.erase(map_it); |
409 |
2/4✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 13 taken 1 times.
✗ Branch 14 not taken.
|
1 | map_.insert({var.index_, {blocksize, decimation, std::move(subs)}}); |
410 | 1 | return {true, decimation, blocksize}; | |
411 | } | ||
412 | else { | ||
413 | ✗ | if (notify_pending | |
414 | ✗ | or map_it->second.subscriptions_.currentState | |
415 | != PdCom::Subscription::State::Pending) | ||
416 | ✗ | q.push_back({s, map_it->second.subscriptions_.currentState}); | |
417 | ✗ | map_it->second.subscriptions_.insert(s); | |
418 | ✗ | return {false, map_it->second.decimation_, map_it->second.blocksize_}; | |
419 | } | ||
420 | } | ||
421 | |||
422 | PeriodicSubscriptionWithoutGroup::transaction | ||
423 | 2 | PeriodicSubscriptionWithoutGroup::remove(const Variable &_var) | |
424 | { | ||
425 |
1/2✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
4 | const auto it = map_.find(_var.index_); |
426 |
7/10✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 2 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 1 times.
✓ Branch 18 taken 1 times.
|
2 | if (it == map_.end() || !it->second.subscriptions_.remove_expired()) |
427 | // not found or still subscribed by others | ||
428 | 1 | return {false, 0, 0}; | |
429 | else { | ||
430 | // now empty | ||
431 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | map_.erase(it); |
432 | 1 | return {true, 0, 0}; | |
433 | } | ||
434 | } | ||
435 | |||
436 | 4 | void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed( | |
437 | Process::PendingCallbackQueue &q, | ||
438 | ChannelId channel_id) | ||
439 | { | ||
440 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
8 | const auto it = map_.find(channel_id); |
441 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 4 times.
|
4 | if (it == map_.end()) |
442 | ✗ | return; | |
443 | |||
444 |
2/4✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
|
4 | it->second.subscriptions_.broadcastState( |
445 | PdCom::Subscription::State::Active, q); | ||
446 | } | ||
447 | |||
448 | ✗ | void PeriodicSubscriptionWithoutGroup::dump( | |
449 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
450 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
451 | { | ||
452 | ✗ | for (const auto &sub_map : map_) { | |
453 | ✗ | const auto var = impl::Variable::toUApi( | |
454 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
455 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
456 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
457 | ✗ | if (const auto sub = weak_sub.lock()) { | |
458 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
459 | } | ||
460 | } | ||
461 | } | ||
462 | } | ||
463 |