| Directory: | ./ |
|---|---|
| File: | src/msrproto/PeriodicSubscriptions.cpp |
| Date: | 2025-02-12 12:41:42 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 192 | 246 | 78.0% |
| Branches: | 177 | 392 | 45.2% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /***************************************************************************** | ||
| 2 | * | ||
| 3 | * Copyright (C) 2021 Bjarne von Horn (vh at igh dot de) | ||
| 4 | * | ||
| 5 | * This file is part of the PdCom library. | ||
| 6 | * | ||
| 7 | * The PdCom library is free software: you can redistribute it and/or modify | ||
| 8 | * it under the terms of the GNU Lesser General Public License as published by | ||
| 9 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
| 10 | * option) any later version. | ||
| 11 | * | ||
| 12 | * The PdCom library is distributed in the hope that it will be useful, but | ||
| 13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
| 14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
| 15 | * License for more details. | ||
| 16 | * | ||
| 17 | * You should have received a copy of the GNU Lesser General Public License | ||
| 18 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
| 19 | * | ||
| 20 | *****************************************************************************/ | ||
| 21 | |||
| 22 | #include "PeriodicSubscriptions.h" | ||
| 23 | |||
| 24 | #include "Channel.h" | ||
| 25 | #include "Subscription.h" | ||
| 26 | |||
| 27 | #include <cassert> | ||
| 28 | #include <cstring> | ||
| 29 | #include <pdcom5/Exception.h> | ||
| 30 | #include <sstream> | ||
| 31 | |||
| 32 | using PdCom::impl::MsrProto::DataDecoder; | ||
| 33 | using PdCom::impl::MsrProto::PeriodicSubscription; | ||
| 34 | using PdCom::impl::MsrProto::PeriodicSubscriptionsBase; | ||
| 35 | using PdCom::impl::MsrProto::PeriodicSubscriptionsWithGroup; | ||
| 36 | using PdCom::impl::MsrProto::PeriodicSubscriptionWithoutGroup; | ||
| 37 | |||
| 38 | using PdCom::impl::MsrProto::Variable; | ||
| 39 | |||
| 40 | |||
| 41 | 52 | PeriodicSubscriptionsWithGroup::transaction PeriodicSubscriptionsWithGroup::add( | |
| 42 | PdCom::impl::Process::PendingCallbackQueue &queue, | ||
| 43 | bool notify_pending, | ||
| 44 | const std::shared_ptr<PeriodicSubscription> &s) | ||
| 45 | { | ||
| 46 | 104 | const auto var_ptr = s->variable_.lock(); | |
| 47 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 52 times.
|
52 | if (!var_ptr) |
| 48 | ✗ | throw EmptyVariable(); | |
| 49 | 52 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
| 50 |
1/2✓ Branch 11 taken 52 times.
✗ Branch 12 not taken.
|
52 | const auto group_id = map_.find(s->subscriber_.getTransmission()); |
| 51 |
2/4✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 52 times.
✗ Branch 8 not taken.
|
104 | auto map_it = map_.at(group_id).find(var.index_); |
| 52 |
3/4✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 41 times.
✓ Branch 11 taken 11 times.
|
52 | if (map_it == map_.at(group_id).end()) { |
| 53 | // not yet subscribed | ||
| 54 | 41 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
| 55 | 41 | / var.sample_time_.count() | |
| 56 | 41 | + 0.5; | |
| 57 | 82 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
| 58 |
3/4✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 39 times.
|
41 | and !decimation; |
| 59 | 41 | unsigned blocksize = 0; | |
| 60 | |||
| 61 |
1/2✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
|
41 | if (decimation > 0) { |
| 62 | 41 | blocksize = | |
| 63 | 41 | 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
| 64 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
|
41 | blocksize = std::min(var.bufsize / decimation, blocksize); |
| 65 | 41 | blocksize = std::min(blocksize, 1000U); | |
| 66 | } | ||
| 67 | |||
| 68 |
2/2✓ Branch 0 taken 38 times.
✓ Branch 1 taken 3 times.
|
41 | if (!blocksize) |
| 69 | 38 | blocksize = 1; | |
| 70 |
1/2✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
|
82 | time_vector_.resize( |
| 71 | 82 | std::max<size_t>(time_vector_.size(), blocksize + 2)); | |
| 72 | |||
| 73 |
6/12✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 14 taken 41 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 41 times.
✗ Branch 19 not taken.
✓ Branch 22 taken 41 times.
✗ Branch 23 not taken.
✓ Branch 33 taken 41 times.
✓ Branch 34 taken 41 times.
✗ Branch 45 not taken.
✗ Branch 46 not taken.
|
41 | map_.at(group_id).insert({var.index_, {blocksize, decimation, {{s}}}}); |
| 74 | |||
| 75 |
2/2✓ Branch 0 taken 25 times.
✓ Branch 1 taken 16 times.
|
41 | if (notify_pending) |
| 76 |
1/2✓ Branch 8 taken 25 times.
✗ Branch 9 not taken.
|
25 | queue.push_back({s, PdCom::Subscription::State::Pending}); |
| 77 | 41 | return {true, group_id, decimation, blocksize}; | |
| 78 | } | ||
| 79 | else { | ||
| 80 |
2/2✓ Branch 0 taken 10 times.
✓ Branch 1 taken 1 times.
|
11 | if (notify_pending |
| 81 |
4/4✓ Branch 0 taken 7 times.
✓ Branch 1 taken 4 times.
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 1 times.
|
11 | or map_it->second.subscriptions_.currentState |
| 82 | != PdCom::Subscription::State::Pending) | ||
| 83 |
1/2✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
|
10 | queue.push_back({s, map_it->second.subscriptions_.currentState}); |
| 84 |
1/2✓ Branch 6 taken 11 times.
✗ Branch 7 not taken.
|
11 | map_it->second.subscriptions_.insert(s); |
| 85 | 11 | return {false, group_id, map_it->second.decimation_, | |
| 86 | 22 | map_it->second.blocksize_}; | |
| 87 | } | ||
| 88 | } | ||
| 89 | |||
| 90 | PeriodicSubscriptionsWithGroup::transaction | ||
| 91 | 27 | PeriodicSubscriptionsWithGroup::remove( | |
| 92 | const Variable &var, | ||
| 93 | PdCom::Transmission tm) | ||
| 94 | { | ||
| 95 | // remove'ing is okay with locked ps, | ||
| 96 | // because the subscription map is _not_ deleted | ||
| 97 | // and the forward map _not_ altered. So pointers stay valid. | ||
| 98 | 27 | const auto group_id = map_.erase(tm, var.index_); | |
| 99 | 27 | return {group_id != 0, group_id, 0, 0}; | |
| 100 | } | ||
| 101 | |||
| 102 | 51 | PdCom::impl::MsrProto::PeriodicSubscriptionsBase::DataReceiveHandle:: | |
| 103 | DataReceiveHandle( | ||
| 104 | PeriodicSubscriptionsBase &ps, | ||
| 105 | ChannelSubscriptionMap &subscription_map, | ||
| 106 | VariableCache::Lock variable_cache_lock, | ||
| 107 | std::chrono::nanoseconds ts, | ||
| 108 | 51 | bool consistent_blocksize) : | |
| 109 | ps_(&ps), | ||
| 110 | 51 | variable_cache_lock_(std::move(variable_cache_lock)), | |
| 111 | subscription_map_(&subscription_map), | ||
| 112 | ts_(ts), | ||
| 113 | 102 | consistent_blocksize_(consistent_blocksize) | |
| 114 | { | ||
| 115 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 51 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 51 times.
|
51 | if (ps.locked_) |
| 116 | throw PdCom::InternalError( | ||
| 117 | ✗ | "Receiving periodic data already in progress"); | |
| 118 | 51 | ps.locked_ = true; | |
| 119 | 51 | } | |
| 120 | |||
| 121 | 53 | void PeriodicSubscriptionsBase::DataReceiveHandle::readData( | |
| 122 | const PdCom::impl::MsrProto::Channel &c, | ||
| 123 | const char *data) | ||
| 124 | { | ||
| 125 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 53 times.
|
53 | if (!ps_) { |
| 126 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
| 127 | } | ||
| 128 |
1/2✓ Branch 2 taken 53 times.
✗ Branch 3 not taken.
|
53 | const Base64Info bi(data); |
| 129 | |||
| 130 |
3/4✓ Branch 4 taken 53 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 45 times.
✓ Branch 9 taken 8 times.
|
98 | const auto sub_info_it = subscription_map_->find(c.index_); |
| 131 |
2/2✓ Branch 7 taken 8 times.
✓ Branch 8 taken 45 times.
|
53 | if (sub_info_it == subscription_map_->end()) |
| 132 | 8 | return; | |
| 133 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
|
90 | if (sub_info_it->second.subscriptions_.currentState |
| 134 | 45 | != PdCom::Subscription::State::Active) | |
| 135 | ✗ | return; | |
| 136 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 40 times.
|
45 | if (!consistent_blocksize_) { |
| 137 | 5 | current_blocksize_ = std::min<int>( | |
| 138 | 10 | sub_info_it->second.blocksize_, | |
| 139 |
1/2✗ Branch 5 not taken.
✓ Branch 6 taken 5 times.
|
15 | bi.encodedDataLength_ / c.type_info->element_size); |
| 140 | 5 | max_read_blocks_ = std::max(max_read_blocks_, current_blocksize_); | |
| 141 | } | ||
| 142 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
|
40 | else if (current_blocksize_ == -1) |
| 143 | ✗ | current_blocksize_ = sub_info_it->second.blocksize_; | |
| 144 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | else if ( |
| 145 | 40 | current_blocksize_ | |
| 146 | 40 | != static_cast<int>(sub_info_it->second.blocksize_)) | |
| 147 | ✗ | throw ProtocolError("Blocksize mismatch"); | |
| 148 |
4/6✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
✓ Branch 10 taken 45 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 45 times.
✓ Branch 14 taken 8 times.
|
135 | variable_cache_lock_[c.index_].readFromBase64( |
| 149 | 90 | data, bi.base64Length_, c, current_blocksize_); | |
| 150 | } | ||
| 151 | |||
| 152 | |||
| 153 | 45 | void PeriodicSubscriptionsBase::DataReceiveHandle::readTimestamps( | |
| 154 | const char *data) | ||
| 155 | { | ||
| 156 | static_assert( | ||
| 157 | sizeof(decltype(time_vector_)::value_type) == sizeof(int64_t), | ||
| 158 | "time_vector_ is not size compatible to time tag"); | ||
| 159 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
|
45 | if (!ps_) { |
| 160 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
| 161 | } | ||
| 162 |
2/4✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 45 times.
|
45 | if (!data || !*data) |
| 163 | ✗ | throw PdCom::ProtocolError("Tag does not contain data"); | |
| 164 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
|
45 | const auto b64len = ::strlen(data); |
| 165 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
45 | if (current_blocksize_ == -1) { |
| 166 | 45 | current_blocksize_ = b64len * 3 / 4 / sizeof(int64_t); | |
| 167 | 45 | max_read_blocks_ = current_blocksize_; | |
| 168 | } | ||
| 169 | 45 | consistent_blocksize_ = true; | |
| 170 | |||
| 171 |
1/2✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
|
90 | ps_->time_vector_.resize( |
| 172 | 90 | std::max<size_t>(current_blocksize_ + 1, ps_->time_vector_.size())); | |
| 173 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
|
90 | if (!readFromBase64( |
| 174 | 45 | reinterpret_cast<char *>(ps_->time_vector_.data()), data, | |
| 175 | 45 | b64len, sizeof(int64_t) * current_blocksize_)) { | |
| 176 | ✗ | throw ProtocolError("Invalid <time> child of <data>"); | |
| 177 | } | ||
| 178 | 45 | timestamps_read_ = true; | |
| 179 | 45 | } | |
| 180 | |||
| 181 | 1 | void PeriodicSubscriptionsWithGroup::dump( | |
| 182 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
| 183 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
| 184 | { | ||
| 185 | 1 | map_.dump(ans, channels); | |
| 186 | 1 | } | |
| 187 | |||
| 188 | 49 | void PeriodicSubscriptionsBase::DataReceiveHandle::endNewDataRecieving() | |
| 189 | { | ||
| 190 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
|
49 | if (!ps_) { |
| 191 | ✗ | throw PdCom::InternalError("Operating on invalid DataReceiveHandle"); | |
| 192 | } | ||
| 193 | // make sure to release the variable cache lock | ||
| 194 |
1/2✓ Branch 7 taken 49 times.
✗ Branch 8 not taken.
|
98 | auto cache = std::move(variable_cache_lock_); |
| 195 | // as this is the final call on receiving a new dataset, | ||
| 196 | // make sure to erase the ps pointer to mark us invalid afterwards | ||
| 197 | const struct PsDeleter | ||
| 198 | { | ||
| 199 | std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps_; | ||
| 200 | 49 | ~PsDeleter() { ps_.reset(); } | |
| 201 | 49 | PsDeleter(std::unique_ptr<PeriodicSubscriptionsBase, Unlocker> &ps) : | |
| 202 | 49 | ps_(ps) | |
| 203 | 49 | {} | |
| 204 |
1/2✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
|
98 | } psdeleter_ {ps_}; |
| 205 | // subscriptions can dissappear during <data> tag parsing, so don't | ||
| 206 | // assume that subscriptions_ is not empty | ||
| 207 |
3/6✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 49 times.
|
49 | if (!cache or current_blocksize_ == -1) |
| 208 | ✗ | return; | |
| 209 | |||
| 210 |
2/2✓ Branch 1 taken 62 times.
✓ Branch 2 taken 49 times.
|
111 | for (int block = 0; block < max_read_blocks_; ++block) { |
| 211 |
2/2✓ Branch 7 taken 65 times.
✓ Branch 8 taken 62 times.
|
127 | for (const auto &sub_map : *subscription_map_) { |
| 212 |
1/2✓ Branch 2 taken 65 times.
✗ Branch 3 not taken.
|
65 | const DataDecoder *var = cache.filled_cache(sub_map.first); |
| 213 | // no data recieved for given variable | ||
| 214 |
2/2✓ Branch 0 taken 64 times.
✓ Branch 1 taken 1 times.
|
66 | if (!var |
| 215 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
|
64 | || sub_map.second.subscriptions_.currentState |
| 216 | != PdCom::Subscription::State::Active) | ||
| 217 | 1 | continue; | |
| 218 |
1/2✓ Branch 4 taken 64 times.
✗ Branch 5 not taken.
|
64 | sendDataToSubscribers(*var, sub_map.second, block); |
| 219 | } | ||
| 220 |
1/2✓ Branch 4 taken 62 times.
✗ Branch 5 not taken.
|
62 | ps_->subscriber_notifier_.notify(); |
| 221 | 62 | ps_->subscriber_notifier_.clear(); | |
| 222 | } | ||
| 223 |
1/2✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
|
49 | current_blocksize_ = -1; |
| 224 | } | ||
| 225 | |||
| 226 | 64 | void PeriodicSubscriptionsBase::DataReceiveHandle::sendDataToSubscribers( | |
| 227 | const DataDecoder &var, | ||
| 228 | const ChannelSubscripion &sub_map, | ||
| 229 | const int block) | ||
| 230 | { | ||
| 231 | 64 | bool size_check_done = false; | |
| 232 |
8/8✓ Branch 6 taken 71 times.
✓ Branch 7 taken 62 times.
✓ Branch 12 taken 62 times.
✓ Branch 13 taken 2 times.
✓ Branch 15 taken 62 times.
✓ Branch 16 taken 2 times.
✓ Branch 19 taken 62 times.
✓ Branch 20 taken 2 times.
|
133 | for (const auto &weak_sub : sub_map.subscriptions_) { |
| 233 |
3/4✓ Branch 3 taken 71 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 69 times.
✓ Branch 7 taken 2 times.
|
140 | if (const auto sub = weak_sub.lock()) { |
| 234 |
4/4✓ Branch 5 taken 69 times.
✓ Branch 6 taken 2 times.
✓ Branch 8 taken 69 times.
✓ Branch 9 taken 2 times.
|
140 | const auto var_ptr = sub->variable_.lock(); |
| 235 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 71 times.
|
71 | if (!var_ptr) |
| 236 | ✗ | throw EmptyVariable(); | |
| 237 |
2/2✓ Branch 0 taken 64 times.
✓ Branch 1 taken 7 times.
|
71 | if (!size_check_done) { |
| 238 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 64 times.
|
128 | if (consistent_blocksize_ |
| 239 |
3/4✓ Branch 0 taken 49 times.
✓ Branch 1 taken 15 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 49 times.
|
113 | && var.size() < sub_map.blocksize_ |
| 240 |
1/2✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
|
49 | * var_ptr->totalSizeInBytes()) { |
| 241 | ✗ | std::ostringstream os; | |
| 242 | os << "size mismatch in variable cache, " | ||
| 243 | ✗ | "expected " | |
| 244 | ✗ | << sub_map.blocksize_ * var_ptr->totalSizeInBytes() | |
| 245 | ✗ | << ", got " << var.size(); | |
| 246 | ✗ | throw ProtocolError(os.str()); | |
| 247 | } | ||
| 248 |
3/4✓ Branch 7 taken 64 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 62 times.
|
64 | else if (var.size() < (block + 1) * var_ptr->totalSizeInBytes()) |
| 249 | 2 | return; | |
| 250 | 62 | size_check_done = true; | |
| 251 | } | ||
| 252 |
2/4✓ Branch 11 taken 69 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 69 times.
✗ Branch 15 not taken.
|
69 | sub->readData(var.data() + block * var_ptr->totalSizeInBytes()); |
| 253 | 69 | std::chrono::nanoseconds ts; | |
| 254 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 69 times.
✓ Branch 3 taken 56 times.
✓ Branch 4 taken 13 times.
|
69 | if (timestamps_read_) |
| 255 |
1/2✓ Branch 4 taken 56 times.
✗ Branch 5 not taken.
|
56 | ts = ps_->time_vector_.at(block); |
| 256 | else | ||
| 257 | 26 | ts = ts_ | |
| 258 | 52 | + (block - max_read_blocks_ + 1) | |
| 259 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | * std::chrono::duration_cast< |
| 260 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
26 | std::chrono::nanoseconds>( |
| 261 |
1/2✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
|
52 | var_ptr->getSampleTime()) |
| 262 | 39 | * sub_map.decimation_; | |
| 263 |
3/4✓ Branch 8 taken 69 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 69 times.
✓ Branch 12 taken 2 times.
|
69 | ps_->subscriber_notifier_.insert(sub->subscriber_, ts); |
| 264 | } | ||
| 265 | } | ||
| 266 | } | ||
| 267 | |||
| 268 | PeriodicSubscriptionsBase::GroupId | ||
| 269 | 52 | PeriodicSubscriptionsWithGroup::GroupMap::find(PdCom::Transmission t) | |
| 270 | { | ||
| 271 | // does a group already exists for given transmission? | ||
| 272 |
1/2✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
|
104 | const auto it = backward_.find(t); |
| 273 |
2/2✓ Branch 6 taken 19 times.
✓ Branch 7 taken 33 times.
|
52 | if (it != backward_.end()) { |
| 274 |
3/6✓ Branch 4 taken 19 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 19 times.
✓ Branch 9 taken 19 times.
✗ Branch 10 not taken.
|
38 | if (forward_.at(it->second - 1).state |
| 275 | 19 | == ChannelSubscriptionMapImpl::Taken) | |
| 276 | 19 | return it->second; | |
| 277 | } | ||
| 278 | // find a free one; | ||
| 279 |
2/2✓ Branch 2 taken 8 times.
✓ Branch 3 taken 32 times.
|
40 | for (unsigned int i = 0; i < forward_.size(); ++i) { |
| 280 |
3/4✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 7 times.
|
8 | if (forward_[i].state == ChannelSubscriptionMapImpl::Empty) { |
| 281 | 1 | const auto group_id = i + 1; | |
| 282 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | backward_[t] = group_id; |
| 283 | 1 | forward_[i].state = ChannelSubscriptionMapImpl::Taken; | |
| 284 | 1 | return group_id; | |
| 285 | } | ||
| 286 | } | ||
| 287 | // allocate one at the back; | ||
| 288 |
1/2✓ Branch 2 taken 32 times.
✗ Branch 3 not taken.
|
32 | forward_.emplace_back(); |
| 289 | 32 | forward_.back().state = ChannelSubscriptionMapImpl::Taken; | |
| 290 |
1/2✓ Branch 4 taken 32 times.
✗ Branch 5 not taken.
|
32 | const auto group_id = backward_[t] = forward_.size(); |
| 291 | 32 | return group_id; | |
| 292 | } | ||
| 293 | |||
| 294 | PeriodicSubscriptionsBase::GroupId | ||
| 295 | 27 | PeriodicSubscriptionsWithGroup::GroupMap::erase( | |
| 296 | PdCom::Transmission t, | ||
| 297 | PeriodicSubscriptionsBase::ChannelId channel_idx) | ||
| 298 | { | ||
| 299 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto backward_it = backward_.find(t); |
| 300 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (backward_it == backward_.end()) { |
| 301 | ✗ | assert(backward_it != backward_.end() | |
| 302 | and "Inconsistent state, backward_ and forward_ out of sync"); | ||
| 303 | ✗ | return 0; | |
| 304 | } | ||
| 305 |
1/2✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
|
27 | auto &channel_map = at(backward_it->second); |
| 306 |
1/2✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
|
54 | const auto map_it = channel_map.find(channel_idx); |
| 307 | // already cleaned? if so, nothing to do. | ||
| 308 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 27 times.
|
27 | if (map_it == channel_map.end()) |
| 309 | ✗ | return 0; | |
| 310 | // clean subscription map | ||
| 311 | const bool no_channel_subs_left = | ||
| 312 | 27 | map_it->second.subscriptions_.remove_expired(); | |
| 313 | // if channels left, also nothing to do. | ||
| 314 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 22 times.
|
27 | if (!no_channel_subs_left) |
| 315 | 5 | return 0; | |
| 316 | // no subscriptions for this channel is left, we have to remember the | ||
| 317 | // group id | ||
| 318 | 22 | const auto ans = backward_it->second; | |
| 319 | // now check whether any subscriptions are left for this transmission | ||
| 320 |
2/2✓ Branch 2 taken 14 times.
✓ Branch 3 taken 8 times.
|
22 | if (channel_map.guaranteed_empty()) { |
| 321 | // group id no longer in use, free up channel map | ||
| 322 | 14 | channel_map.state = ChannelSubscriptionMapImpl::AboutToBeDeleted; | |
| 323 |
1/2✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
|
14 | backward_.erase(backward_it); |
| 324 | } | ||
| 325 | 22 | return ans; | |
| 326 | } | ||
| 327 | |||
| 328 | 41 | void PeriodicSubscriptionsWithGroup::subscribeWasConfirmed( | |
| 329 | PdCom::impl::Process::PendingCallbackQueue &q, | ||
| 330 | PeriodicSubscriptionsWithGroup::GroupId group_id, | ||
| 331 | PeriodicSubscriptionsWithGroup::ChannelId channel_id) | ||
| 332 | { | ||
| 333 |
1/2✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
|
41 | auto &sub_map = map_.at(group_id); |
| 334 |
2/4✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
|
82 | const auto it = sub_map.find(channel_id); |
| 335 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 41 times.
|
41 | if (it == sub_map.end()) |
| 336 | ✗ | return; | |
| 337 | |||
| 338 |
2/4✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 41 times.
✗ Branch 7 not taken.
|
41 | it->second.subscriptions_.broadcastState( |
| 339 | PdCom::Subscription::State::Active, q); | ||
| 340 | } | ||
| 341 | |||
| 342 | 1 | void PeriodicSubscriptionsWithGroup::GroupMap::dump( | |
| 343 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
| 344 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
| 345 | { | ||
| 346 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &all_groups : forward_) { |
| 347 |
2/2✓ Branch 5 taken 1 times.
✓ Branch 6 taken 1 times.
|
2 | for (const auto &sub_map : all_groups) { |
| 348 | 1 | const auto var = impl::Variable::toUApi( | |
| 349 | 3 | std::static_pointer_cast<const MsrProto::Variable>( | |
| 350 |
2/4✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
5 | channels.at(sub_map.first)->shared_from_this())); |
| 351 |
2/2✓ Branch 6 taken 1 times.
✓ Branch 7 taken 1 times.
|
2 | for (const auto &weak_sub : sub_map.second.subscriptions_) { |
| 352 |
1/2✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
2 | if (const auto sub = weak_sub.lock()) { |
| 353 |
1/2✓ Branch 10 taken 1 times.
✗ Branch 11 not taken.
|
1 | ans.emplace_back(sub->This_, &sub->subscriber_, var); |
| 354 | } | ||
| 355 | } | ||
| 356 | } | ||
| 357 | } | ||
| 358 | 1 | } | |
| 359 | |||
| 360 | PeriodicSubscriptionWithoutGroup::transaction | ||
| 361 | 3 | PeriodicSubscriptionWithoutGroup::add( | |
| 362 | Process::PendingCallbackQueue &q, | ||
| 363 | bool notify_pending, | ||
| 364 | const std::shared_ptr<PeriodicSubscription> &s) | ||
| 365 | { | ||
| 366 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
|
3 | if (locked_) |
| 367 | ✗ | throw PdCom::InternalError("PeriodicSubscription is locked!"); | |
| 368 | 6 | const auto var_ptr = s->variable_.lock(); | |
| 369 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (!var_ptr) |
| 370 | ✗ | throw EmptyVariable(); | |
| 371 | 3 | const auto &var = static_cast<Channel const &>(*var_ptr); | |
| 372 |
1/2✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
|
6 | auto map_it = map_.find(var.index_); |
| 373 | |||
| 374 | |||
| 375 | 3 | unsigned decimation = s->subscriber_.getTransmission().getInterval() | |
| 376 | 3 | / var.sample_time_.count() | |
| 377 | 3 | + 0.5; | |
| 378 | 6 | decimation += s->subscriber_.getTransmission().getInterval() > 0.0 | |
| 379 |
2/4✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
|
3 | and !decimation; |
| 380 | 3 | unsigned blocksize = 0; | |
| 381 | |||
| 382 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (decimation > 0) { |
| 383 | 3 | blocksize = 1.0 / var.sample_time_.count() / decimation / 25.0 + 0.5; | |
| 384 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | blocksize = std::min(var.bufsize / decimation, blocksize); |
| 385 | } | ||
| 386 | |||
| 387 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (!blocksize) |
| 388 | ✗ | blocksize = 1; | |
| 389 |
1/2✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
|
3 | time_vector_.resize(std::max<size_t>(time_vector_.size(), blocksize + 2)); |
| 390 | |||
| 391 |
1/2✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | if (map_it == map_.end()) { |
| 392 | // not yet subscribed | ||
| 393 |
5/10✓ Branch 10 taken 3 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 3 times.
✗ Branch 15 not taken.
✓ Branch 18 taken 3 times.
✗ Branch 19 not taken.
✓ Branch 29 taken 3 times.
✓ Branch 30 taken 3 times.
✗ Branch 41 not taken.
✗ Branch 42 not taken.
|
3 | map_.insert({var.index_, {blocksize, decimation, {{s}}}}); |
| 394 | |||
| 395 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (notify_pending) |
| 396 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
| 397 | 3 | return {true, decimation, blocksize}; | |
| 398 | } | ||
| 399 | ✗ | else if (decimation < map_it->second.decimation_) { | |
| 400 | // we have to resubscribe | ||
| 401 | ✗ | map_it->second.subscriptions_.currentState = | |
| 402 | PdCom::Subscription::State::Pending; | ||
| 403 | |||
| 404 | ✗ | if (notify_pending) | |
| 405 | ✗ | q.push_back({s, PdCom::Subscription::State::Pending}); | |
| 406 | ✗ | auto subs = std::move(map_it->second.subscriptions_); | |
| 407 | ✗ | subs.insert(s); | |
| 408 | ✗ | map_.erase(map_it); | |
| 409 | ✗ | map_.insert({var.index_, {blocksize, decimation, std::move(subs)}}); | |
| 410 | ✗ | return {true, decimation, blocksize}; | |
| 411 | } | ||
| 412 | else { | ||
| 413 | ✗ | if (notify_pending | |
| 414 | ✗ | or map_it->second.subscriptions_.currentState | |
| 415 | != PdCom::Subscription::State::Pending) | ||
| 416 | ✗ | q.push_back({s, map_it->second.subscriptions_.currentState}); | |
| 417 | ✗ | map_it->second.subscriptions_.insert(s); | |
| 418 | ✗ | return {false, map_it->second.decimation_, map_it->second.blocksize_}; | |
| 419 | } | ||
| 420 | } | ||
| 421 | |||
| 422 | PeriodicSubscriptionWithoutGroup::transaction | ||
| 423 | ✗ | PeriodicSubscriptionWithoutGroup::remove(const Variable &_var) | |
| 424 | { | ||
| 425 | ✗ | const auto it = map_.find(_var.index_); | |
| 426 | ✗ | if (it == map_.end() || !it->second.subscriptions_.remove_expired()) | |
| 427 | // not found or still subscribed by others | ||
| 428 | ✗ | return {false, 0, 0}; | |
| 429 | else { | ||
| 430 | // now empty | ||
| 431 | ✗ | map_.erase(it); | |
| 432 | ✗ | return {true, 0, 0}; | |
| 433 | } | ||
| 434 | } | ||
| 435 | |||
| 436 | 3 | void PeriodicSubscriptionWithoutGroup::subscribeWasConfirmed( | |
| 437 | Process::PendingCallbackQueue &q, | ||
| 438 | ChannelId channel_id) | ||
| 439 | { | ||
| 440 |
2/4✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
6 | const auto it = map_.find(channel_id); |
| 441 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 3 times.
|
3 | if (it == map_.end()) |
| 442 | ✗ | return; | |
| 443 | |||
| 444 |
2/4✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3 times.
✗ Branch 7 not taken.
|
3 | it->second.subscriptions_.broadcastState( |
| 445 | PdCom::Subscription::State::Active, q); | ||
| 446 | } | ||
| 447 | |||
| 448 | ✗ | void PeriodicSubscriptionWithoutGroup::dump( | |
| 449 | std::vector<PdCom::Process::SubscriptionInfo> &ans, | ||
| 450 | const std::unordered_map<unsigned, Channel *> &channels) const | ||
| 451 | { | ||
| 452 | ✗ | for (const auto &sub_map : map_) { | |
| 453 | ✗ | const auto var = impl::Variable::toUApi( | |
| 454 | ✗ | std::static_pointer_cast<const MsrProto::Variable>( | |
| 455 | ✗ | channels.at(sub_map.first)->shared_from_this())); | |
| 456 | ✗ | for (const auto &weak_sub : sub_map.second.subscriptions_) { | |
| 457 | ✗ | if (const auto sub = weak_sub.lock()) { | |
| 458 | ✗ | ans.emplace_back(sub->This_, &sub->subscriber_, var); | |
| 459 | } | ||
| 460 | } | ||
| 461 | } | ||
| 462 | } | ||
| 463 |