Directory: | ./ |
---|---|
File: | src/msrproto/PeriodicSubscriptions.cpp |
Date: | 2024-03-27 13:09:52 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 193 | 248 | 77.8% |
Branches: | 179 | 400 | 44.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de) | ||
4 | * | ||
5 | * This file is part of the PdCom library. | ||
6 | * | ||
7 | * The PdCom library is free software: you can redistribute it and/or modify | ||
8 | * it under the terms of the GNU Lesser General Public License as published by | ||
9 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
10 | * option) any later version. | ||
11 | * | ||
12 | * The PdCom library is distributed in the hope that it will be useful, but | ||
13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
15 | * License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | * | ||
20 | *****************************************************************************/ | ||
21 | |||
22 | #include "PeriodicSubscriptions.h" | ||
23 | |||
24 | #include "Channel.h" | ||
25 | #include "Subscription.h" | ||
26 | |||
27 | #include <cassert> | ||
28 | #include <cstring> | ||
29 | #include <pdcom5/Exception.h> | ||
30 | #include <sstream> | ||
31 | |||
32 | using PdCom::impl::MsrProto::DataDecoder; | ||
33 | using PdCom::impl::MsrProto::PeriodicSubscription; | ||
34 | using PdCom::impl::MsrProto::PeriodicSubscriptionsBase; | ||
35 | using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup; | ||
36 | using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup; | ||
37 | |||
38 | using PdCom::impl::MsrProto::Variable; | ||
39 | |||
40 | |||
41 | 48 | PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add( | |
42 | PdCom::impl::Process::PendingCallbackQueue &queue, | ||
43 | bool notify_pending, | ||
44 | const std::shared_ptr<PeriodicSubscription> &s) | ||
45 | { | ||
46 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 48 times.
|
48 | if (locked_) |
47 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
48 | 96 | const auto var_ptr = s->variable_.lock(); | |
49 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
|
48 | if (!var_ptr) |
50 | ✗ | throw EmptyVariable(); | |
51 | 48 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
52 |
1/2✓ Branch 11 taken 48 times.
✗ Branch 12 not taken.
|
48 | const auto group_id = map_.find(s->subscriber_.getTransmission()); |
53 |
2/4✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 48 times.
✗ Branch 8 not taken.
|
96 | auto map_it = map_.at(group_id).find(var.index_); |
54 |
3/4✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 39 times.
✓ Branch 11 taken 9 times.
|
48 | if (map_it == map_.at(group_id).end()) { |
55 | // not yet subscribed | ||
56 | 39 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
57 | 39 | / var.sample_time_.count() | |
58 | 39 | + 0.5; | |
59 | 78 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
60 |
3/4✓ Branch 0 taken 39 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 37 times.
|
39 | and !decimation; |
61 | 39 | unsigned blocksize = 0; | |
62 | |||
63 |
1/2✓ Branch 0 taken 39 times.
✗ Branch 1 not taken.
|
39 | if (decimation > 0) { |
64 | 39 | blocksize = | |
65 | 39 | 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
66 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 39 times.
|
39 | blocksize = std::min(var.bufsize / decimation, blocksize); |
67 | 39 | blocksize = std::min(blocksize, 1000U); | |
68 | } | ||
69 | |||
70 |
2/2✓ Branch 0 taken 36 times.
✓ Branch 1 taken 3 times.
|
39 | if (!blocksize) |
71 | 36 | blocksize = 1; | |
72 |
1/2✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
|
78 | time_vector_.resize( |
73 | 78 | std::max<size_t>(time_vector_.size(), blocksize + 2)); | |
74 | |||
75 |
6/12✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 39 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 39 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 39 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 39 times.
✓ Branch 34 taken 39 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
|
39 | map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}}); |
76 | |||
77 |
2/2✓ Branch 0 taken 23 times.
✓ Branch 1 taken 16 times.
|
39 | if (notify_pending) |
78 |
1/2✓ Branch 8 taken 23 times.
✗ Branch 9 not taken.
|
23 | queue.push_back({s, PdCom::Subscription::State::Pending}); |
79 | 39 | return {true, group_id, decimation, blocksize}; | |
80 | } | ||
81 | else { | ||
82 |
2/2✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
|
9 | if (notify_pending |
83 |
4/4✓ Branch 0 taken 7 times.
✓ Branch 1 taken 2 times.
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 1 times.
|
9 | or map_it->second.subscriptions_.currentState |
84 | != PdCom::Subscription::State::Pending) | ||
85 |
1/2✓ Branch 10 taken 8 times.
✗ Branch 11 not taken.
|
8 | queue.push_back({s, map_it->second.subscriptions_.currentState}); |
86 |
1/2✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
|
9 | map_it->second.subscriptions_.insert(s); |
87 | 9 | return {false, group_id, map_it->second.decimation_, | |
88 | 18 | map_it->second.blocksize_}; | |
89 | } | ||
90 | } | ||
91 | |||
92 | PeriodicSubscriptionsWithGroup::transaction | ||
93 | 27 | PeriodicSubscriptionsWithGroup::remove( | |
94 | const Variable &var, | ||
95 | PdCom::Transmission tm) | ||
96 | { | ||
97 | // remove'ing is okay with locked ps, | ||
98 | // because the subscription map is _not_ deleted | ||
99 | // and the forward map _not_ altered. So pointers stay valid. | ||
100 | 27 | const auto group_id = map_.erase(tm, var.index_); | |
101 | 27 | return {group_id != 0, group_id, 0, 0}; | |
102 | } | ||
103 | |||
104 | 49 | PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle:: | |
105 | DataReceiveHandle( | ||
106 | PeriodicSubscriptionsBase &ps, | ||
107 | ChannelSubscriptionMap &subscription_map, | ||
108 | VariableCache::Lock variable_cache_lock, | ||
109 | std::chrono::nanoseconds ts, | ||
110 | 49 | bool consistent_blocksize) : | |
111 | ps_(&ps), | ||
112 | 49 | variable_cache_lock_(std::move(variable_cache_lock)), | |
113 | subscription_map_(&subscription_map), | ||
114 | ts_(ts), | ||
115 | 98 | consistent_blocksize_(consistent_blocksize) | |
116 | { | ||
117 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
|
49 | if (ps.locked_) |
118 | throw PdCom::InternalError( | ||
119 | ✗ | "Receiving periodic data already in progress"); | |
120 | 49 | ps.locked_ = true; | |
121 | 49 | } | |
122 | |||
123 | 51 | void PeriodicSubscriptionsBase::DataReceiveHandle::readData( | |
124 | const PdCom::impl::MsrProto::Channel &c, | ||
125 | const char *data) | ||
126 | { | ||
127 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 51 times.
|
51 | if (!ps_) { |
128 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
129 | } | ||
130 |
1/2✓ Branch 2 taken 51 times.
✗ Branch 3 not taken.
|
51 | const Base64Info bi(data); |
131 | |||
132 |
3/4✓ Branch 4 taken 51 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 43 times.
✓ Branch 9 taken 8 times.
|
94 | const auto sub_info_it = subscription_map_->find(c.index_); |
133 |
2/2✓ Branch 7 taken 8 times.
✓ Branch 8 taken 43 times.
|
51 | if (sub_info_it == subscription_map_->end()) |
134 | 8 | return; | |
135 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
|
86 | if (sub_info_it->second.subscriptions_.currentState |
136 | 43 | != PdCom::Subscription::State::Active) | |
137 | ✗ | return; | |
138 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 38 times.
|
43 | if (!consistent_blocksize_) { |
139 | 5 | current_blocksize_ = std::min<int>( | |
140 | 10 | sub_info_it->second.blocksize_, | |
141 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 5 times.
|
15 | bi.encodedDataLength_ / c.type_info->element_size); |
142 | 5 | max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_); | |
143 | } | ||
144 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 38 times.
|
38 | else if (current_blocksize_ == -1) |
145 | ✗ | current_blocksize_ = sub_info_it->second.blocksize_; | |
146 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | else if ( |
147 | 38 | current_blocksize_ | |
148 | 38 | != static_cast<int>(sub_info_it->second.blocksize_)) | |
149 | ✗ | throw ProtocolError("Blocksize mismatch"); | |
150 |
4/6✓ Branch 5 taken 43 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 43 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 43 times.
✓ Branch 14 taken 8 times.
|
129 | variable_cache_lock_[c.index_].readFromBase64( |
151 | 86 | data, bi.base64Length_, c, current_blocksize_); | |
152 | } | ||
153 | |||
154 | |||
155 | 43 | void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps( | |
156 | const char *data) | ||
157 | { | ||
158 | static_assert( | ||
159 | sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t), | ||
160 | "time_vector_ is not size compatible to time tag"); | ||
161 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
|
43 | if (!ps_) { |
162 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
163 | } | ||
164 |
2/4✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
|
43 | if (!data || !*data) |
165 | ✗ | throw PdCom::ProtocolError("Tag does not contain data"); | |
166 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 43 times.
|
43 | const auto b64len = ::strlen(data); |
167 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | if (current_blocksize_ == -1) { |
168 | 43 | current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t); | |
169 | 43 | max_read_blocks_ = current_blocksize_; | |
170 | } | ||
171 | 43 | consistent_blocksize_ = true; | |
172 | |||
173 |
1/2✓ Branch 5 taken 43 times.
✗ Branch 6 not taken.
|
86 | ps_->time_vector_.resize( |
174 | 86 | std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size())); | |
175 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
|
86 | if (!readFromBase64( |
176 | 43 | reinterpret_cast<char *>(ps_->time_vector_.data()), data, | |
177 | 43 | b64len, sizeof(int64_t) * current_blocksize_)) { | |
178 | ✗ | throw ProtocolError("Invalid <time> child of <data>"); | |
179 | } | ||
180 | 43 | timestamps_read_ = true; | |
181 | 43 | } | |
182 | |||
183 | 1 | void PeriodicSubscriptionsWithGroup::dump( | |
184 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
185 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
186 | { | ||
187 | 1 | map_.dump(ans, channels); | |
188 | 1 | } | |
189 | |||
190 | 47 | void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving() | |
191 | { | ||
192 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
|
47 | if (!ps_) { |
193 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
194 | } | ||
195 | // make sure to release the variable cache lock | ||
196 |
1/2✓ Branch 7 taken 47 times.
✗ Branch 8 not taken.
|
94 | auto cache = std::move(variable_cache_lock_); |
197 | // as this is the final call on receiving a new dataset, | ||
198 | // make sure to erase the ps pointer to mark us invalid afterwards | ||
199 | const struct PsDeleter | ||
200 | { | ||
201 | std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_; | ||
202 | 47 | ~PsDeleter() { ps_.reset(); } | |
203 | 47 | PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) : | |
204 | 47 | ps_(ps) | |
205 | 47 | {} | |
206 |
1/2✓ Branch 4 taken 47 times.
✗ Branch 5 not taken.
|
94 | } psdeleter_ {ps_}; |
207 | // subscriptions can disappear during <data> tag parsing, so don't | ||
208 | // assume that subscriptions_ is not empty | ||
209 |
3/6✓ Branch 1 taken 47 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 47 times.
|
47 | if (!cache or current_blocksize_ == -1) |
210 | ✗ | return; | |
211 | |||
212 |
2/2✓ Branch 1 taken 60 times.
✓ Branch 2 taken 47 times.
|
107 | for (int block = 0; block < max_read_blocks_; ++block) { |
213 |
2/2✓ Branch 7 taken 63 times.
✓ Branch 8 taken 60 times.
|
123 | for (const auto &sub_map : *subscription_map_) { |
214 |
1/2✓ Branch 2 taken 63 times.
✗ Branch 3 not taken.
|
63 | const DataDecoder *var = cache.filled_cache(sub_map.first); |
215 | // no data received for given variable | ||
216 |
2/2✓ Branch 0 taken 62 times.
✓ Branch 1 taken 1 times.
|
64 | if (!var |
217 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
|
62 | || sub_map.second.subscriptions_.currentState |
218 | != PdCom::Subscription::State::Active) | ||
219 | 1 | continue; | |
220 |
1/2✓ Branch 4 taken 62 times.
✗ Branch 5 not taken.
|
62 | sendDataToSubscribers(*var, sub_map.second, block); |
221 | } | ||
222 |
1/2✓ Branch 4 taken 60 times.
✗ Branch 5 not taken.
|
60 | ps_->subscriber_notifier_.notify(); |
223 | 60 | ps_->subscriber_notifier_.clear(); | |
224 | } | ||
225 |
1/2✓ Branch 2 taken 47 times.
✗ Branch 3 not taken.
|
47 | current_blocksize_ = -1; |
226 | } | ||
227 | |||
228 | 62 | void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers( | |
229 | const DataDecoder &var, | ||
230 | const ChannelSubscripion &sub_map, | ||
231 | const int block) | ||
232 | { | ||
233 | 62 | bool size_check_done = false; | |
234 |
8/8✓ Branch 6 taken 68 times.
✓ Branch 7 taken 60 times.
✓ Branch 12 taken 60 times.
✓ Branch 13 taken 2 times.
✓ Branch 15 taken 60 times.
✓ Branch 16 taken 2 times.
✓ Branch 19 taken 60 times.
✓ Branch 20 taken 2 times.
|
128 | for (const auto &weak_sub : sub_map.subscriptions_) { |
235 |
3/4✓ Branch 3 taken 68 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 66 times.
✓ Branch 7 taken 2 times.
|
134 | if (const auto sub = weak_sub.lock()) { |
236 |
4/4✓ Branch 5 taken 66 times.
✓ Branch 6 taken 2 times.
✓ Branch 8 taken 66 times.
✓ Branch 9 taken 2 times.
|
134 | const auto var_ptr = sub->variable_.lock(); |
237 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
|
68 | if (!var_ptr) |
238 | ✗ | throw EmptyVariable(); | |
239 |
2/2✓ Branch 0 taken 62 times.
✓ Branch 1 taken 6 times.
|
68 | if (!size_check_done) { |
240 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 62 times.
|
124 | if (consistent_blocksize_ |
241 |
3/4✓ Branch 0 taken 47 times.
✓ Branch 1 taken 15 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 47 times.
|
109 | && var.size() < sub_map.blocksize_ |
242 |
1/2✓ Branch 5 taken 47 times.
✗ Branch 6 not taken.
|
47 | * var_ptr->totalSizeInBytes()) { |
243 | ✗ | std::ostringstream os; | |
244 | os << "size mismatch in variable cache, " | ||
245 | ✗ | "expected " | |
246 | ✗ | << sub_map.blocksize_ * var_ptr->totalSizeInBytes() | |
247 | ✗ | << ", got " << var.size(); | |
248 | ✗ | throw ProtocolError(os.str()); | |
249 | } | ||
250 |
3/4✓ Branch 7 taken 62 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 60 times.
|
62 | else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes()) |
251 | 2 | return; | |
252 | 60 | size_check_done = true; | |
253 | } | ||
254 |
2/4✓ Branch 11 taken 66 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 66 times.
✗ Branch 15 not taken.
|
66 | sub->readData(var.data() + block * var_ptr->totalSizeInBytes()); |
255 | 66 | std::chrono::nanoseconds ts; | |
256 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 66 times.
✓ Branch 3 taken 53 times.
✓ Branch 4 taken 13 times.
|
66 | if (timestamps_read_) |
257 |
1/2✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
|
53 | ts = ps_->time_vector_.at(block); |
258 | else | ||
259 | 26 | ts = ts_ | |
260 | 52 | + (block - max_read_blocks_ + 1) | |
261 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | * std::chrono::duration_cast< |
262 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | std::chrono::nanoseconds>( |
263 |
1/2✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
|
52 | var_ptr->getSampleTime()) |
264 | 39 | * sub_map.decimation_; | |
265 |
3/4✓ Branch 8 taken 66 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 66 times.
✓ Branch 12 taken 2 times.
|
66 | ps_->subscriber_notifier_.insert(sub->subscriber_, ts); |
266 | } | ||
267 | } | ||
268 | } | ||
269 | |||
270 | PeriodicSubscriptionsBase::GroupId | ||
271 | 48 | PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t) | |
272 | { | ||
273 | // does a group already exist for given transmission? | ||
274 |
1/2✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
|
96 | const auto it = backward_.find(t); |
275 |
2/2✓ Branch 6 taken 17 times.
✓ Branch 7 taken 31 times.
|
48 | if (it != backward_.end()) { |
276 |
3/6✓ Branch 4 taken 17 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 17 times.
✓ Branch 9 taken 17 times.
✗ Branch 10 not taken.
|
34 | if (forward_.at(it->second - 1).state |
277 | 17 | == ChannelSubscriptionMapImpl::Taken) | |
278 | 17 | return it->second; | |
279 | } | ||
280 | // find a free one; | ||
281 |
2/2✓ Branch 2 taken 8 times.
✓ Branch 3 taken 30 times.
|
38 | for (unsigned int i = 0; i < forward_.size(); ++i) { |
282 |
3/4✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 7 times.
|
8 | if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) { |
283 | 1 | const auto group_id = i + 1; | |
284 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | backward_[t] = group_id; |
285 | 1 | forward_[i].state = ChannelSubscriptionMapImpl::Taken; | |
286 | 1 | return group_id; | |
287 | } | ||
288 | } | ||
289 | // allocate one at the back; | ||
290 |
1/2✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
|
30 | forward_.emplace_back(); |
291 | 30 | forward_.back().state = ChannelSubscriptionMapImpl::Taken; | |
292 |
1/2✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
|
30 | const auto group_id = backward_[t] = forward_.size(); |
293 | 30 | return group_id; | |
294 | } | ||
295 | |||
296 | PeriodicSubscriptionsBase::GroupId | ||
297 | 27 | PeriodicSubscriptionsWithGroup::GroupMap::erase( | |
298 | PdCom::Transmission t, | ||
299 | PeriodicSubscriptionsBase::ChannelId channel_idx) | ||
300 | { | ||
301 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto backward_it = backward_.find(t); |
302 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (backward_it == backward_.end()) { |
303 | ✗ | assert(backward_it != backward_.end() | |
304 | and "Inconsistent state, backward_ and forward_ out of sync"); | ||
305 | ✗ | return 0; | |
306 | } | ||
307 |
1/2✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
|
27 | auto &channel_map = at(backward_it->second); |
308 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto map_it = channel_map.find(channel_idx); |
309 | // already cleaned? if so, nothing to do. | ||
310 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (map_it == channel_map.end()) |
311 | ✗ | return 0; | |
312 | // clean subscription map | ||
313 | const bool no_channel_subs_left = | ||
314 | 27 | map_it->second.subscriptions_.remove_expired(); | |
315 | // if channels left, also nothing to do. | ||
316 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 22 times.
|
27 | if (!no_channel_subs_left) |
317 | 5 | return 0; | |
318 | // no subscriptions for this channel are left, we have to remember the | ||
319 | // group id | ||
320 | 22 | const auto ans = backward_it->second; | |
321 | // now check whether any subscriptions are left for this transmission | ||
322 |
2/2✓ Branch 2 taken 14 times.
✓ Branch 3 taken 8 times.
|
22 | if (channel_map.guaranteed_empty()) { |
323 | // group id no longer in use, free up channel map | ||
324 | 14 | channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted; | |
325 |
1/2✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
|
14 | backward_.erase(backward_it); |
326 | } | ||
327 | 22 | return ans; | |
328 | } | ||
329 | |||
330 | 39 | void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed( | |
331 | PdCom::impl::Process::PendingCallbackQueue &q, | ||
332 | PeriodicSubscriptionsWithGroup::GroupId group_id, | ||
333 | PeriodicSubscriptionsWithGroup::ChannelId channel_id) | ||
334 | { | ||
335 |
1/2✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
|
39 | auto &sub_map = map_.at(group_id); |
336 |
2/4✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 39 times.
✗ Branch 6 not taken.
|
78 | const auto it = sub_map.find(channel_id); |
337 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 39 times.
|
39 | if (it == sub_map.end()) |
338 | ✗ | return; | |
339 | |||
340 |
2/4✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 39 times.
✗ Branch 7 not taken.
|
39 | it->second.subscriptions_.broadcastState( |
341 | PdCom::Subscription::State::Active, q); | ||
342 | } | ||
343 | |||
344 | 1 | void PeriodicSubscriptionsWithGroup::GroupMap::dump( | |
345 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
346 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
347 | { | ||
348 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &all_groups : forward_) { |
349 |
2/2✓ Branch 5 taken 1 times.
✓ Branch 6 taken 1 times.
|
2 | for (const auto &sub_map : all_groups) { |
350 | 1 | const auto var = impl::Variable::toUApi( | |
351 | 3 | std::static_pointer_cast<const MsrProto::Variable>( | |
352 |
2/4✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
5 | channels.at(sub_map.first)->shared_from_this())); |
353 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &weak_sub : sub_map.second.subscriptions_) { |
354 |
1/2✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
2 | if (const auto sub = weak_sub.lock()) { |
355 |
1/2✓ Branch 10 taken 1 times.
✗ Branch 11 not taken.
|
1 | ans.emplace_back(sub->This_, &sub->subscriber_, var); |
356 | } | ||
357 | } | ||
358 | } | ||
359 | } | ||
360 | 1 | } | |
361 | |||
362 | PeriodicSubscriptionWithoutGroup::transaction | ||
363 | 3 | PeriodicSubscriptionWithoutGroup::add( | |
364 | Process::PendingCallbackQueue &q, | ||
365 | bool notify_pending, | ||
366 | const std::shared_ptr<PeriodicSubscription> &s) | ||
367 | { | ||
368 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
|
3 | if (locked_) |
369 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
370 | 6 | const auto var_ptr = s->variable_.lock(); | |
371 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (!var_ptr) |
372 | ✗ | throw EmptyVariable(); | |
373 | 3 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
374 |
1/2✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
|
6 | auto map_it = map_.find(var.index_); |
375 | |||
376 | |||
377 | 3 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
378 | 3 | / var.sample_time_.count() | |
379 | 3 | + 0.5; | |
380 | 6 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
381 |
2/4✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
|
3 | and !decimation; |
382 | 3 | unsigned blocksize = 0; | |
383 | |||
384 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (decimation > 0) { |
385 | 3 | blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
386 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | blocksize = std::min(var.bufsize / decimation, blocksize); |
387 | } | ||
388 | |||
389 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (!blocksize) |
390 | ✗ | blocksize = 1; | |
391 |
1/2✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
|
3 | time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2)); |
392 | |||
393 |
1/2✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | if (map_it == map_.end()) { |
394 | // not yet subscribed | ||
395 |
5/10✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
|
3 | map_.insert({var.index_, {blocksize, decimation, {{s}}}}); |
396 | |||
397 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (notify_pending) |
398 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
399 | 3 | return {true, decimation, blocksize}; | |
400 | } | ||
401 | ✗ | else if (decimation < map_it->second.decimation_) { | |
402 | // we have to resubscribe | ||
403 | ✗ | map_it->second.subscriptions_.currentState = | |
404 | PdCom::Subscription::State::Pending; | ||
405 | |||
406 | ✗ | if (notify_pending) | |
407 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
408 | ✗ | auto subs = std::move(map_it->second.subscriptions_); | |
409 | ✗ | subs.insert(s); | |
410 | ✗ | map_.erase(map_it); | |
411 | ✗ | map_.insert({var.index_, {blocksize, decimation, std::move(subs)}}); | |
412 | ✗ | return {true, decimation, blocksize}; | |
413 | } | ||
414 | else { | ||
415 | ✗ | if (notify_pending | |
416 | ✗ | or map_it->second.subscriptions_.currentState | |
417 | != PdCom::Subscription::State::Pending) | ||
418 | ✗ | q.push_back({s, map_it->second.subscriptions_.currentState}); | |
419 | ✗ | map_it->second.subscriptions_.insert(s); | |
420 | ✗ | return {false, map_it->second.decimation_, map_it->second.blocksize_}; | |
421 | } | ||
422 | } | ||
423 | |||
424 | PeriodicSubscriptionWithoutGroup::transaction | ||
425 | ✗ | PeriodicSubscriptionWithoutGroup::remove(const Variable &_var) | |
426 | { | ||
427 | ✗ | const auto it = map_.find(_var.index_); | |
428 | ✗ | if (it == map_.end() || !it->second.subscriptions_.remove_expired()) | |
429 | // not found or still subscribed by others | ||
430 | ✗ | return {false, 0, 0}; | |
431 | else { | ||
432 | // now empty | ||
433 | ✗ | map_.erase(it); | |
434 | ✗ | return {true, 0, 0}; | |
435 | } | ||
436 | } | ||
437 | |||
438 | 3 | void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed( | |
439 | Process::PendingCallbackQueue &q, | ||
440 | ChannelId channel_id) | ||
441 | { | ||
442 |
2/4✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
6 | const auto it = map_.find(channel_id); |
443 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 3 times.
|
3 | if (it == map_.end()) |
444 | ✗ | return; | |
445 | |||
446 |
2/4✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | it->second.subscriptions_.broadcastState( |
447 | PdCom::Subscription::State::Active, q); | ||
448 | } | ||
449 | |||
450 | ✗ | void PeriodicSubscriptionWithoutGroup::dump( | |
451 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
452 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
453 | { | ||
454 | ✗ | for (const auto &sub_map : map_) { | |
455 | ✗ | const auto var = impl::Variable::toUApi( | |
456 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
457 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
458 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
459 | ✗ | if (const auto sub = weak_sub.lock()) { | |
460 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
461 | } | ||
462 | } | ||
463 | } | ||
464 | } | ||
465 |