| Directory: | ./ |
|---|---|
| File: | pdserv/src/msrproto/SubscriptionManager.cpp |
| Date: | 2025-11-02 04:09:49 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 88 | 147 | 59.9% |
| Branches: | 70 | 180 | 38.9% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /***************************************************************************** | ||
| 2 | * | ||
| 3 | * Copyright 2010 Richard Hacker (lerichi at gmx dot net) | ||
| 4 | * | ||
| 5 | * This file is part of the pdserv library. | ||
| 6 | * | ||
| 7 | * The pdserv library is free software: you can redistribute it and/or modify | ||
| 8 | * it under the terms of the GNU Lesser General Public License as published | ||
| 9 | * by the Free Software Foundation, either version 3 of the License, or (at | ||
| 10 | * your option) any later version. | ||
| 11 | * | ||
| 12 | * The pdserv library is distributed in the hope that it will be useful, but | ||
| 13 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
| 14 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
| 15 | * License for more details. | ||
| 16 | * | ||
| 17 | * You should have received a copy of the GNU Lesser General Public License | ||
| 18 | * along with the pdserv library. If not, see <http://www.gnu.org/licenses/>. | ||
| 19 | * | ||
| 20 | ****************************************************************************/ | ||
| 21 | |||
| 22 | #include "../Debug.h" | ||
| 23 | #include "../Main.h" | ||
| 24 | #include "../Signal.h" | ||
| 25 | #include "../Task.h" | ||
| 26 | #include "../TaskStatistics.h" | ||
| 27 | #include "SubscriptionManager.h" | ||
| 28 | #include "Channel.h" | ||
| 29 | #include "Session.h" | ||
| 30 | #include "Subscription.h" | ||
| 31 | |||
| 32 | using namespace MsrProto; | ||
| 33 | |||
| 34 | ///////////////////////////////////////////////////////////////////////////// | ||
| 35 | struct timespec SubscriptionManager::dummyTime; | ||
| 36 | PdServ::TaskStatistics SubscriptionManager::dummyTaskStatistics; | ||
| 37 | |||
| 38 | ///////////////////////////////////////////////////////////////////////////// | ||
| 39 | 296 | SubscriptionManager::SubscriptionManager( | |
| 40 | 296 | Session *s, const PdServ::Task* task): | |
| 41 | 296 | SessionTask(task), session(s) | |
| 42 | { | ||
| 43 | 296 | taskTime = &dummyTime; | |
| 44 | 296 | taskStatistics = &dummyTaskStatistics; | |
| 45 | |||
| 46 | // Call rxPdo() once so that taskTime and taskStatistics are updated | ||
| 47 |
1/2✓ Branch 6 taken 296 times.
✗ Branch 7 not taken.
|
296 | task->rxPdo(this, &taskTime, &taskStatistics); |
| 48 | 296 | } | |
| 49 | |||
| 50 | ///////////////////////////////////////////////////////////////////////////// | ||
| 51 | 888 | SubscriptionManager::~SubscriptionManager() | |
| 52 | { | ||
| 53 | 296 | clear(); | |
| 54 | 592 | } | |
| 55 | |||
| 56 | ///////////////////////////////////////////////////////////////////////////// | ||
| 57 | 44 | void SubscriptionManager::subscribe (const Channel *c, size_t group, | |
| 58 | size_t decimation, size_t blocksize, bool base64, | ||
| 59 | std::streamsize precision) | ||
| 60 | { | ||
| 61 | 44 | Subscription** s = &signalSubscriptionMap[c->signal][c][group]; | |
| 62 | |||
| 63 | // First remove possible subscription. It doesn't matter if it is null | ||
| 64 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
|
44 | if (*s) |
| 65 | ✗ | remove(*s, group); | |
| 66 | |||
| 67 |
1/2✓ Branch 3 taken 44 times.
✗ Branch 4 not taken.
|
44 | *s = new Subscription(c, decimation, blocksize, base64, precision); |
| 68 | |||
| 69 | // Call subscribe on this signal. It doesn't matter if it is already | ||
| 70 | // subscribed, but it is useful because newSignal() is called for us | ||
| 71 | // in this special case | ||
| 72 | 44 | c->signal->subscribe(this); | |
| 73 | 44 | } | |
| 74 | |||
| 75 | ///////////////////////////////////////////////////////////////////////////// | ||
| 76 | ✗ | void SubscriptionManager::unsubscribe(const Channel *c, size_t group) | |
| 77 | { | ||
| 78 | // Go through all groups (signal, channel, group) to find the signal, | ||
| 79 | // keeping in mind that the channel may not even be subscribed yet! | ||
| 80 | |||
| 81 | ✗ | SignalSubscriptionMap::iterator sit = | |
| 82 | ✗ | signalSubscriptionMap.find(c->signal); | |
| 83 | ✗ | if (sit == signalSubscriptionMap.end()) | |
| 84 | ✗ | return; | |
| 85 | |||
| 86 | ✗ | ChannelSubscriptionMap::iterator cit = sit->second.find(c); | |
| 87 | ✗ | if (cit == sit->second.end()) | |
| 88 | ✗ | return; | |
| 89 | |||
| 90 | ✗ | SubscriptionGroup::iterator git = cit->second.find(group); | |
| 91 | ✗ | if (git == cit->second.end()) | |
| 92 | ✗ | return; | |
| 93 | |||
| 94 | // Remove channel from active list | ||
| 95 | ✗ | remove(git->second, group); | |
| 96 | |||
| 97 | // Now that the channel is unsubscribed, check for empty groups, | ||
| 98 | // jumping out if the group is not empty | ||
| 99 | |||
| 100 | ✗ | cit->second.erase(git); | |
| 101 | ✗ | if (!cit->second.empty()) | |
| 102 | ✗ | return; | |
| 103 | |||
| 104 | ✗ | sit->second.erase(cit); | |
| 105 | ✗ | if (!sit->second.empty()) | |
| 106 | ✗ | return; | |
| 107 | |||
| 108 | // Don't require signal any more | ||
| 109 | ✗ | signalSubscriptionMap.erase(sit); | |
| 110 | ✗ | c->signal->unsubscribe(this); | |
| 111 | } | ||
| 112 | |||
| 113 | ///////////////////////////////////////////////////////////////////////////// | ||
| 114 | 296 | void SubscriptionManager::clear() | |
| 115 | { | ||
| 116 | 592 | SignalSubscriptionMap::const_iterator sit; | |
| 117 | 592 | ChannelSubscriptionMap::const_iterator cit; | |
| 118 | 592 | SubscriptionGroup::const_iterator git; | |
| 119 | |||
| 120 | 296 | activeSignals.clear(); | |
| 121 | |||
| 122 | 340 | for (sit = signalSubscriptionMap.begin(); | |
| 123 |
2/2✓ Branch 6 taken 44 times.
✓ Branch 7 taken 296 times.
|
340 | sit != signalSubscriptionMap.end(); ++sit) { |
| 124 |
2/2✓ Branch 14 taken 44 times.
✓ Branch 15 taken 44 times.
|
88 | for (cit = sit->second.begin(); cit != sit->second.end(); ++cit) { |
| 125 |
1/2✓ Branch 8 taken 44 times.
✗ Branch 9 not taken.
|
44 | sit->first->unsubscribe(this); |
| 126 |
2/2✓ Branch 14 taken 44 times.
✓ Branch 15 taken 44 times.
|
88 | for (git = cit->second.begin(); git != cit->second.end(); ++git) |
| 127 |
1/2✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
|
44 | delete git->second; |
| 128 | } | ||
| 129 | } | ||
| 130 | |||
| 131 | 296 | signalSubscriptionMap.clear(); | |
| 132 | 296 | } | |
| 133 | |||
| 134 | ///////////////////////////////////////////////////////////////////////////// | ||
| 135 | 44 | void SubscriptionManager::newSignal( const PdServ::Signal *s) | |
| 136 | { | ||
| 137 |
2/4✓ Branch 3 taken 44 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 44 times.
✗ Branch 11 not taken.
|
88 | SignalSubscriptionMap::const_iterator sit = signalSubscriptionMap.find(s); |
| 138 | |||
| 139 | // Find out whether this signal is used or whether sit is active already | ||
| 140 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 44 times.
|
44 | if (sit == signalSubscriptionMap.end()) |
| 141 | ✗ | return; | |
| 142 | |||
| 143 |
1/2✓ Branch 7 taken 44 times.
✗ Branch 8 not taken.
|
88 | for (ChannelSubscriptionMap::const_iterator cit = sit->second.begin(); |
| 144 |
2/2✓ Branch 7 taken 44 times.
✓ Branch 8 taken 44 times.
|
88 | cit != sit->second.end(); ++cit) { |
| 145 | 88 | for (SubscriptionGroup::const_iterator git = cit->second.begin(); | |
| 146 |
2/2✓ Branch 7 taken 44 times.
✓ Branch 8 taken 44 times.
|
88 | git != cit->second.end(); ++git) { |
| 147 | |||
| 148 | 44 | Subscription* s = git->second; | |
| 149 | |||
| 150 | // Put subscription into active list | ||
| 151 | // Note: for event subscriptions, blocksize == 0 | ||
| 152 | SubscriptionSet* g = | ||
| 153 |
3/6✓ Branch 6 taken 44 times.
✗ Branch 7 not taken.
✓ Branch 10 taken 44 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 44 times.
✗ Branch 15 not taken.
|
44 | &activeSignals[git->first][s->decimation][s->blocksize]; |
| 154 | |||
| 155 |
2/2✓ Branch 2 taken 36 times.
✓ Branch 3 taken 8 times.
|
44 | if (g->empty()) { |
| 156 | // The blocksize group jas just been created, add data space | ||
| 157 | // for current time | ||
| 158 |
2/4✓ Branch 3 taken 36 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 36 times.
✗ Branch 10 not taken.
|
36 | g->time.reset(new uint64_t[s->blocksize + !s->blocksize]); |
| 159 | 36 | g->timePtr = g->time.get(); | |
| 160 | log_debug("s=%zu %zu:%zu:%zu %p", s->blocksize, | ||
| 161 | git->first, s->decimation, s->blocksize, | ||
| 162 | g->time.get()); | ||
| 163 | } | ||
| 164 | |||
| 165 |
1/2✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
|
44 | g->insert(s); |
| 166 | } | ||
| 167 | } | ||
| 168 | } | ||
| 169 | |||
| 170 | ///////////////////////////////////////////////////////////////////////////// | ||
| 171 | ✗ | void SubscriptionManager::remove (Subscription *s, size_t group) | |
| 172 | { | ||
| 173 | // Find the map for the group | ||
| 174 | ✗ | ActiveSignals::iterator git = activeSignals.find(group); | |
| 175 | ✗ | if (git == activeSignals.end()) | |
| 176 | ✗ | return; | |
| 177 | |||
| 178 | // Find the decimation group | ||
| 179 | ✗ | DecimationGroup::iterator dit = git->second.find(s->decimation); | |
| 180 | ✗ | if (dit == git->second.end()) | |
| 181 | ✗ | return; | |
| 182 | |||
| 183 | // Find the blocksize group | ||
| 184 | ✗ | BlocksizeGroup::iterator bit = dit->second.find(s->blocksize); | |
| 185 | ✗ | if (bit == dit->second.end()) | |
| 186 | ✗ | return; | |
| 187 | |||
| 188 | // Delete and remove subscription from set | ||
| 189 | ✗ | delete s; | |
| 190 | ✗ | bit->second.erase(s); | |
| 191 | ✗ | if (!bit->second.empty()) | |
| 192 | ✗ | return; | |
| 193 | |||
| 194 | // blocksize group is empty, erase it | ||
| 195 | ✗ | dit->second.erase(bit); | |
| 196 | ✗ | if (!dit->second.empty()) | |
| 197 | ✗ | return; | |
| 198 | |||
| 199 | // decimation group is empty, erase it | ||
| 200 | ✗ | git->second.erase(dit); | |
| 201 | ✗ | if (!git->second.empty()) | |
| 202 | ✗ | return; | |
| 203 | |||
| 204 | // group is emty, erase it | ||
| 205 | ✗ | activeSignals.erase(git); | |
| 206 | } | ||
| 207 | |||
| 208 | ///////////////////////////////////////////////////////////////////////////// | ||
| 209 | 19316 | void SubscriptionManager::rxPdo(bool quiet) | |
| 210 | { | ||
| 211 | 38632 | ActiveSignals::iterator git; | |
| 212 | 38632 | DecimationGroup::iterator dit; | |
| 213 | 38632 | BlocksizeGroup::iterator bit; | |
| 214 | 38632 | SubscriptionSet::iterator sit; | |
| 215 | Subscription* s; | ||
| 216 | 19316 | Subscription* printQ, **printQEnd; | |
| 217 | bool print; | ||
| 218 | |||
| 219 |
3/4✓ Branch 24 taken 22895 times.
✗ Branch 25 not taken.
✓ Branch 26 taken 3579 times.
✓ Branch 27 taken 19316 times.
|
26474 | while (task->rxPdo(this, &taskTime, &taskStatistics)) { |
| 220 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3579 times.
|
3579 | if (quiet) |
| 221 | ✗ | continue; | |
| 222 | |||
| 223 | // Go through all groups | ||
| 224 |
2/2✓ Branch 12 taken 1988 times.
✓ Branch 13 taken 3579 times.
|
5567 | for (git = activeSignals.begin(); git != activeSignals.end(); ++git) { |
| 225 | |||
| 226 | // Go through all decimations | ||
| 227 |
2/2✓ Branch 14 taken 1988 times.
✓ Branch 15 taken 1988 times.
|
3976 | for (dit = git->second.begin(); dit != git->second.end(); ++dit) { |
| 228 | |||
| 229 | // Check decimation counter | ||
| 230 |
2/2✓ Branch 5 taken 1130 times.
✓ Branch 6 taken 858 times.
|
1988 | if (dit->second.busy(dit->first)) |
| 231 | 1130 | continue; | |
| 232 | |||
| 233 | // Go through all blocksizes | ||
| 234 | 1716 | for (bit = dit->second.begin(); | |
| 235 |
2/2✓ Branch 7 taken 858 times.
✓ Branch 8 taken 858 times.
|
1716 | bit != dit->second.end(); ++bit) { |
| 236 | |||
| 237 | // Capture time | ||
| 238 | 1716 | *bit->second.timePtr = | |
| 239 | 1716 | 1000000000ULL * taskTime->tv_sec + taskTime->tv_nsec; | |
| 240 | |||
| 241 | // For non-event groups, increment timePtr and check | ||
| 242 | // whether it points to the end | ||
| 243 | 858 | print = false; | |
| 244 |
2/2✓ Branch 2 taken 846 times.
✓ Branch 3 taken 12 times.
|
858 | if (bit->first) { |
| 245 | 846 | ++bit->second.timePtr; | |
| 246 | 1692 | print = (bit->second.time.get() + bit->first) | |
| 247 | 846 | == bit->second.timePtr; | |
| 248 | } | ||
| 249 | |||
| 250 | // Prepare print queue | ||
| 251 | 858 | printQ = 0; | |
| 252 | 858 | printQEnd = &printQ; | |
| 253 | |||
| 254 | // Go through all subscriptions | ||
| 255 | 2036 | for (sit = bit->second.begin(); | |
| 256 |
2/2✓ Branch 7 taken 1178 times.
✓ Branch 8 taken 858 times.
|
2036 | sit != bit->second.end(); ++sit) { |
| 257 | 1178 | s = *sit; | |
| 258 |
1/2✓ Branch 25 taken 1178 times.
✗ Branch 26 not taken.
|
1178 | const char *data = s->channel->signal->getValue(this); |
| 259 | |||
| 260 |
9/10✓ Branch 2 taken 1178 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1168 times.
✓ Branch 5 taken 10 times.
✓ Branch 8 taken 1166 times.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 500 times.
✓ Branch 11 taken 676 times.
✓ Branch 12 taken 502 times.
✓ Branch 13 taken 676 times.
|
1178 | if ((s->newValue(data) and !bit->first) or print) { |
| 261 | 502 | *printQEnd = s; | |
| 262 | 502 | printQEnd = &s->next; | |
| 263 | 502 | s->next = 0; // Sentinel | |
| 264 | } | ||
| 265 | } | ||
| 266 | |||
| 267 | // Check if any signals need printing | ||
| 268 |
2/2✓ Branch 0 taken 342 times.
✓ Branch 1 taken 516 times.
|
858 | if (printQ) { |
| 269 |
1/2✓ Branch 5 taken 342 times.
✗ Branch 6 not taken.
|
684 | XmlElement dataTag(session->createElement("data")); |
| 270 |
2/2✓ Branch 2 taken 340 times.
✓ Branch 3 taken 2 times.
|
342 | if (git->first) |
| 271 |
1/2✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
|
680 | XmlElement::Attribute(dataTag, "group") |
| 272 |
1/2✓ Branch 5 taken 340 times.
✗ Branch 6 not taken.
|
680 | << git->first; |
| 273 | |||
| 274 |
2/4✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
✓ Branch 8 taken 342 times.
✗ Branch 9 not taken.
|
342 | XmlElement::Attribute(dataTag, "level") << 0; |
| 275 |
2/4✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
✓ Branch 10 taken 342 times.
✗ Branch 11 not taken.
|
342 | XmlElement::Attribute(dataTag, "time") << *taskTime; |
| 276 | |||
| 277 | // Print time channel | ||
| 278 | { | ||
| 279 | size_t len = sizeof(uint64_t) | ||
| 280 | 342 | * (bit->first + !bit->first); | |
| 281 | |||
| 282 |
1/2✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
|
684 | XmlElement time(dataTag.createChild("time")); |
| 283 |
1/2✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
|
684 | XmlElement::Attribute value(time, "d"); |
| 284 |
1/2✓ Branch 4 taken 342 times.
✗ Branch 5 not taken.
|
342 | value.base64(bit->second.time.get(), len); |
| 285 | } | ||
| 286 | |||
| 287 | // Reset timePtr | ||
| 288 | 342 | bit->second.timePtr = bit->second.time.get(); | |
| 289 | |||
| 290 | // Print every subscription | ||
| 291 |
2/2✓ Branch 0 taken 502 times.
✓ Branch 1 taken 342 times.
|
844 | for (s = printQ; s; s = s->next) |
| 292 |
1/2✓ Branch 2 taken 502 times.
✗ Branch 3 not taken.
|
502 | s->print(dataTag); |
| 293 | } | ||
| 294 | } | ||
| 295 | } | ||
| 296 | } | ||
| 297 | } | ||
| 298 | 19316 | } | |
| 299 | |||
| 300 | ///////////////////////////////////////////////////////////////////////////// | ||
| 301 | ✗ | void SubscriptionManager::sync() | |
| 302 | { | ||
| 303 | ✗ | ActiveSignals::iterator git; | |
| 304 | ✗ | DecimationGroup::iterator dit; | |
| 305 | ✗ | BlocksizeGroup::iterator bit; | |
| 306 | ✗ | SubscriptionSet::iterator sit; | |
| 307 | |||
| 308 | ✗ | for (git = activeSignals.begin(); | |
| 309 | ✗ | git != activeSignals.end(); ++git) { | |
| 310 | ✗ | for (dit = git->second.begin(); | |
| 311 | ✗ | dit != git->second.end(); ++dit) { | |
| 312 | ✗ | for (bit = dit->second.begin(); | |
| 313 | ✗ | bit != dit->second.end(); ++bit) { | |
| 314 | ✗ | bit->second.timePtr = bit->second.time.get(); | |
| 315 | ✗ | for (sit = bit->second.begin(); | |
| 316 | ✗ | sit != bit->second.end(); ++sit) | |
| 317 | ✗ | (*sit)->reset(); | |
| 318 | } | ||
| 319 | } | ||
| 320 | } | ||
| 321 | } | ||
| 322 |