Directory: | ./ |
---|---|
File: | src/msrproto/PeriodicSubscriptions.cpp |
Date: | 2024-11-05 15:23:15 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 192 | 246 | 78.0% |
Branches: | 177 | 392 | 45.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de) | ||
4 | * | ||
5 | * This file is part of the PdCom library. | ||
6 | * | ||
7 | * The PdCom library is free software: you can redistribute it and/or modify | ||
8 | * it under the terms of the GNU Lesser General Public License as published by | ||
9 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
10 | * option) any later version. | ||
11 | * | ||
12 | * The PdCom library is distributed in the hope that it will be useful, but | ||
13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
15 | * License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | * | ||
20 | *****************************************************************************/ | ||
21 | |||
22 | #include "PeriodicSubscriptions.h" | ||
23 | |||
24 | #include "Channel.h" | ||
25 | #include "Subscription.h" | ||
26 | |||
27 | #include <cassert> | ||
28 | #include <cstring> | ||
29 | #include <pdcom5/Exception.h> | ||
30 | #include <sstream> | ||
31 | |||
32 | using PdCom::impl::MsrProto::DataDecoder; | ||
33 | using PdCom::impl::MsrProto::PeriodicSubscription; | ||
34 | using PdCom::impl::MsrProto::PeriodicSubscriptionsBase; | ||
35 | using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup; | ||
36 | using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup; | ||
37 | |||
38 | using PdCom::impl::MsrProto::Variable; | ||
39 | |||
40 | |||
41 | 52 | PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add( | |
42 | PdCom::impl::Process::PendingCallbackQueue &queue, | ||
43 | bool notify_pending, | ||
44 | const std::shared_ptr<PeriodicSubscription> &s) | ||
45 | { | ||
46 | 104 | const auto var_ptr = s->variable_.lock(); | |
47 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 52 times.
|
52 | if (!var_ptr) |
48 | ✗ | throw EmptyVariable(); | |
49 | 52 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
50 |
1/2✓ Branch 11 taken 52 times.
✗ Branch 12 not taken.
|
52 | const auto group_id = map_.find(s->subscriber_.getTransmission()); |
51 |
2/4✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 52 times.
✗ Branch 8 not taken.
|
104 | auto map_it = map_.at(group_id).find(var.index_); |
52 |
3/4✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 41 times.
✓ Branch 11 taken 11 times.
|
52 | if (map_it == map_.at(group_id).end()) { |
53 | // not yet subscribed | ||
54 | 41 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
55 | 41 | / var.sample_time_.count() | |
56 | 41 | + 0.5; | |
57 | 82 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
58 |
3/4✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 39 times.
|
41 | and !decimation; |
59 | 41 | unsigned blocksize = 0; | |
60 | |||
61 |
1/2✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
|
41 | if (decimation > 0) { |
62 | 41 | blocksize = | |
63 | 41 | 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
64 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
|
41 | blocksize = std::min(var.bufsize / decimation, blocksize); |
65 | 41 | blocksize = std::min(blocksize, 1000U); | |
66 | } | ||
67 | |||
68 |
2/2✓ Branch 0 taken 38 times.
✓ Branch 1 taken 3 times.
|
41 | if (!blocksize) |
69 | 38 | blocksize = 1; | |
70 |
1/2✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
|
82 | time_vector_.resize( |
71 | 82 | std::max<size_t>(time_vector_.size(), blocksize + 2)); | |
72 | |||
73 |
6/12✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 41 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 41 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 41 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 41 times.
✓ Branch 34 taken 41 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
|
41 | map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}}); |
74 | |||
75 |
2/2✓ Branch 0 taken 25 times.
✓ Branch 1 taken 16 times.
|
41 | if (notify_pending) |
76 |
1/2✓ Branch 8 taken 25 times.
✗ Branch 9 not taken.
|
25 | queue.push_back({s, PdCom::Subscription::State::Pending}); |
77 | 41 | return {true, group_id, decimation, blocksize}; | |
78 | } | ||
79 | else { | ||
80 |
2/2✓ Branch 0 taken 10 times.
✓ Branch 1 taken 1 times.
|
11 | if (notify_pending |
81 |
4/4✓ Branch 0 taken 7 times.
✓ Branch 1 taken 4 times.
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 1 times.
|
11 | or map_it->second.subscriptions_.currentState |
82 | != PdCom::Subscription::State::Pending) | ||
83 |
1/2✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
|
10 | queue.push_back({s, map_it->second.subscriptions_.currentState}); |
84 |
1/2✓ Branch 6 taken 11 times.
✗ Branch 7 not taken.
|
11 | map_it->second.subscriptions_.insert(s); |
85 | 11 | return {false, group_id, map_it->second.decimation_, | |
86 | 22 | map_it->second.blocksize_}; | |
87 | } | ||
88 | } | ||
89 | |||
90 | PeriodicSubscriptionsWithGroup::transaction | ||
91 | 27 | PeriodicSubscriptionsWithGroup::remove( | |
92 | const Variable &var, | ||
93 | PdCom::Transmission tm) | ||
94 | { | ||
95 | // remove'ing is okay with locked ps, | ||
96 | // because the subscription map is _not_ deleted | ||
97 | // and the forward map _not_ altered. So pointers stay valid. | ||
98 | 27 | const auto group_id = map_.erase(tm, var.index_); | |
99 | 27 | return {group_id != 0, group_id, 0, 0}; | |
100 | } | ||
101 | |||
102 | 51 | PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle:: | |
103 | DataReceiveHandle( | ||
104 | PeriodicSubscriptionsBase &ps, | ||
105 | ChannelSubscriptionMap &subscription_map, | ||
106 | VariableCache::Lock variable_cache_lock, | ||
107 | std::chrono::nanoseconds ts, | ||
108 | 51 | bool consistent_blocksize) : | |
109 | ps_(&ps), | ||
110 | 51 | variable_cache_lock_(std::move(variable_cache_lock)), | |
111 | subscription_map_(&subscription_map), | ||
112 | ts_(ts), | ||
113 | 102 | consistent_blocksize_(consistent_blocksize) | |
114 | { | ||
115 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 51 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 51 times.
|
51 | if (ps.locked_) |
116 | throw PdCom::InternalError( | ||
117 | ✗ | "Receiving periodic data already in progress"); | |
118 | 51 | ps.locked_ = true; | |
119 | 51 | } | |
120 | |||
121 | 53 | void PeriodicSubscriptionsBase::DataReceiveHandle::readData( | |
122 | const PdCom::impl::MsrProto::Channel &c, | ||
123 | const char *data) | ||
124 | { | ||
125 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 53 times.
|
53 | if (!ps_) { |
126 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
127 | } | ||
128 |
1/2✓ Branch 2 taken 53 times.
✗ Branch 3 not taken.
|
53 | const Base64Info bi(data); |
129 | |||
130 |
3/4✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 45 times.
✓ Branch 9 taken 8 times.
|
98 | const auto sub_info_it = subscription_map_->find(c.index_); |
131 |
2/2✓ Branch 7 taken 8 times.
✓ Branch 8 taken 45 times.
|
53 | if (sub_info_it == subscription_map_->end()) |
132 | 8 | return; | |
133 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
|
90 | if (sub_info_it->second.subscriptions_.currentState |
134 | 45 | != PdCom::Subscription::State::Active) | |
135 | ✗ | return; | |
136 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 40 times.
|
45 | if (!consistent_blocksize_) { |
137 | 5 | current_blocksize_ = std::min<int>( | |
138 | 10 | sub_info_it->second.blocksize_, | |
139 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 5 times.
|
15 | bi.encodedDataLength_ / c.type_info->element_size); |
140 | 5 | max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_); | |
141 | } | ||
142 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
|
40 | else if (current_blocksize_ == -1) |
143 | ✗ | current_blocksize_ = sub_info_it->second.blocksize_; | |
144 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | else if ( |
145 | 40 | current_blocksize_ | |
146 | 40 | != static_cast<int>(sub_info_it->second.blocksize_)) | |
147 | ✗ | throw ProtocolError("Blocksize mismatch"); | |
148 |
4/6✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 45 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 45 times.
✓ Branch 14 taken 8 times.
|
135 | variable_cache_lock_[c.index_].readFromBase64( |
149 | 90 | data, bi.base64Length_, c, current_blocksize_); | |
150 | } | ||
151 | |||
152 | |||
153 | 45 | void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps( | |
154 | const char *data) | ||
155 | { | ||
156 | static_assert( | ||
157 | sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t), | ||
158 | "time_vector_ is not size compatible to time tag"); | ||
159 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
|
45 | if (!ps_) { |
160 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
161 | } | ||
162 |
2/4✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 45 times.
|
45 | if (!data || !*data) |
163 | ✗ | throw PdCom::ProtocolError("Tag does not contain data"); | |
164 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
|
45 | const auto b64len = ::strlen(data); |
165 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
45 | if (current_blocksize_ == -1) { |
166 | 45 | current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t); | |
167 | 45 | max_read_blocks_ = current_blocksize_; | |
168 | } | ||
169 | 45 | consistent_blocksize_ = true; | |
170 | |||
171 |
1/2✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
|
90 | ps_->time_vector_.resize( |
172 | 90 | std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size())); | |
173 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
|
90 | if (!readFromBase64( |
174 | 45 | reinterpret_cast<char *>(ps_->time_vector_.data()), data, | |
175 | 45 | b64len, sizeof(int64_t) * current_blocksize_)) { | |
176 | ✗ | throw ProtocolError("Invalid <time> child of <data>"); | |
177 | } | ||
178 | 45 | timestamps_read_ = true; | |
179 | 45 | } | |
180 | |||
181 | 1 | void PeriodicSubscriptionsWithGroup::dump( | |
182 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
183 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
184 | { | ||
185 | 1 | map_.dump(ans, channels); | |
186 | 1 | } | |
187 | |||
188 | 49 | void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving() | |
189 | { | ||
190 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
|
49 | if (!ps_) { |
191 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
192 | } | ||
193 | // make sure to release the variable cache lock | ||
194 |
1/2✓ Branch 7 taken 49 times.
✗ Branch 8 not taken.
|
98 | auto cache = std::move(variable_cache_lock_); |
195 | // as this is the final call on receiving a new dataset, | ||
196 | // make sure to erase the ps pointer to mark us invalid afterwards | ||
197 | const struct PsDeleter | ||
198 | { | ||
199 | std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_; | ||
200 | 49 | ~PsDeleter() { ps_.reset(); } | |
201 | 49 | PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) : | |
202 | 49 | ps_(ps) | |
203 | 49 | {} | |
204 |
1/2✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
|
98 | } psdeleter_ {ps_}; |
205 | // subscriptions can dissappear during <data> tag parsing, so don't | ||
206 | // assume that subscriptions_ is not empty | ||
207 |
3/6✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 49 times.
|
49 | if (!cache or current_blocksize_ == -1) |
208 | ✗ | return; | |
209 | |||
210 |
2/2✓ Branch 1 taken 62 times.
✓ Branch 2 taken 49 times.
|
111 | for (int block = 0; block < max_read_blocks_; ++block) { |
211 |
2/2✓ Branch 7 taken 65 times.
✓ Branch 8 taken 62 times.
|
127 | for (const auto &sub_map : *subscription_map_) { |
212 |
1/2✓ Branch 2 taken 65 times.
✗ Branch 3 not taken.
|
65 | const DataDecoder *var = cache.filled_cache(sub_map.first); |
213 | // no data recieved for given variable | ||
214 |
2/2✓ Branch 0 taken 64 times.
✓ Branch 1 taken 1 times.
|
66 | if (!var |
215 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
|
64 | || sub_map.second.subscriptions_.currentState |
216 | != PdCom::Subscription::State::Active) | ||
217 | 1 | continue; | |
218 |
1/2✓ Branch 4 taken 64 times.
✗ Branch 5 not taken.
|
64 | sendDataToSubscribers(*var, sub_map.second, block); |
219 | } | ||
220 |
1/2✓ Branch 4 taken 62 times.
✗ Branch 5 not taken.
|
62 | ps_->subscriber_notifier_.notify(); |
221 | 62 | ps_->subscriber_notifier_.clear(); | |
222 | } | ||
223 |
1/2✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
|
49 | current_blocksize_ = -1; |
224 | } | ||
225 | |||
226 | 64 | void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers( | |
227 | const DataDecoder &var, | ||
228 | const ChannelSubscripion &sub_map, | ||
229 | const int block) | ||
230 | { | ||
231 | 64 | bool size_check_done = false; | |
232 |
8/8✓ Branch 6 taken 71 times.
✓ Branch 7 taken 62 times.
✓ Branch 12 taken 62 times.
✓ Branch 13 taken 2 times.
✓ Branch 15 taken 62 times.
✓ Branch 16 taken 2 times.
✓ Branch 19 taken 62 times.
✓ Branch 20 taken 2 times.
|
133 | for (const auto &weak_sub : sub_map.subscriptions_) { |
233 |
3/4✓ Branch 3 taken 71 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 69 times.
✓ Branch 7 taken 2 times.
|
140 | if (const auto sub = weak_sub.lock()) { |
234 |
4/4✓ Branch 5 taken 69 times.
✓ Branch 6 taken 2 times.
✓ Branch 8 taken 69 times.
✓ Branch 9 taken 2 times.
|
140 | const auto var_ptr = sub->variable_.lock(); |
235 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 71 times.
|
71 | if (!var_ptr) |
236 | ✗ | throw EmptyVariable(); | |
237 |
2/2✓ Branch 0 taken 64 times.
✓ Branch 1 taken 7 times.
|
71 | if (!size_check_done) { |
238 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 64 times.
|
128 | if (consistent_blocksize_ |
239 |
3/4✓ Branch 0 taken 49 times.
✓ Branch 1 taken 15 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 49 times.
|
113 | && var.size() < sub_map.blocksize_ |
240 |
1/2✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
|
49 | * var_ptr->totalSizeInBytes()) { |
241 | ✗ | std::ostringstream os; | |
242 | os << "size mismatch in variable cache, " | ||
243 | ✗ | "expected " | |
244 | ✗ | << sub_map.blocksize_ * var_ptr->totalSizeInBytes() | |
245 | ✗ | << ", got " << var.size(); | |
246 | ✗ | throw ProtocolError(os.str()); | |
247 | } | ||
248 |
3/4✓ Branch 7 taken 64 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 62 times.
|
64 | else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes()) |
249 | 2 | return; | |
250 | 62 | size_check_done = true; | |
251 | } | ||
252 |
2/4✓ Branch 11 taken 69 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 69 times.
✗ Branch 15 not taken.
|
69 | sub->readData(var.data() + block * var_ptr->totalSizeInBytes()); |
253 | 69 | std::chrono::nanoseconds ts; | |
254 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 69 times.
✓ Branch 3 taken 56 times.
✓ Branch 4 taken 13 times.
|
69 | if (timestamps_read_) |
255 |
1/2✓ Branch 4 taken 56 times.
✗ Branch 5 not taken.
|
56 | ts = ps_->time_vector_.at(block); |
256 | else | ||
257 | 26 | ts = ts_ | |
258 | 52 | + (block - max_read_blocks_ + 1) | |
259 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | * std::chrono::duration_cast< |
260 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | std::chrono::nanoseconds>( |
261 |
1/2✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
|
52 | var_ptr->getSampleTime()) |
262 | 39 | * sub_map.decimation_; | |
263 |
3/4✓ Branch 8 taken 69 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 69 times.
✓ Branch 12 taken 2 times.
|
69 | ps_->subscriber_notifier_.insert(sub->subscriber_, ts); |
264 | } | ||
265 | } | ||
266 | } | ||
267 | |||
268 | PeriodicSubscriptionsBase::GroupId | ||
269 | 52 | PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t) | |
270 | { | ||
271 | // does a group already exists for given transmission? | ||
272 |
1/2✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
|
104 | const auto it = backward_.find(t); |
273 |
2/2✓ Branch 6 taken 19 times.
✓ Branch 7 taken 33 times.
|
52 | if (it != backward_.end()) { |
274 |
3/6✓ Branch 4 taken 19 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 19 times.
✓ Branch 9 taken 19 times.
✗ Branch 10 not taken.
|
38 | if (forward_.at(it->second - 1).state |
275 | 19 | == ChannelSubscriptionMapImpl::Taken) | |
276 | 19 | return it->second; | |
277 | } | ||
278 | // find a free one; | ||
279 |
2/2✓ Branch 2 taken 8 times.
✓ Branch 3 taken 32 times.
|
40 | for (unsigned int i = 0; i < forward_.size(); ++i) { |
280 |
3/4✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 7 times.
|
8 | if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) { |
281 | 1 | const auto group_id = i + 1; | |
282 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | backward_[t] = group_id; |
283 | 1 | forward_[i].state = ChannelSubscriptionMapImpl::Taken; | |
284 | 1 | return group_id; | |
285 | } | ||
286 | } | ||
287 | // allocate one at the back; | ||
288 |
1/2✓ Branch 2 taken 32 times.
✗ Branch 3 not taken.
|
32 | forward_.emplace_back(); |
289 | 32 | forward_.back().state = ChannelSubscriptionMapImpl::Taken; | |
290 |
1/2✓ Branch 4 taken 32 times.
✗ Branch 5 not taken.
|
32 | const auto group_id = backward_[t] = forward_.size(); |
291 | 32 | return group_id; | |
292 | } | ||
293 | |||
294 | PeriodicSubscriptionsBase::GroupId | ||
295 | 27 | PeriodicSubscriptionsWithGroup::GroupMap::erase( | |
296 | PdCom::Transmission t, | ||
297 | PeriodicSubscriptionsBase::ChannelId channel_idx) | ||
298 | { | ||
299 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto backward_it = backward_.find(t); |
300 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (backward_it == backward_.end()) { |
301 | ✗ | assert(backward_it != backward_.end() | |
302 | and "Inconsistent state, backward_ and forward_ out of sync"); | ||
303 | ✗ | return 0; | |
304 | } | ||
305 |
1/2✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
|
27 | auto &channel_map = at(backward_it->second); |
306 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto map_it = channel_map.find(channel_idx); |
307 | // already cleaned? if so, nothing to do. | ||
308 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (map_it == channel_map.end()) |
309 | ✗ | return 0; | |
310 | // clean subscription map | ||
311 | const bool no_channel_subs_left = | ||
312 | 27 | map_it->second.subscriptions_.remove_expired(); | |
313 | // if channels left, also nothing to do. | ||
314 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 22 times.
|
27 | if (!no_channel_subs_left) |
315 | 5 | return 0; | |
316 | // no subscriptions for this channel is left, we have to remember the | ||
317 | // group id | ||
318 | 22 | const auto ans = backward_it->second; | |
319 | // now check whether any subscriptions are left for this transmission | ||
320 |
2/2✓ Branch 2 taken 14 times.
✓ Branch 3 taken 8 times.
|
22 | if (channel_map.guaranteed_empty()) { |
321 | // group id no longer in use, free up channel map | ||
322 | 14 | channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted; | |
323 |
1/2✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
|
14 | backward_.erase(backward_it); |
324 | } | ||
325 | 22 | return ans; | |
326 | } | ||
327 | |||
328 | 41 | void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed( | |
329 | PdCom::impl::Process::PendingCallbackQueue &q, | ||
330 | PeriodicSubscriptionsWithGroup::GroupId group_id, | ||
331 | PeriodicSubscriptionsWithGroup::ChannelId channel_id) | ||
332 | { | ||
333 |
1/2✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
|
41 | auto &sub_map = map_.at(group_id); |
334 |
2/4✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
|
82 | const auto it = sub_map.find(channel_id); |
335 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 41 times.
|
41 | if (it == sub_map.end()) |
336 | ✗ | return; | |
337 | |||
338 |
2/4✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 41 times.
✗ Branch 7 not taken.
|
41 | it->second.subscriptions_.broadcastState( |
339 | PdCom::Subscription::State::Active, q); | ||
340 | } | ||
341 | |||
342 | 1 | void PeriodicSubscriptionsWithGroup::GroupMap::dump( | |
343 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
344 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
345 | { | ||
346 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &all_groups : forward_) { |
347 |
2/2✓ Branch 5 taken 1 times.
✓ Branch 6 taken 1 times.
|
2 | for (const auto &sub_map : all_groups) { |
348 | 1 | const auto var = impl::Variable::toUApi( | |
349 | 3 | std::static_pointer_cast<const MsrProto::Variable>( | |
350 |
2/4✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
5 | channels.at(sub_map.first)->shared_from_this())); |
351 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &weak_sub : sub_map.second.subscriptions_) { |
352 |
1/2✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
2 | if (const auto sub = weak_sub.lock()) { |
353 |
1/2✓ Branch 10 taken 1 times.
✗ Branch 11 not taken.
|
1 | ans.emplace_back(sub->This_, &sub->subscriber_, var); |
354 | } | ||
355 | } | ||
356 | } | ||
357 | } | ||
358 | 1 | } | |
359 | |||
360 | PeriodicSubscriptionWithoutGroup::transaction | ||
361 | 3 | PeriodicSubscriptionWithoutGroup::add( | |
362 | Process::PendingCallbackQueue &q, | ||
363 | bool notify_pending, | ||
364 | const std::shared_ptr<PeriodicSubscription> &s) | ||
365 | { | ||
366 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
|
3 | if (locked_) |
367 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
368 | 6 | const auto var_ptr = s->variable_.lock(); | |
369 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (!var_ptr) |
370 | ✗ | throw EmptyVariable(); | |
371 | 3 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
372 |
1/2✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
|
6 | auto map_it = map_.find(var.index_); |
373 | |||
374 | |||
375 | 3 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
376 | 3 | / var.sample_time_.count() | |
377 | 3 | + 0.5; | |
378 | 6 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
379 |
2/4✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
|
3 | and !decimation; |
380 | 3 | unsigned blocksize = 0; | |
381 | |||
382 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (decimation > 0) { |
383 | 3 | blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
384 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | blocksize = std::min(var.bufsize / decimation, blocksize); |
385 | } | ||
386 | |||
387 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (!blocksize) |
388 | ✗ | blocksize = 1; | |
389 |
1/2✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
|
3 | time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2)); |
390 | |||
391 |
1/2✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | if (map_it == map_.end()) { |
392 | // not yet subscribed | ||
393 |
5/10✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
|
3 | map_.insert({var.index_, {blocksize, decimation, {{s}}}}); |
394 | |||
395 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (notify_pending) |
396 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
397 | 3 | return {true, decimation, blocksize}; | |
398 | } | ||
399 | ✗ | else if (decimation < map_it->second.decimation_) { | |
400 | // we have to resubscribe | ||
401 | ✗ | map_it->second.subscriptions_.currentState = | |
402 | PdCom::Subscription::State::Pending; | ||
403 | |||
404 | ✗ | if (notify_pending) | |
405 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
406 | ✗ | auto subs = std::move(map_it->second.subscriptions_); | |
407 | ✗ | subs.insert(s); | |
408 | ✗ | map_.erase(map_it); | |
409 | ✗ | map_.insert({var.index_, {blocksize, decimation, std::move(subs)}}); | |
410 | ✗ | return {true, decimation, blocksize}; | |
411 | } | ||
412 | else { | ||
413 | ✗ | if (notify_pending | |
414 | ✗ | or map_it->second.subscriptions_.currentState | |
415 | != PdCom::Subscription::State::Pending) | ||
416 | ✗ | q.push_back({s, map_it->second.subscriptions_.currentState}); | |
417 | ✗ | map_it->second.subscriptions_.insert(s); | |
418 | ✗ | return {false, map_it->second.decimation_, map_it->second.blocksize_}; | |
419 | } | ||
420 | } | ||
421 | |||
422 | PeriodicSubscriptionWithoutGroup::transaction | ||
423 | ✗ | PeriodicSubscriptionWithoutGroup::remove(const Variable &_var) | |
424 | { | ||
425 | ✗ | const auto it = map_.find(_var.index_); | |
426 | ✗ | if (it == map_.end() || !it->second.subscriptions_.remove_expired()) | |
427 | // not found or still subscribed by others | ||
428 | ✗ | return {false, 0, 0}; | |
429 | else { | ||
430 | // now empty | ||
431 | ✗ | map_.erase(it); | |
432 | ✗ | return {true, 0, 0}; | |
433 | } | ||
434 | } | ||
435 | |||
436 | 3 | void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed( | |
437 | Process::PendingCallbackQueue &q, | ||
438 | ChannelId channel_id) | ||
439 | { | ||
440 |
2/4✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
6 | const auto it = map_.find(channel_id); |
441 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 3 times.
|
3 | if (it == map_.end()) |
442 | ✗ | return; | |
443 | |||
444 |
2/4✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | it->second.subscriptions_.broadcastState( |
445 | PdCom::Subscription::State::Active, q); | ||
446 | } | ||
447 | |||
448 | ✗ | void PeriodicSubscriptionWithoutGroup::dump( | |
449 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
450 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
451 | { | ||
452 | ✗ | for (const auto &sub_map : map_) { | |
453 | ✗ | const auto var = impl::Variable::toUApi( | |
454 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
455 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
456 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
457 | ✗ | if (const auto sub = weak_sub.lock()) { | |
458 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
459 | } | ||
460 | } | ||
461 | } | ||
462 | } | ||
463 |