GCC Code Coverage Report


Directory: ./
File: pdserv-1.1.0/src/msrproto/SubscriptionManager.cpp
Date: 2023-11-12 04:06:57
Exec Total Coverage
Lines: 1 102 1.0%
Branches: 0 132 0.0%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * $Id$
4 *
5 * Copyright 2010 Richard Hacker (lerichi at gmx dot net)
6 *
7 * This file is part of the pdserv library.
8 *
9 * The pdserv library is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU Lesser General Public License as published
11 * by the Free Software Foundation, either version 3 of the License, or (at
12 * your option) any later version.
13 *
14 * The pdserv library is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
17 * License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public License
20 * along with the pdserv library. If not, see <http://www.gnu.org/licenses/>.
21 *
22 *****************************************************************************/
23
24 #include "../Debug.h"
25 #include "../Main.h"
26 #include "../Signal.h"
27 #include "../Task.h"
28 #include "../TaskStatistics.h"
29 #include "SubscriptionManager.h"
30 #include "Channel.h"
31 #include "Session.h"
32 #include "Subscription.h"
33 #include "XmlElement.h"
34
35 #include <algorithm>
36 #include <queue>
37
38 using namespace MsrProto;
39
40 /////////////////////////////////////////////////////////////////////////////
// Definitions of the static placeholder objects of SubscriptionManager.
// The constructor points taskTime and taskStatistics at these two objects.
41 struct timespec SubscriptionManager::dummyTime;
42 PdServ::TaskStatistics SubscriptionManager::dummyTaskStatistics;
43
44 /////////////////////////////////////////////////////////////////////////////
45 SubscriptionManager::SubscriptionManager(
46 const Session *s, const PdServ::Task* task):
47 SessionTask(task), session(s)
48 {
49 taskTime = &dummyTime;
50 taskStatistics = &dummyTaskStatistics;
51
52 unsubscribedCount = 0;
53 subscribedCount = 0;
54 doSync = false;
55 }
56
57 /////////////////////////////////////////////////////////////////////////////
58 SubscriptionManager::~SubscriptionManager()
59 {
60 clear();
61 }
62
63 /////////////////////////////////////////////////////////////////////////////
64 void SubscriptionManager::subscribe (const Channel *c,
65 bool event, unsigned int decimation,
66 size_t blocksize, bool base64, size_t precision)
67 {
68 remove(signalSubscriptionMap[c->signal][c]);
69
70 Subscription *subscription =
71 new Subscription(c, event, decimation, blocksize, base64, precision);
72 signalSubscriptionMap[c->signal][c] = subscription;
73
74 DecimationTuple& dt = activeSignals[subscription->decimation];
75 if (!dt.first)
76 dt.first = 1;
77
78 ++unsubscribedCount;
79 c->signal->subscribe(this);
80 }
81
82 /////////////////////////////////////////////////////////////////////////////
83 void SubscriptionManager::clear()
84 {
85 SignalSubscriptionMap::const_iterator it;
86 for (it = signalSubscriptionMap.begin();
87 it != signalSubscriptionMap.end(); ++it) {
88 it->first->unsubscribe(this);
89 for (ChannelSubscriptionMap::const_iterator it2 = it->second.begin();
90 it2 != it->second.end(); ++it2)
91 delete it2->second;
92 }
93
94 signalSubscriptionMap.clear();
95 activeSignals.clear();
96
97 subscribedCount = 0;
98 unsubscribedCount = 0;
99 doSync = 0;
100 }
101
102 /////////////////////////////////////////////////////////////////////////////
103 void SubscriptionManager::remove (Subscription *s)
104 {
105 if (!s)
106 return;
107
108 DecimationGroup::iterator it = activeSignals.find(s->decimation);
109 if (it != activeSignals.end()) {
110 SubscriptionSet::iterator it2 = it->second.second.find(s);
111 if (it2 != it->second.second.end()) {
112 it->second.second.erase(s);
113 if (it->second.second.empty())
114 activeSignals.erase(it);
115 --subscribedCount;
116 }
117 else
118 --unsubscribedCount;
119 }
120
121 delete s;
122 }
123
124 /////////////////////////////////////////////////////////////////////////////
125 void SubscriptionManager::unsubscribe(const Channel *c)
126 {
127 // cout << __func__ << c->path() << endl;
128 SignalSubscriptionMap::iterator it = signalSubscriptionMap.find(c->signal);
129
130 if (it == signalSubscriptionMap.end())
131 return;
132
133 ChannelSubscriptionMap::iterator it2 = it->second.find(c);
134 if (it2 == it->second.end())
135 return;
136
137 remove(it2->second);
138
139 it->second.erase(it2);
140 if (it->second.empty()) {
141 it->first->unsubscribe(this);
142 signalSubscriptionMap.erase(it);
143 }
144 }
145
146 /////////////////////////////////////////////////////////////////////////////
147 void SubscriptionManager::newSignal( const PdServ::Signal *s)
148 {
149 if (!unsubscribedCount)
150 return;
151
152 SignalSubscriptionMap::const_iterator sit = signalSubscriptionMap.find(s);
153
154 // Find out whether this signal is used or whether it is active already
155 if (sit == signalSubscriptionMap.end())
156 return;
157
158 for (ChannelSubscriptionMap::const_iterator it2 = sit->second.begin();
159 it2 != sit->second.end(); ++it2) {
160 Subscription *s = it2->second;
161 DecimationTuple& dt = activeSignals[s->decimation];
162
163 SubscriptionSet::iterator it3 = dt.second.find(s);
164 if (it3 == dt.second.end()) {
165 dt.second.insert(s);
166 doSync = !--unsubscribedCount;
167 }
168 }
169 }
170
171 /////////////////////////////////////////////////////////////////////////////
172 void SubscriptionManager::rxPdo(Session::TCPStream& tcp, bool quiet)
173 {
174 typedef std::queue<Subscription*> PrintQ;
175 PrintQ printQ;
176 DecimationGroup::iterator it;
177 SubscriptionSet::const_iterator it2, it2_end;
178
179 while (task->rxPdo(this, &taskTime, &taskStatistics)) {
180 if (quiet)
181 continue;
182
183 for (it = activeSignals.begin(); it != activeSignals.end(); ++it) {
184 DecimationTuple& dt = it->second;
185
186 if (--dt.first)
187 continue;
188
189 dt.first = it->first;
190 it2_end = dt.second.end();
191
192 for (it2 = dt.second.begin(); it2 != it2_end; ++it2) {
193 const char *data = (*it2)->channel->signal->getValue(this);
194 if ((*it2)->newValue(data) or (doSync and !(*it2)->event))
195 printQ.push(*it2);
196 }
197 }
198 doSync = false;
199
200 if (!printQ.empty()) {
201 XmlElement dataTag(tcp.createElement("data"));
202 XmlElement::Attribute(dataTag, "level") << 0;
203 XmlElement::Attribute(dataTag, "time") << *taskTime;
204
205 while (!printQ.empty()) {
206 printQ.front()->print(dataTag);
207 printQ.pop();
208 }
209 }
210 }
211 }
212
213 /////////////////////////////////////////////////////////////////////////////
214 void SubscriptionManager::sync()
215 {
216 doSync = !unsubscribedCount;
217 for (DecimationGroup::iterator it = activeSignals.begin();
218 it != activeSignals.end(); ++it)
219 it->second.first = 1;
220 3 }
221