Directory: | ./ |
---|---|
File: | pdcom5/src/msrproto/PeriodicSubscriptions.cpp |
Date: | 2023-11-12 04:06:57 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 163 | 245 | 66.5% |
Branches: | 151 | 400 | 37.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de) | ||
4 | * | ||
5 | * This file is part of the PdCom library. | ||
6 | * | ||
7 | * The PdCom library is free software: you can redistribute it and/or modify | ||
8 | * it under the terms of the GNU Lesser General Public License as published by | ||
9 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
10 | * option) any later version. | ||
11 | * | ||
12 | * The PdCom library is distributed in the hope that it will be useful, but | ||
13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
15 | * License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | * | ||
20 | *****************************************************************************/ | ||
21 | |||
22 | #include "PeriodicSubscriptions.h" | ||
23 | |||
24 | #include "Channel.h" | ||
25 | #include "Subscription.h" | ||
26 | |||
27 | #include <cassert> | ||
28 | #include <cstring> | ||
29 | #include <pdcom5/Exception.h> | ||
30 | #include <sstream> | ||
31 | |||
32 | using PdCom::impl::MsrProto::DataDecoder; | ||
33 | using PdCom::impl::MsrProto::PeriodicSubscription; | ||
34 | using PdCom::impl::MsrProto::PeriodicSubscriptionsBase; | ||
35 | using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup; | ||
36 | using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup; | ||
37 | |||
38 | using PdCom::impl::MsrProto::Variable; | ||
39 | |||
40 | |||
41 | 10 | PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add( | |
42 | PdCom::impl::Process::PendingCallbackQueue &queue, | ||
43 | bool notify_pending, | ||
44 | const std::shared_ptr<PeriodicSubscription> &s) | ||
45 | { | ||
46 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 10 times.
|
10 | if (locked_) |
47 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
48 | 20 | const auto var_ptr = s->variable_.lock(); | |
49 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
|
10 | if (!var_ptr) |
50 | ✗ | throw EmptyVariable(); | |
51 | 10 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
52 |
1/2✓ Branch 11 taken 10 times.
✗ Branch 12 not taken.
|
10 | const auto group_id = map_.find(s->subscriber_.getTransmission()); |
53 |
2/4✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 10 times.
✗ Branch 8 not taken.
|
20 | auto map_it = map_.at(group_id).find(var.index_); |
54 |
2/4✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
|
10 | if (map_it == map_.at(group_id).end()) { |
55 | // not yet subscribed | ||
56 | 10 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
57 | 10 | / var.sample_time_.count() | |
58 | 10 | + 0.5; | |
59 | 20 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
60 |
2/4✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
|
10 | and !decimation; |
61 | 10 | unsigned blocksize = 0; | |
62 | |||
63 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | if (decimation > 0) { |
64 | 10 | blocksize = | |
65 | 10 | 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
66 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
|
10 | blocksize = std::min(var.bufsize / decimation, blocksize); |
67 | } | ||
68 | |||
69 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 1 times.
|
10 | if (!blocksize) |
70 | 9 | blocksize = 1; | |
71 |
1/2✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
|
20 | time_vector_.resize( |
72 | 20 | std::max<size_t>(time_vector_.size(), blocksize + 2)); | |
73 | |||
74 |
6/12✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 10 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 10 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 10 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 10 times.
✓ Branch 34 taken 10 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
|
10 | map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}}); |
75 | |||
76 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | if (notify_pending) |
77 |
1/2✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
|
10 | queue.push_back({s, PdCom::Subscription::State::Pending}); |
78 | 10 | return {true, group_id, decimation, blocksize}; | |
79 | } | ||
80 | else { | ||
81 | ✗ | if (notify_pending | |
82 | ✗ | or map_it->second.subscriptions_.currentState | |
83 | != PdCom::Subscription::State::Pending) | ||
84 | ✗ | queue.push_back({s, map_it->second.subscriptions_.currentState}); | |
85 | ✗ | map_it->second.subscriptions_.insert(s); | |
86 | ✗ | return {false, group_id, map_it->second.decimation_, | |
87 | ✗ | map_it->second.blocksize_}; | |
88 | } | ||
89 | } | ||
90 | |||
91 | PeriodicSubscriptionsWithGroup::transaction | ||
92 | ✗ | PeriodicSubscriptionsWithGroup::remove( | |
93 | const Variable &var, | ||
94 | PdCom::Transmission tm) | ||
95 | { | ||
96 | // remove'ing is okay with locked ps, | ||
97 | // because the subscription map is _not_ deleted | ||
98 | // and the forward map _not_ altered. So pointers stay valid. | ||
99 | ✗ | const auto group_id = map_.erase(tm, var.index_); | |
100 | ✗ | return {group_id != 0, group_id, 0, 0}; | |
101 | } | ||
102 | |||
103 | 89 | PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle:: | |
104 | DataReceiveHandle( | ||
105 | PeriodicSubscriptionsBase &ps, | ||
106 | ChannelSubscriptionMap &subscription_map, | ||
107 | VariableCache::Lock variable_cache_lock, | ||
108 | std::chrono::nanoseconds ts, | ||
109 | 89 | bool consistent_blocksize) : | |
110 | ps_(&ps), | ||
111 | 89 | variable_cache_lock_(std::move(variable_cache_lock)), | |
112 | subscription_map_(&subscription_map), | ||
113 | ts_(ts), | ||
114 | 178 | consistent_blocksize_(consistent_blocksize) | |
115 | { | ||
116 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 89 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 89 times.
|
89 | if (ps.locked_) |
117 | throw PdCom::InternalError( | ||
118 | ✗ | "Receiving periodic data already in progress"); | |
119 | 89 | ps.locked_ = true; | |
120 | 89 | } | |
121 | |||
122 | 93 | void PeriodicSubscriptionsBase::DataReceiveHandle::readData( | |
123 | const PdCom::impl::MsrProto::Channel &c, | ||
124 | const char *data) | ||
125 | { | ||
126 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 93 times.
|
93 | if (!ps_) { |
127 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
128 | } | ||
129 |
1/2✓ Branch 2 taken 93 times.
✗ Branch 3 not taken.
|
93 | const Base64Info bi(data); |
130 | |||
131 |
3/4✓ Branch 4 taken 93 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 92 times.
✓ Branch 9 taken 1 times.
|
185 | const auto sub_info_it = subscription_map_->find(c.index_); |
132 |
2/2✓ Branch 7 taken 1 times.
✓ Branch 8 taken 92 times.
|
93 | if (sub_info_it == subscription_map_->end()) |
133 | 1 | return; | |
134 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 92 times.
|
184 | if (sub_info_it->second.subscriptions_.currentState |
135 | 92 | != PdCom::Subscription::State::Active) | |
136 | ✗ | return; | |
137 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 92 times.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 47 times.
|
92 | if (!consistent_blocksize_) { |
138 | 45 | current_blocksize_ = std::min<int>( | |
139 | 90 | sub_info_it->second.blocksize_, | |
140 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 45 times.
|
135 | bi.encodedDataLength_ / c.type_info->element_size); |
141 | 45 | max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_); | |
142 | } | ||
143 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
|
47 | else if (current_blocksize_ == -1) |
144 | ✗ | current_blocksize_ = sub_info_it->second.blocksize_; | |
145 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
|
47 | else if ( |
146 | 47 | current_blocksize_ | |
147 | 47 | != static_cast<int>(sub_info_it->second.blocksize_)) | |
148 | ✗ | throw ProtocolError("Blocksize mismatch"); | |
149 |
4/6✓ Branch 5 taken 92 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 92 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 92 times.
✓ Branch 14 taken 1 times.
|
276 | variable_cache_lock_[c.index_].readFromBase64( |
150 | 184 | data, bi.base64Length_, c, current_blocksize_); | |
151 | } | ||
152 | |||
153 | |||
154 | 47 | void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps( | |
155 | const char *data) | ||
156 | { | ||
157 | static_assert( | ||
158 | sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t), | ||
159 | "time_vector_ is not size compatible to time tag"); | ||
160 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
|
47 | if (!ps_) { |
161 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
162 | } | ||
163 |
2/4✓ Branch 0 taken 47 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 47 times.
|
47 | if (!data || !*data) |
164 | ✗ | throw PdCom::ProtocolError("Tag does not contain data"); | |
165 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
|
47 | const auto b64len = ::strlen(data); |
166 |
1/2✓ Branch 1 taken 47 times.
✗ Branch 2 not taken.
|
47 | if (current_blocksize_ == -1) { |
167 | 47 | current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t); | |
168 | 47 | max_read_blocks_ = current_blocksize_; | |
169 | } | ||
170 | 47 | consistent_blocksize_ = true; | |
171 | |||
172 |
1/2✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
|
94 | ps_->time_vector_.resize( |
173 | 94 | std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size())); | |
174 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
|
94 | if (!readFromBase64( |
175 | 47 | reinterpret_cast<char *>(ps_->time_vector_.data()), data, | |
176 | 47 | b64len, sizeof(int64_t) * current_blocksize_)) { | |
177 | ✗ | throw ProtocolError("Invalid <time> child of <data>"); | |
178 | } | ||
179 | 47 | timestamps_read_ = true; | |
180 | 47 | } | |
181 | |||
182 | ✗ | void PeriodicSubscriptionsWithGroup::dump( | |
183 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
184 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
185 | { | ||
186 | ✗ | map_.dump(ans, channels); | |
187 | } | ||
188 | |||
189 | 89 | void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving() | |
190 | { | ||
191 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 89 times.
|
89 | if (!ps_) { |
192 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
193 | } | ||
194 | // make sure to release the variable cache lock | ||
195 |
2/2✓ Branch 7 taken 88 times.
✓ Branch 8 taken 1 times.
|
177 | auto cache = std::move(variable_cache_lock_); |
196 | // as this is the final call on receiving a new dataset, | ||
197 | // make sure to erase the ps pointer to mark us invalid afterwards | ||
198 | const struct PsDeleter | ||
199 | { | ||
200 | std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_; | ||
201 | 89 | ~PsDeleter() { ps_.reset(); } | |
202 | 89 | PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) : | |
203 | 89 | ps_(ps) | |
204 | 89 | {} | |
205 |
2/2✓ Branch 4 taken 88 times.
✓ Branch 5 taken 1 times.
|
177 | } psdeleter_ {ps_}; |
206 | // subscriptions can disappear during <data> tag parsing, so don't | ||
207 | // assume that subscriptions_ is not empty | ||
208 |
5/6✓ Branch 1 taken 89 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 88 times.
✓ Branch 6 taken 1 times.
✓ Branch 7 taken 88 times.
|
89 | if (!cache or current_blocksize_ == -1) |
209 | 1 | return; | |
210 | |||
211 |
2/2✓ Branch 1 taken 94 times.
✓ Branch 2 taken 88 times.
|
182 | for (int block = 0; block < max_read_blocks_; ++block) { |
212 |
2/2✓ Branch 7 taken 129 times.
✓ Branch 8 taken 94 times.
|
223 | for (const auto &sub_map : *subscription_map_) { |
213 |
1/2✓ Branch 2 taken 129 times.
✗ Branch 3 not taken.
|
129 | const DataDecoder *var = cache.filled_cache(sub_map.first); |
214 | // no data received for given variable | ||
215 |
2/2✓ Branch 0 taken 98 times.
✓ Branch 1 taken 31 times.
|
160 | if (!var |
216 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
|
98 | || sub_map.second.subscriptions_.currentState |
217 | != PdCom::Subscription::State::Active) | ||
218 | 31 | continue; | |
219 |
1/2✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
|
98 | sendDataToSubscribers(*var, sub_map.second, block); |
220 | } | ||
221 |
1/2✓ Branch 4 taken 94 times.
✗ Branch 5 not taken.
|
94 | ps_->subscriber_notifier_.notify(); |
222 | 94 | ps_->subscriber_notifier_.clear(); | |
223 | } | ||
224 |
2/2✓ Branch 2 taken 88 times.
✓ Branch 3 taken 1 times.
|
88 | current_blocksize_ = -1; |
225 | } | ||
226 | |||
227 | 98 | void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers( | |
228 | const DataDecoder &var, | ||
229 | const ChannelSubscripion &sub_map, | ||
230 | const int block) | ||
231 | { | ||
232 | 98 | bool size_check_done = false; | |
233 |
5/8✓ Branch 6 taken 98 times.
✓ Branch 7 taken 98 times.
✓ Branch 12 taken 98 times.
✗ Branch 13 not taken.
✓ Branch 15 taken 98 times.
✗ Branch 16 not taken.
✓ Branch 19 taken 98 times.
✗ Branch 20 not taken.
|
196 | for (const auto &weak_sub : sub_map.subscriptions_) { |
234 |
2/4✓ Branch 3 taken 98 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
|
196 | if (const auto sub = weak_sub.lock()) { |
235 |
2/4✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
|
196 | const auto var_ptr = sub->variable_.lock(); |
236 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
|
98 | if (!var_ptr) |
237 | ✗ | throw EmptyVariable(); | |
238 |
1/2✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
|
98 | if (!size_check_done) { |
239 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
|
196 | if (consistent_blocksize_ |
240 |
3/4✓ Branch 0 taken 53 times.
✓ Branch 1 taken 45 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 53 times.
|
151 | && var.size() < sub_map.blocksize_ |
241 |
1/2✓ Branch 5 taken 53 times.
✗ Branch 6 not taken.
|
53 | * var_ptr->totalSizeInBytes()) { |
242 | ✗ | std::ostringstream os; | |
243 | os << "size mismatch in variable cache, " | ||
244 | ✗ | "expected " | |
245 | ✗ | << sub_map.blocksize_ * var_ptr->totalSizeInBytes() | |
246 | ✗ | << ", got " << var.size(); | |
247 | ✗ | throw ProtocolError(os.str()); | |
248 | } | ||
249 |
2/4✓ Branch 7 taken 98 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 98 times.
|
98 | else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes()) |
250 | ✗ | return; | |
251 | 98 | size_check_done = true; | |
252 | } | ||
253 |
2/4✓ Branch 11 taken 98 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 98 times.
✗ Branch 15 not taken.
|
98 | sub->readData(var.data() + block * var_ptr->totalSizeInBytes()); |
254 | 98 | std::chrono::nanoseconds ts; | |
255 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
✓ Branch 3 taken 53 times.
✓ Branch 4 taken 45 times.
|
98 | if (timestamps_read_) |
256 |
1/2✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
|
53 | ts = ps_->time_vector_.at(block); |
257 | else | ||
258 | 90 | ts = ts_ | |
259 | 90 | + block | |
260 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
90 | * std::chrono::duration_cast< |
261 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
90 | std::chrono::nanoseconds>( |
262 |
1/2✓ Branch 9 taken 45 times.
✗ Branch 10 not taken.
|
135 | var_ptr->getSampleTime()) |
263 | 135 | / sub_map.decimation_; | |
264 |
2/4✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 98 times.
✗ Branch 12 not taken.
|
98 | ps_->subscriber_notifier_.insert(sub->subscriber_, ts); |
265 | } | ||
266 | } | ||
267 | } | ||
268 | |||
269 | PeriodicSubscriptionsBase::GroupId | ||
270 | 10 | PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t) | |
271 | { | ||
272 | // does a group already exist for given transmission? | ||
273 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
20 | const auto it = backward_.find(t); |
274 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 10 times.
|
10 | if (it != backward_.end()) { |
275 | ✗ | if (forward_.at(it->second - 1).state | |
276 | ✗ | == ChannelSubscriptionMapImpl::Taken) | |
277 | ✗ | return it->second; | |
278 | } | ||
279 | // find a free one; | ||
280 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
|
10 | for (unsigned int i = 0; i < forward_.size(); ++i) { |
281 | ✗ | if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) { | |
282 | ✗ | const auto group_id = i + 1; | |
283 | ✗ | backward_[t] = group_id; | |
284 | ✗ | forward_[i].state = ChannelSubscriptionMapImpl::Taken; | |
285 | ✗ | return group_id; | |
286 | } | ||
287 | } | ||
288 | // allocate one at the back; | ||
289 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | forward_.emplace_back(); |
290 | 10 | forward_.back().state = ChannelSubscriptionMapImpl::Taken; | |
291 |
1/2✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | const auto group_id = backward_[t] = forward_.size(); |
292 | 10 | return group_id; | |
293 | } | ||
294 | |||
295 | PeriodicSubscriptionsBase::GroupId | ||
296 | ✗ | PeriodicSubscriptionsWithGroup::GroupMap::erase( | |
297 | PdCom::Transmission t, | ||
298 | PeriodicSubscriptionsBase::ChannelId channel_idx) | ||
299 | { | ||
300 | ✗ | const auto backward_it = backward_.find(t); | |
301 | ✗ | if (backward_it == backward_.end()) { | |
302 | ✗ | assert(backward_it != backward_.end() | |
303 | and "Inconsistent state, backward_ and forward_ out of sync"); | ||
304 | ✗ | return 0; | |
305 | } | ||
306 | ✗ | auto &channel_map = at(backward_it->second); | |
307 | ✗ | const auto map_it = channel_map.find(channel_idx); | |
308 | // already cleaned? if so, nothing to do. | ||
309 | ✗ | if (map_it == channel_map.end()) | |
310 | ✗ | return 0; | |
311 | // clean subscription map | ||
312 | const bool no_channel_subs_left = | ||
313 | ✗ | map_it->second.subscriptions_.remove_expired(); | |
314 | // if channels left, also nothing to do. | ||
315 | ✗ | if (!no_channel_subs_left) | |
316 | ✗ | return 0; | |
317 | // no subscriptions for this channel are left, we have to remember the | ||
318 | // group id | ||
319 | ✗ | const auto ans = backward_it->second; | |
320 | // now check whether any subscriptions are left for this transmission | ||
321 | ✗ | if (channel_map.guaranteed_empty()) { | |
322 | // group id no longer in use, free up channel map | ||
323 | ✗ | channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted; | |
324 | ✗ | backward_.erase(backward_it); | |
325 | } | ||
326 | ✗ | return ans; | |
327 | } | ||
328 | |||
329 | 10 | void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed( | |
330 | PdCom::impl::Process::PendingCallbackQueue &q, | ||
331 | PeriodicSubscriptionsWithGroup::GroupId group_id, | ||
332 | PeriodicSubscriptionsWithGroup::ChannelId channel_id) | ||
333 | { | ||
334 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | auto &sub_map = map_.at(group_id); |
335 |
2/4✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
|
20 | const auto it = sub_map.find(channel_id); |
336 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 10 times.
|
10 | if (it == sub_map.end()) |
337 | ✗ | return; | |
338 | |||
339 |
2/4✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 10 times.
✗ Branch 7 not taken.
|
10 | it->second.subscriptions_.broadcastState( |
340 | PdCom::Subscription::State::Active, q); | ||
341 | } | ||
342 | |||
343 | ✗ | void PeriodicSubscriptionsWithGroup::GroupMap::dump( | |
344 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
345 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
346 | { | ||
347 | ✗ | for (const auto &all_groups : forward_) { | |
348 | ✗ | for (const auto &sub_map : all_groups) { | |
349 | ✗ | const auto var = impl::Variable::toUApi( | |
350 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
351 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
352 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
353 | ✗ | if (const auto sub = weak_sub.lock()) { | |
354 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
355 | } | ||
356 | } | ||
357 | } | ||
358 | } | ||
359 | } | ||
360 | |||
361 | PeriodicSubscriptionWithoutGroup::transaction | ||
362 | 4 | PeriodicSubscriptionWithoutGroup::add( | |
363 | Process::PendingCallbackQueue &q, | ||
364 | bool notify_pending, | ||
365 | const std::shared_ptr<PeriodicSubscription> &s) | ||
366 | { | ||
367 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
|
4 | if (locked_) |
368 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
369 | 8 | const auto var_ptr = s->variable_.lock(); | |
370 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
|
4 | if (!var_ptr) |
371 | ✗ | throw EmptyVariable(); | |
372 | 4 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
373 |
1/2✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
|
8 | auto map_it = map_.find(var.index_); |
374 | |||
375 | |||
376 | 4 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
377 | 4 | / var.sample_time_.count() | |
378 | 4 | + 0.5; | |
379 | 8 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
380 |
2/4✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
|
4 | and !decimation; |
381 | 4 | unsigned blocksize = 0; | |
382 | |||
383 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (decimation > 0) { |
384 | 4 | blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
385 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
|
4 | blocksize = std::min(var.bufsize / decimation, blocksize); |
386 | } | ||
387 | |||
388 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (!blocksize) |
389 | 4 | blocksize = 1; | |
390 |
1/2✓ Branch 10 taken 4 times.
✗ Branch 11 not taken.
|
4 | time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2)); |
391 | |||
392 |
2/2✓ Branch 6 taken 3 times.
✓ Branch 7 taken 1 times.
|
4 | if (map_it == map_.end()) { |
393 | // not yet subscribed | ||
394 |
5/10✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
|
3 | map_.insert({var.index_, {blocksize, decimation, {{s}}}}); |
395 | |||
396 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (notify_pending) |
397 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
398 | 3 | return {true, decimation, blocksize}; | |
399 | } | ||
400 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | else if (decimation < map_it->second.decimation_) { |
401 | // we have to resubscribe | ||
402 | 1 | map_it->second.subscriptions_.currentState = | |
403 | PdCom::Subscription::State::Pending; | ||
404 | |||
405 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (notify_pending) |
406 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
407 | 2 | auto subs = std::move(map_it->second.subscriptions_); | |
408 |
1/2✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | subs.insert(s); |
409 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | map_.erase(map_it); |
410 |
2/4✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 13 taken 1 times.
✗ Branch 14 not taken.
|
1 | map_.insert({var.index_, {blocksize, decimation, std::move(subs)}}); |
411 | 1 | return {true, decimation, blocksize}; | |
412 | } | ||
413 | else { | ||
414 | ✗ | if (notify_pending | |
415 | ✗ | or map_it->second.subscriptions_.currentState | |
416 | != PdCom::Subscription::State::Pending) | ||
417 | ✗ | q.push_back({s, map_it->second.subscriptions_.currentState}); | |
418 | ✗ | map_it->second.subscriptions_.insert(s); | |
419 | ✗ | return {false, map_it->second.decimation_, map_it->second.blocksize_}; | |
420 | } | ||
421 | } | ||
422 | |||
423 | PeriodicSubscriptionWithoutGroup::transaction | ||
424 | 2 | PeriodicSubscriptionWithoutGroup::remove(const Variable &_var) | |
425 | { | ||
426 |
1/2✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
4 | const auto it = map_.find(_var.index_); |
427 |
7/10✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 9 taken 1 times.
✓ Branch 10 taken 1 times.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 2 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 1 times.
✓ Branch 18 taken 1 times.
|
2 | if (it == map_.end() || !it->second.subscriptions_.remove_expired()) |
428 | // not found or still subscribed by others | ||
429 | 1 | return {false, 0, 0}; | |
430 | else { | ||
431 | // now empty | ||
432 |
1/2✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | map_.erase(it); |
433 | 1 | return {true, 0, 0}; | |
434 | } | ||
435 | } | ||
436 | |||
437 | 4 | void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed( | |
438 | Process::PendingCallbackQueue &q, | ||
439 | ChannelId channel_id) | ||
440 | { | ||
441 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
8 | const auto it = map_.find(channel_id); |
442 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 4 times.
|
4 | if (it == map_.end()) |
443 | ✗ | return; | |
444 | |||
445 |
2/4✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
|
4 | it->second.subscriptions_.broadcastState( |
446 | PdCom::Subscription::State::Active, q); | ||
447 | } | ||
448 | |||
449 | ✗ | void PeriodicSubscriptionWithoutGroup::dump( | |
450 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
451 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
452 | { | ||
453 | ✗ | for (const auto &sub_map : map_) { | |
454 | ✗ | const auto var = impl::Variable::toUApi( | |
455 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
456 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
457 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
458 | ✗ | if (const auto sub = weak_sub.lock()) { | |
459 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
460 | } | ||
461 | } | ||
462 | } | ||
463 | } | ||
464 |