Directory: | ./ |
---|---|
File: | pdserv/src/msrproto/SubscriptionManager.cpp |
Date: | 2025-07-20 04:11:05 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 88 | 146 | 60.3% |
Branches: | 70 | 180 | 38.9% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * | ||
3 | * $Id$ | ||
4 | * | ||
5 | * Copyright 2010 Richard Hacker (lerichi at gmx dot net) | ||
6 | * | ||
7 | * This file is part of the pdserv library. | ||
8 | * | ||
9 | * The pdserv library is free software: you can redistribute it and/or modify | ||
10 | * it under the terms of the GNU Lesser General Public License as published | ||
11 | * by the Free Software Foundation, either version 3 of the License, or (at | ||
12 | * your option) any later version. | ||
13 | * | ||
14 | * The pdserv library is distributed in the hope that it will be useful, but | ||
15 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
16 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
17 | * License for more details. | ||
18 | * | ||
19 | * You should have received a copy of the GNU Lesser General Public License | ||
20 | * along with the pdserv library. If not, see <http://www.gnu.org/licenses/>. | ||
21 | * | ||
22 | *****************************************************************************/ | ||
23 | |||
24 | #include "../Debug.h" | ||
25 | #include "../Main.h" | ||
26 | #include "../Signal.h" | ||
27 | #include "../Task.h" | ||
28 | #include "../TaskStatistics.h" | ||
29 | #include "SubscriptionManager.h" | ||
30 | #include "Channel.h" | ||
31 | #include "Session.h" | ||
32 | #include "Subscription.h" | ||
33 | |||
34 | using namespace MsrProto; | ||
35 | |||
36 | ///////////////////////////////////////////////////////////////////////////// | ||
37 | struct timespec SubscriptionManager::dummyTime; | ||
38 | PdServ::TaskStatistics SubscriptionManager::dummyTaskStatistics; | ||
39 | |||
40 | ///////////////////////////////////////////////////////////////////////////// | ||
41 | 296 | SubscriptionManager::SubscriptionManager( | |
42 | 296 | Session *s, const PdServ::Task* task): | |
43 | 296 | SessionTask(task), session(s) | |
44 | { | ||
45 | 296 | taskTime = &dummyTime; | |
46 | 296 | taskStatistics = &dummyTaskStatistics; | |
47 | |||
48 | // Call rxPdo() once so that taskTime and taskStatistics are updated | ||
49 |
1/2✓ Branch 6 taken 296 times.
✗ Branch 7 not taken.
|
296 | task->rxPdo(this, &taskTime, &taskStatistics); |
50 | 296 | } | |
51 | |||
52 | ///////////////////////////////////////////////////////////////////////////// | ||
53 | 888 | SubscriptionManager::~SubscriptionManager() | |
54 | { | ||
55 | 296 | clear(); | |
56 | 592 | } | |
57 | |||
58 | ///////////////////////////////////////////////////////////////////////////// | ||
59 | 44 | void SubscriptionManager::subscribe (const Channel *c, size_t group, | |
60 | size_t decimation, size_t blocksize, bool base64, | ||
61 | std::streamsize precision) | ||
62 | { | ||
63 | 44 | Subscription** s = &signalSubscriptionMap[c->signal][c][group]; | |
64 | |||
65 | // First remove possible subscription. It doesn't matter if it is null | ||
66 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
|
44 | if (*s) |
67 | ✗ | remove(*s, group); | |
68 | |||
69 |
1/2✓ Branch 3 taken 44 times.
✗ Branch 4 not taken.
|
44 | *s = new Subscription(c, decimation, blocksize, base64, precision); |
70 | |||
71 | // Call subscribe on this signal. It doesn't matter if it is already | ||
72 | // subscribed, but it is useful because newSignal() is called for us | ||
73 | // in this special case | ||
74 | 44 | c->signal->subscribe(this); | |
75 | 44 | } | |
76 | |||
77 | ///////////////////////////////////////////////////////////////////////////// | ||
78 | ✗ | void SubscriptionManager::unsubscribe(const Channel *c, size_t group) | |
79 | { | ||
80 | // Go through all groups (signal, channel, group) to find the signal, | ||
81 | // keeping in mind that the channel may not even be subscribed yet! | ||
82 | |||
83 | ✗ | SignalSubscriptionMap::iterator sit = signalSubscriptionMap.find(c->signal); | |
84 | ✗ | if (sit == signalSubscriptionMap.end()) | |
85 | ✗ | return; | |
86 | |||
87 | ✗ | ChannelSubscriptionMap::iterator cit = sit->second.find(c); | |
88 | ✗ | if (cit == sit->second.end()) | |
89 | ✗ | return; | |
90 | |||
91 | ✗ | SubscriptionGroup::iterator git = cit->second.find(group); | |
92 | ✗ | if (git == cit->second.end()) | |
93 | ✗ | return; | |
94 | |||
95 | // Remove channel from active list | ||
96 | ✗ | remove(git->second, group); | |
97 | |||
98 | // Now that the channel is unsubscribed, check for empty groups, | ||
99 | // jumping out if the group is not empty | ||
100 | |||
101 | ✗ | cit->second.erase(git); | |
102 | ✗ | if (!cit->second.empty()) | |
103 | ✗ | return; | |
104 | |||
105 | ✗ | sit->second.erase(cit); | |
106 | ✗ | if (!sit->second.empty()) | |
107 | ✗ | return; | |
108 | |||
109 | // Don't require signal any more | ||
110 | ✗ | signalSubscriptionMap.erase(sit); | |
111 | ✗ | c->signal->unsubscribe(this); | |
112 | } | ||
113 | |||
114 | ///////////////////////////////////////////////////////////////////////////// | ||
115 | 296 | void SubscriptionManager::clear() | |
116 | { | ||
117 | 592 | SignalSubscriptionMap::const_iterator sit; | |
118 | 592 | ChannelSubscriptionMap::const_iterator cit; | |
119 | 592 | SubscriptionGroup::const_iterator git; | |
120 | |||
121 | 296 | activeSignals.clear(); | |
122 | |||
123 | 340 | for (sit = signalSubscriptionMap.begin(); | |
124 |
2/2✓ Branch 6 taken 44 times.
✓ Branch 7 taken 296 times.
|
340 | sit != signalSubscriptionMap.end(); ++sit) { |
125 |
2/2✓ Branch 14 taken 44 times.
✓ Branch 15 taken 44 times.
|
88 | for (cit = sit->second.begin(); cit != sit->second.end(); ++cit) { |
126 |
1/2✓ Branch 8 taken 44 times.
✗ Branch 9 not taken.
|
44 | sit->first->unsubscribe(this); |
127 |
2/2✓ Branch 14 taken 44 times.
✓ Branch 15 taken 44 times.
|
88 | for (git = cit->second.begin(); git != cit->second.end(); ++git) |
128 |
1/2✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
|
44 | delete git->second; |
129 | } | ||
130 | } | ||
131 | |||
132 | 296 | signalSubscriptionMap.clear(); | |
133 | 296 | } | |
134 | |||
135 | ///////////////////////////////////////////////////////////////////////////// | ||
136 | 44 | void SubscriptionManager::newSignal( const PdServ::Signal *s) | |
137 | { | ||
138 |
2/4✓ Branch 3 taken 44 times.
✗ Branch 4 not taken.
✓ Branch 10 taken 44 times.
✗ Branch 11 not taken.
|
88 | SignalSubscriptionMap::const_iterator sit = signalSubscriptionMap.find(s); |
139 | |||
140 | // Find out whether this signal is used or whether sit is active already | ||
141 |
1/2✗ Branch 6 not taken.
✓ Branch 7 taken 44 times.
|
44 | if (sit == signalSubscriptionMap.end()) |
142 | ✗ | return; | |
143 | |||
144 |
1/2✓ Branch 7 taken 44 times.
✗ Branch 8 not taken.
|
88 | for (ChannelSubscriptionMap::const_iterator cit = sit->second.begin(); |
145 |
2/2✓ Branch 7 taken 44 times.
✓ Branch 8 taken 44 times.
|
88 | cit != sit->second.end(); ++cit) { |
146 | 88 | for (SubscriptionGroup::const_iterator git = cit->second.begin(); | |
147 |
2/2✓ Branch 7 taken 44 times.
✓ Branch 8 taken 44 times.
|
88 | git != cit->second.end(); ++git) { |
148 | |||
149 | 44 | Subscription* s = git->second; | |
150 | |||
151 | // Put subscription into active list | ||
152 | // Note: for event subscriptions, blocksize == 0 | ||
153 | SubscriptionSet* g = | ||
154 |
3/6✓ Branch 6 taken 44 times.
✗ Branch 7 not taken.
✓ Branch 10 taken 44 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 44 times.
✗ Branch 15 not taken.
|
44 | &activeSignals[git->first][s->decimation][s->blocksize]; |
155 | |||
156 |
2/2✓ Branch 2 taken 36 times.
✓ Branch 3 taken 8 times.
|
44 | if (g->empty()) { |
157 | // The blocksize group jas just been created, add data space for | ||
158 | // current time | ||
159 |
2/4✓ Branch 3 taken 36 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 36 times.
✗ Branch 10 not taken.
|
36 | g->time.reset(new uint64_t[s->blocksize + !s->blocksize]); |
160 | 36 | g->timePtr = g->time.get(); | |
161 | log_debug("s=%zu %zu:%zu:%zu %p", s->blocksize, | ||
162 | git->first, s->decimation, s->blocksize, | ||
163 | g->time.get()); | ||
164 | } | ||
165 | |||
166 |
1/2✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
|
44 | g->insert(s); |
167 | } | ||
168 | } | ||
169 | } | ||
170 | |||
171 | ///////////////////////////////////////////////////////////////////////////// | ||
172 | ✗ | void SubscriptionManager::remove (Subscription *s, size_t group) | |
173 | { | ||
174 | // Find the map for the group | ||
175 | ✗ | ActiveSignals::iterator git = activeSignals.find(group); | |
176 | ✗ | if (git == activeSignals.end()) | |
177 | ✗ | return; | |
178 | |||
179 | // Find the decimation group | ||
180 | ✗ | DecimationGroup::iterator dit = git->second.find(s->decimation); | |
181 | ✗ | if (dit == git->second.end()) | |
182 | ✗ | return; | |
183 | |||
184 | // Find the blocksize group | ||
185 | ✗ | BlocksizeGroup::iterator bit = dit->second.find(s->blocksize); | |
186 | ✗ | if (bit == dit->second.end()) | |
187 | ✗ | return; | |
188 | |||
189 | // Delete and remove subscription from set | ||
190 | ✗ | delete s; | |
191 | ✗ | bit->second.erase(s); | |
192 | ✗ | if (!bit->second.empty()) | |
193 | ✗ | return; | |
194 | |||
195 | // blocksize group is empty, erase it | ||
196 | ✗ | dit->second.erase(bit); | |
197 | ✗ | if (!dit->second.empty()) | |
198 | ✗ | return; | |
199 | |||
200 | // decimation group is empty, erase it | ||
201 | ✗ | git->second.erase(dit); | |
202 | ✗ | if (!git->second.empty()) | |
203 | ✗ | return; | |
204 | |||
205 | // group is emty, erase it | ||
206 | ✗ | activeSignals.erase(git); | |
207 | } | ||
208 | |||
209 | ///////////////////////////////////////////////////////////////////////////// | ||
210 | 19366 | void SubscriptionManager::rxPdo(bool quiet) | |
211 | { | ||
212 | 38732 | ActiveSignals::iterator git; | |
213 | 38732 | DecimationGroup::iterator dit; | |
214 | 38732 | BlocksizeGroup::iterator bit; | |
215 | 38732 | SubscriptionSet::iterator sit; | |
216 | Subscription* s; | ||
217 | 19366 | Subscription* printQ, **printQEnd; | |
218 | bool print; | ||
219 | |||
220 |
3/4✓ Branch 24 taken 22912 times.
✗ Branch 25 not taken.
✓ Branch 26 taken 3546 times.
✓ Branch 27 taken 19366 times.
|
26458 | while (task->rxPdo(this, &taskTime, &taskStatistics)) { |
221 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3546 times.
|
3546 | if (quiet) |
222 | ✗ | continue; | |
223 | |||
224 | // Go through all groups | ||
225 |
2/2✓ Branch 12 taken 1994 times.
✓ Branch 13 taken 3546 times.
|
5540 | for (git = activeSignals.begin(); git != activeSignals.end(); ++git) { |
226 | |||
227 | // Go through all decimations | ||
228 |
2/2✓ Branch 14 taken 1994 times.
✓ Branch 15 taken 1994 times.
|
3988 | for (dit = git->second.begin(); dit != git->second.end(); ++dit) { |
229 | |||
230 | // Check decimation counter | ||
231 |
2/2✓ Branch 5 taken 1134 times.
✓ Branch 6 taken 860 times.
|
1994 | if (dit->second.busy(dit->first)) |
232 | 1134 | continue; | |
233 | |||
234 | // Go through all blocksizes | ||
235 | 1720 | for (bit = dit->second.begin(); | |
236 |
2/2✓ Branch 7 taken 860 times.
✓ Branch 8 taken 860 times.
|
1720 | bit != dit->second.end(); ++bit) { |
237 | |||
238 | // Capture time | ||
239 | 1720 | *bit->second.timePtr = | |
240 | 1720 | 1000000000ULL * taskTime->tv_sec + taskTime->tv_nsec; | |
241 | |||
242 | // For non-event groups, increment timePtr and check | ||
243 | // whether it points to the end | ||
244 | 860 | print = false; | |
245 |
2/2✓ Branch 2 taken 847 times.
✓ Branch 3 taken 13 times.
|
860 | if (bit->first) { |
246 | 847 | ++bit->second.timePtr; | |
247 | 1694 | print = (bit->second.time.get() + bit->first) | |
248 | 847 | == bit->second.timePtr; | |
249 | } | ||
250 | |||
251 | // Prepare print queue | ||
252 | 860 | printQ = 0; | |
253 | 860 | printQEnd = &printQ; | |
254 | |||
255 | // Go through all subscriptions | ||
256 | 2041 | for (sit = bit->second.begin(); | |
257 |
2/2✓ Branch 7 taken 1181 times.
✓ Branch 8 taken 860 times.
|
2041 | sit != bit->second.end(); ++sit) { |
258 | 1181 | s = *sit; | |
259 |
1/2✓ Branch 25 taken 1181 times.
✗ Branch 26 not taken.
|
1181 | const char *data = s->channel->signal->getValue(this); |
260 | |||
261 |
9/10✓ Branch 2 taken 1181 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1170 times.
✓ Branch 5 taken 11 times.
✓ Branch 8 taken 1168 times.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 500 times.
✓ Branch 11 taken 679 times.
✓ Branch 12 taken 502 times.
✓ Branch 13 taken 679 times.
|
1181 | if ((s->newValue(data) and !bit->first) or print) { |
262 | 502 | *printQEnd = s; | |
263 | 502 | printQEnd = &s->next; | |
264 | 502 | s->next = 0; // Sentinel | |
265 | } | ||
266 | } | ||
267 | |||
268 | // Check if any signals need printing | ||
269 |
2/2✓ Branch 0 taken 342 times.
✓ Branch 1 taken 518 times.
|
860 | if (printQ) { |
270 |
1/2✓ Branch 5 taken 342 times.
✗ Branch 6 not taken.
|
684 | XmlElement dataTag(session->createElement("data")); |
271 |
2/2✓ Branch 2 taken 340 times.
✓ Branch 3 taken 2 times.
|
342 | if (git->first) |
272 |
1/2✓ Branch 1 taken 340 times.
✗ Branch 2 not taken.
|
680 | XmlElement::Attribute(dataTag, "group") |
273 |
1/2✓ Branch 5 taken 340 times.
✗ Branch 6 not taken.
|
680 | << git->first; |
274 | |||
275 |
2/4✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
✓ Branch 8 taken 342 times.
✗ Branch 9 not taken.
|
342 | XmlElement::Attribute(dataTag, "level") << 0; |
276 |
2/4✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
✓ Branch 10 taken 342 times.
✗ Branch 11 not taken.
|
342 | XmlElement::Attribute(dataTag, "time") << *taskTime; |
277 | |||
278 | // Print time channel | ||
279 | { | ||
280 | size_t len = sizeof(uint64_t) | ||
281 | 342 | * (bit->first + !bit->first); | |
282 | |||
283 |
1/2✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
|
684 | XmlElement time(dataTag.createChild("time")); |
284 |
1/2✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
|
684 | XmlElement::Attribute value(time, "d"); |
285 |
1/2✓ Branch 4 taken 342 times.
✗ Branch 5 not taken.
|
342 | value.base64(bit->second.time.get(), len); |
286 | } | ||
287 | |||
288 | // Reset timePtr | ||
289 | 342 | bit->second.timePtr = bit->second.time.get(); | |
290 | |||
291 | // Print every subscription | ||
292 |
2/2✓ Branch 0 taken 502 times.
✓ Branch 1 taken 342 times.
|
844 | for (s = printQ; s; s = s->next) |
293 |
1/2✓ Branch 2 taken 502 times.
✗ Branch 3 not taken.
|
502 | s->print(dataTag); |
294 | } | ||
295 | } | ||
296 | } | ||
297 | } | ||
298 | } | ||
299 | 19366 | } | |
300 | |||
301 | ///////////////////////////////////////////////////////////////////////////// | ||
302 | ✗ | void SubscriptionManager::sync() | |
303 | { | ||
304 | ✗ | ActiveSignals::iterator git; | |
305 | ✗ | DecimationGroup::iterator dit; | |
306 | ✗ | BlocksizeGroup::iterator bit; | |
307 | ✗ | SubscriptionSet::iterator sit; | |
308 | |||
309 | ✗ | for (git = activeSignals.begin(); | |
310 | ✗ | git != activeSignals.end(); ++git) { | |
311 | ✗ | for (dit = git->second.begin(); | |
312 | ✗ | dit != git->second.end(); ++dit) { | |
313 | ✗ | for (bit = dit->second.begin(); | |
314 | ✗ | bit != dit->second.end(); ++bit) { | |
315 | ✗ | bit->second.timePtr = bit->second.time.get(); | |
316 | ✗ | for (sit = bit->second.begin(); | |
317 | ✗ | sit != bit->second.end(); ++sit) | |
318 | ✗ | (*sit)->reset(); | |
319 | } | ||
320 | } | ||
321 | } | ||
322 | } | ||
323 |