GCC Code Coverage Report


Directory: ./
File: pdserv-1.1.0/src/msrproto/Session.cpp
Date: 2024-11-17 04:08:36
Exec Total Coverage
Lines: 1 407 0.2%
Branches: 0 848 0.0%

Line Branch Exec Source
1 /*****************************************************************************
2 *
3 * $Id$
4 *
5 * Copyright 2010 Richard Hacker (lerichi at gmx dot net)
6 *
7 * This file is part of the pdserv library.
8 *
9 * The pdserv library is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU Lesser General Public License as published
11 * by the Free Software Foundation, either version 3 of the License, or (at
12 * your option) any later version.
13 *
14 * The pdserv library is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
17 * License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public License
20 * along with the pdserv library. If not, see <http://www.gnu.org/licenses/>.
21 *
22 *****************************************************************************/
23
24 #include <streambuf>
25 #include <cerrno> // ENAMETOOLONG
26 #include <climits> // HOST_NAME_MAX
27 #include <unistd.h> // gethostname
28 #include <log4cplus/ndc.h>
29 #include <log4cplus/loggingmacros.h>
30
31 #include "../Debug.h"
32
33 #include "../Main.h"
34 #include "../Task.h"
35 #include "../Signal.h"
36 #include "../Parameter.h"
37 #include "../DataType.h"
38
39 #include "Session.h"
40 #include "Server.h"
41 #include "Event.h"
42 #include "Channel.h"
43 #include "Parameter.h"
44 #include "XmlElement.h"
45 #include "XmlParser.h"
46 #include "SubscriptionManager.h"
47
48 using namespace MsrProto;
49
50 /////////////////////////////////////////////////////////////////////////////
51 Session::Session( Server *server, ost::TCPSocket *socket):
52 PdServ::Session(server->main),
53 server(server),
54 tcp(socket), xmlstream(&tcp),
55 mutex(1)
56 {
57 timeTask = 0;
58
59 for (size_t i = 0; i < main->numTasks(); ++i) {
60 const PdServ::Task *task = main->getTask(i);
61
62 subscriptionManager[task] = new SubscriptionManager(this, task);
63
64 if (!timeTask or timeTask->task->sampleTime > task->sampleTime)
65 timeTask = subscriptionManager[task];
66 }
67
68 // Setup some internal variables
69 writeAccess = false;
70 echoOn = false; // FIXME: echoOn is not yet implemented
71 quiet = false;
72 polite = false;
73 aicDelay = 0;
74
75 xmlstream.imbue(std::locale::classic());
76
77 detach();
78 }
79
80 /////////////////////////////////////////////////////////////////////////////
81 Session::~Session()
82 {
83 for (SubscriptionManagerMap::iterator it = subscriptionManager.begin();
84 it != subscriptionManager.end(); ++it)
85 delete it->second;
86
87 server->sessionClosed(this);
88 }
89
90 /////////////////////////////////////////////////////////////////////////////
91 void Session::getSessionStatistics(PdServ::SessionStatistics &stats) const
92 {
93 std::ostringstream os;
94 if (peer.empty())
95 stats.remote = tcp.peer;
96 else
97 stats.remote = peer + " (" + tcp.peer +')';
98 stats.client = client;
99 stats.countIn = tcp.inBytes;
100 stats.countOut = tcp.outBytes;
101 stats.connectedTime = connectedTime;
102 }
103
104 /////////////////////////////////////////////////////////////////////////////
105 const struct timespec *Session::getTaskTime (const PdServ::Task *task) const
106 {
107 return subscriptionManager.find(task)->second->taskTime;
108 }
109
110 /////////////////////////////////////////////////////////////////////////////
111 const PdServ::TaskStatistics *Session::getTaskStatistics (
112 const PdServ::Task *task) const
113 {
114 return subscriptionManager.find(task)->second->taskStatistics;
115 }
116
117 /////////////////////////////////////////////////////////////////////////////
118 void Session::broadcast(Session *, const struct timespec& ts,
119 const std::string& action, const std::string &message)
120 {
121 ost::SemaphoreLock lock(mutex);
122
123 Broadcast *b = new Broadcast;
124 b->ts = ts;
125 b->action = action;
126 b->message = message;
127 broadcastList.push_back(b);
128 }
129
130 /////////////////////////////////////////////////////////////////////////////
131 void Session::setAIC(const Parameter *p)
132 {
133 ost::SemaphoreLock lock(mutex);
134 aic.insert(p->mainParam);
135
136 if (!aicDelay)
137 aicDelay = 5; // 2Hz AIC
138 }
139
140 /////////////////////////////////////////////////////////////////////////////
141 void Session::parameterChanged(const Parameter *p)
142 {
143 ost::SemaphoreLock lock(mutex);
144 changedParameter.insert(p);
145 }
146
147 /////////////////////////////////////////////////////////////////////////////
148 void Session::initial()
149 {
150 log4cplus::getNDC().push(LOG4CPLUS_STRING_TO_TSTRING(tcp.peer));
151
152 LOG4CPLUS_INFO_STR(server->log, LOG4CPLUS_TEXT("New session"));
153
154 // Get the hostname
155 char hostname[HOST_NAME_MAX+1];
156 if (gethostname(hostname, HOST_NAME_MAX)) {
157 if (errno == ENAMETOOLONG)
158 hostname[HOST_NAME_MAX] = '\0';
159 else
160 strcpy(hostname,"unknown");
161 }
162
163 // Greet the new client
164 {
165 XmlElement greeting(tcp.createElement("connected"));
166 XmlElement::Attribute(greeting, "name") << "MSR";
167 XmlElement::Attribute(greeting, "host")
168 << reinterpret_cast<const char*>(hostname);
169 XmlElement::Attribute(greeting, "app") << main->name;
170 XmlElement::Attribute(greeting, "appversion") << main->version;
171 XmlElement::Attribute(greeting, "version") << MSR_VERSION;
172 XmlElement::Attribute(greeting, "features") << MSR_FEATURES;
173 XmlElement::Attribute(greeting, "recievebufsize") << 100000000;
174 }
175 }
176
177 /////////////////////////////////////////////////////////////////////////////
178 void Session::final()
179 {
180 LOG4CPLUS_INFO_STR(server->log, LOG4CPLUS_TEXT("Finished session"));
181 log4cplus::getNDC().remove();
182 }
183
184 /////////////////////////////////////////////////////////////////////////////
185 void Session::run()
186 {
187 ssize_t n;
188 XmlParser inbuf;
189
190 while (xmlstream.good()) {
191 tcp.pubsync();
192
193 try {
194 n = tcp.read(inbuf.bufptr(), inbuf.free(), 100);
195
196 if (!n) {
197 LOG4CPLUS_DEBUG_STR(server->log,
198 LOG4CPLUS_TEXT("Client closed connection"));
199 return;
200 }
201 }
202 catch (ost::Socket *s) {
203 LOG4CPLUS_FATAL(server->log,
204 LOG4CPLUS_TEXT("Socket error ") << s->getErrorNumber());
205 return;
206 }
207 catch (std::exception& e) {
208 LOG4CPLUS_FATAL(server->log,
209 LOG4CPLUS_TEXT("Exception occurred: ")
210 << LOG4CPLUS_C_STR_TO_TSTRING(e.what()));
211 return;
212 }
213 catch (...) {
214 LOG4CPLUS_FATAL_STR(server->log,
215 LOG4CPLUS_TEXT("Aborting on unknown exception"));
216 return;
217 }
218
219 if (n > 0) {
220 LOG4CPLUS_DEBUG(server->log,
221 LOG4CPLUS_TEXT("Rx: ")
222 << LOG4CPLUS_STRING_TO_TSTRING(
223 std::string(inbuf.bufptr(), n)));
224
225 inbuf.newData(n);
226 XmlParser::Element command;
227
228 while ((command = inbuf.nextElement())) {
229 command.getString("id", tcp.commandId);
230 processCommand(command);
231
232 if (!tcp.commandId.empty()) {
233 XmlElement ack(tcp.createElement("ack"));
234 XmlElement::Attribute(ack,"id").setEscaped(
235 tcp.commandId.c_str());
236
237 tcp.commandId.clear();
238 }
239 }
240 }
241
242 // End here for polite conversations
243 if (polite) {
244 ost::SemaphoreLock lock(mutex);
245 changedParameter.clear();
246 broadcastList.clear();
247 aic.clear();
248 continue;
249 }
250
251 // Collect all asynchronous events while holding mutex
252 ParameterSet cp;
253 BroadcastList broadcastList;
254 {
255 // Create an environment for mutex lock. This lock should be kept
256 // as short as possible, and especially not when writing to the
257 // output stream
258 ost::SemaphoreLock lock(mutex);
259
260 if (aicDelay)
261 --aicDelay;
262
263 ParameterSet::iterator it2, it = changedParameter.begin();
264 while (it != changedParameter.end()) {
265 it2 = it++;
266 if (!aicDelay or aic.find((*it2)->mainParam) == aic.end()) {
267 cp.insert(*it2);
268 changedParameter.erase(it2);
269 }
270 }
271
272 std::swap(this->broadcastList, broadcastList);
273 }
274
275 // Write all asynchronous events to the client
276 {
277 for ( ParameterSet::iterator it = cp.begin();
278 it != cp.end(); ++it) {
279 XmlElement pu(tcp.createElement("pu"));
280 XmlElement::Attribute(pu, "index") << (*it)->index;
281 }
282
283 for ( BroadcastList::const_iterator it = broadcastList.begin();
284 it != broadcastList.end(); ++it) {
285
286 XmlElement broadcast(tcp.createElement("broadcast"));
287
288 XmlElement::Attribute(broadcast, "time") << (*it)->ts;
289
290 if (!(*it)->action.empty())
291 XmlElement::Attribute(broadcast, "action").setEscaped(
292 (*it)->action.c_str());
293
294 if (!(*it)->message.empty())
295 XmlElement::Attribute(broadcast, "text").setEscaped(
296 (*it)->message.c_str());
297
298 delete *it;
299 }
300 }
301
302 for (SubscriptionManagerMap::iterator it = subscriptionManager.begin();
303 it != subscriptionManager.end(); ++it)
304 it->second->rxPdo(tcp, quiet);
305
306 const PdServ::Event* mainEvent;
307 bool state;
308 struct timespec t;
309 size_t index;
310 while ((mainEvent = main->getNextEvent(this, &index, &state, &t))) {
311 Event::toXml( tcp, mainEvent, index, state, t);
312 }
313 }
314
315 LOG4CPLUS_FATAL_STR(server->log,
316 LOG4CPLUS_TEXT("Error occurred in output stream"));
317 }
318
319 /////////////////////////////////////////////////////////////////////////////
320 void Session::processCommand(const XmlParser::Element& cmd)
321 {
322 const char *command = cmd.getCommand();
323 size_t commandLen = strlen(command);
324
325 static const struct {
326 size_t len;
327 const char *name;
328 void (Session::*func)(const XmlParser::Element&);
329 } cmds[] = {
330 // First list most common commands
331 { 4, "ping", &Session::ping },
332 { 2, "rs", &Session::readStatistics },
333 { 2, "wp", &Session::writeParameter },
334 { 2, "rp", &Session::readParameter },
335 { 4, "xsad", &Session::xsad },
336 { 4, "xsod", &Session::xsod },
337 { 4, "echo", &Session::echo },
338
339 // Now comes the rest
340 { 2, "rc", &Session::readChannel },
341 { 2, "rk", &Session::readChannel },
342 { 3, "rpv", &Session::readParamValues },
343 { 4, "list", &Session::listDirectory },
344 { 9, "broadcast", &Session::broadcast },
345 {11, "remote_host", &Session::remoteHost },
346 {12, "read_kanaele", &Session::readChannel },
347 {12, "read_statics", &Session::readStatistics },
348 {14, "read_parameter", &Session::readParameter },
349 {15, "read_statistics", &Session::readStatistics },
350 {15, "write_parameter", &Session::writeParameter },
351 {17, "read_param_values", &Session::readParamValues },
352 {0, 0, 0},
353 };
354
355 // Go through the command list
356 for (size_t idx = 0; cmds[idx].len; idx++) {
357 // Check whether the lengths fit and the string matches
358 if (commandLen == cmds[idx].len
359 and !strcmp(cmds[idx].name, command)) {
360
361 LOG4CPLUS_TRACE(server->log,
362 LOG4CPLUS_C_STR_TO_TSTRING(cmds[idx].name));
363
364 // Call the method
365 (this->*cmds[idx].func)(cmd);
366
367 // Finished
368 return;
369 }
370 }
371
372 LOG4CPLUS_WARN(server->log,
373 LOG4CPLUS_TEXT("Unknown command <")
374 << LOG4CPLUS_C_STR_TO_TSTRING(command)
375 << LOG4CPLUS_TEXT(">"));
376
377
378 // Unknown command warning
379 XmlElement warn(tcp.createElement("warn"));
380 XmlElement::Attribute(warn, "num") << 1000;
381 XmlElement::Attribute(warn, "text") << "unknown command";
382 XmlElement::Attribute(warn, "command").setEscaped(command);
383 }
384
385 /////////////////////////////////////////////////////////////////////////////
386 void Session::broadcast(const XmlParser::Element& cmd)
387 {
388 struct timespec ts;
389 std::string action, text;
390
391 main->gettime(&ts);
392 cmd.getString("action", action);
393 cmd.getString("text", text);
394
395 server->broadcast(this, ts, action, text);
396 }
397
398 /////////////////////////////////////////////////////////////////////////////
399 void Session::echo(const XmlParser::Element& cmd)
400 {
401 echoOn = cmd.isTrue("value");
402 }
403
404 /////////////////////////////////////////////////////////////////////////////
405 void Session::ping(const XmlParser::Element& /*cmd*/)
406 {
407 tcp.createElement("ping");
408 }
409
410 /////////////////////////////////////////////////////////////////////////////
411 void Session::readChannel(const XmlParser::Element& cmd)
412 {
413 const Channel *c = 0;
414 bool shortReply = cmd.isTrue("short");
415 std::string name;
416 unsigned int index;
417
418 if (cmd.getString("name", name)) {
419 c = server->find<Channel>(name);
420 if (!c)
421 return;
422 }
423 else if (cmd.getUnsigned("index", index)) {
424 c = server->getChannel(index);
425 if (!c)
426 return;
427 }
428
429 // A single signal was requested
430 if (c) {
431 char buf[c->signal->memSize];
432
433 c->signal->getValue(this, buf);
434
435 XmlElement channel(tcp.createElement("channel"));
436 c->setXmlAttributes(channel, shortReply, buf, 16);
437
438 return;
439 }
440
441 const Server::Channels& chanList = server->getChannels();
442 XmlElement channels(tcp.createElement("channels"));
443 for (Server::Channels::const_iterator it = chanList.begin();
444 it != chanList.end(); it++) {
445 if ((*it)->hidden)
446 continue;
447
448 XmlElement el(channels.createChild("channel"));
449 (*it)->setXmlAttributes( el, shortReply, 0, 16);
450 }
451 }
452
453 /////////////////////////////////////////////////////////////////////////////
454 void Session::listDirectory(const XmlParser::Element& cmd)
455 {
456 const char *path;
457
458 if (!cmd.find("path", path))
459 return;
460
461 XmlElement element(tcp.createElement("listing"));
462 server->list(this, element, path);
463 }
464
465 /////////////////////////////////////////////////////////////////////////////
466 void Session::readParameter(const XmlParser::Element& cmd)
467 {
468 bool shortReply = cmd.isTrue("short");
469 bool hex = cmd.isTrue("hex");
470 std::string name;
471 unsigned int index;
472
473 const Parameter *p = 0;
474 if (cmd.getString("name", name)) {
475 p = server->find<Parameter>(name);
476 if (!p)
477 return;
478 }
479 else if (cmd.getUnsigned("index", index)) {
480 p = server->getParameter(index);
481 if (!p)
482 return;
483 }
484
485 if (p) {
486 char buf[p->mainParam->memSize];
487 struct timespec ts;
488
489 p->mainParam->getValue(this, buf, &ts);
490
491 std::string id;
492 cmd.getString("id", id);
493
494 XmlElement xml(tcp.createElement("parameter"));
495 p->setXmlAttributes(xml, buf, ts, shortReply, hex, 16);
496
497 return;
498 }
499
500 XmlElement parametersElement(tcp.createElement("parameters"));
501
502 const Server::Parameters& parameters = server->getParameters();
503 Server::Parameters::const_iterator it = parameters.begin();
504 while ( it != parameters.end()) {
505 const PdServ::Parameter* mainParam = (*it)->mainParam;
506 char buf[mainParam->memSize];
507 struct timespec ts;
508
509 if ((*it)->hidden) {
510 ++it;
511 continue;
512 }
513
514 mainParam->getValue(this, buf, &ts);
515
516 while (it != parameters.end() and mainParam == (*it)->mainParam) {
517 XmlElement xml(parametersElement.createChild("parameter"));
518 (*it++)->setXmlAttributes(xml, buf, ts, shortReply, hex, 16);
519 }
520 }
521 }
522
523 /////////////////////////////////////////////////////////////////////////////
524 void Session::readParamValues(const XmlParser::Element& /*cmd*/)
525 {
526 XmlElement param_values(tcp.createElement("param_values"));
527 XmlElement::Attribute values(param_values, "value");
528
529 const Server::Parameters& parameters = server->getParameters();
530 Server::Parameters::const_iterator it = parameters.begin();
531 while ( it != parameters.end()) {
532 const PdServ::Parameter* mainParam = (*it)->mainParam;
533 char buf[mainParam->memSize];
534 struct timespec ts;
535
536 mainParam->getValue(this, buf, &ts);
537
538 if (it != parameters.begin())
539 values << ';';
540 values.csv(*it, buf, 1, 16);
541
542 while (it != parameters.end() and mainParam == (*it)->mainParam)
543 ++it;
544 }
545 }
546
547 /////////////////////////////////////////////////////////////////////////////
548 void Session::readStatistics(const XmlParser::Element& /*cmd*/)
549 {
550 // <clients>
551 // <client index="0" name="lansim"
552 // apname="Persistent Manager, Version: 0.3.1"
553 // countin="19908501" countout="27337577"
554 // connectedtime="1282151176.659208"/>
555 // <client index="1" .../>
556 // </clients>
557 typedef std::list<PdServ::SessionStatistics> StatList;
558 StatList stats;
559 main->getSessionStatistics(stats);
560
561 XmlElement clients(tcp.createElement("clients"));
562 for (StatList::const_iterator it = stats.begin();
563 it != stats.end(); it++) {
564 XmlElement client(clients.createChild("client"));
565 XmlElement::Attribute(client,"name").setEscaped(
566 (*it).remote.size() ? (*it).remote.c_str() : "unknown");
567 XmlElement::Attribute(client,"apname").setEscaped(
568 (*it).client.size() ? (*it).client.c_str() : "unknown");
569 XmlElement::Attribute(client,"countin") << (*it).countIn;
570 XmlElement::Attribute(client,"countout") << (*it).countOut;
571 XmlElement::Attribute(client,"connectedtime") << (*it).connectedTime;
572 }
573 }
574
575 /////////////////////////////////////////////////////////////////////////////
576 void Session::remoteHost(const XmlParser::Element& cmd)
577 {
578 cmd.getString("name", peer);
579
580 cmd.getString("applicationname", client);
581
582 writeAccess = cmd.isEqual("access", "allow") or cmd.isTrue("access");
583 polite = cmd.isTrue("polite");
584
585 LOG4CPLUS_INFO(server->log,
586 LOG4CPLUS_TEXT("Logging in ")
587 << LOG4CPLUS_STRING_TO_TSTRING(peer)
588 << LOG4CPLUS_TEXT(" application ")
589 << LOG4CPLUS_STRING_TO_TSTRING(client)
590 << LOG4CPLUS_TEXT(" writeaccess=")
591 << writeAccess);
592 }
593
594 /////////////////////////////////////////////////////////////////////////////
595 void Session::writeParameter(const XmlParser::Element& cmd)
596 {
597 if (!writeAccess) {
598 XmlElement warn(tcp.createElement("warn"));
599 XmlElement::Attribute(warn, "text") << "No write access";
600 return;
601 }
602
603 const Parameter *p = 0;
604
605 unsigned int index;
606 std::string name;
607 if (cmd.getString("name", name)) {
608 p = server->find<Parameter>(name);
609 }
610 else if (cmd.getUnsigned("index", index)) {
611 p = server->getParameter(index);
612 }
613
614 if (!p)
615 return;
616
617 unsigned int startindex = 0;
618 if (cmd.getUnsigned("startindex", startindex)) {
619 if (startindex >= p->dim.nelem)
620 return;
621 }
622
623 if (cmd.isTrue("aic"))
624 server->setAic(p);
625
626 int errnum;
627 const char *s;
628 size_t count;
629 if (cmd.find("hexvalue", s)) {
630 errnum = p->setHexValue(this, s, startindex, count);
631 }
632 else if (cmd.find("value", s)) {
633 errnum = p->setDoubleValue(this, s, startindex, count);
634 }
635 else
636 return;
637
638 if (errnum) {
639 // If an error occurred, tell this client to reread the value
640 parameterChanged(p);
641 }
642 }
643
644 /////////////////////////////////////////////////////////////////////////////
645 void Session::xsad(const XmlParser::Element& cmd)
646 {
647 unsigned int reduction, blocksize, precision;
648 bool base64 = cmd.isEqual("coding", "Base64");
649 bool event = cmd.isTrue("event");
650 bool foundReduction = false, foundBlocksize = false;
651 std::list<unsigned int> indexList;
652 const Server::Channels& channel = server->getChannels();
653
654 if (cmd.isTrue("sync")) {
655 for (SubscriptionManagerMap::iterator it = subscriptionManager.begin();
656 it != subscriptionManager.end(); ++it)
657 it->second->sync();
658 quiet = false;
659 }
660 else {
661 // Quiet will stop all transmission of <data> tags until
662 // sync is called
663 quiet = cmd.isTrue("quiet");
664 }
665
666 if (!cmd.getUnsignedList("channels", indexList))
667 return;
668
669 if (cmd.getUnsigned("reduction", reduction)) {
670 if (!reduction) {
671 XmlElement warn(tcp.createElement("warn"));
672 XmlElement::Attribute(warn, "command") << "xsad";
673 XmlElement::Attribute(warn, "text")
674 << "specified reduction=0, choosing reduction=1";
675
676 reduction = 1;
677 }
678
679 foundReduction = true;
680 }
681
682 if (cmd.getUnsigned("blocksize", blocksize)) {
683 if (!blocksize) {
684 XmlElement warn(tcp.createElement("warn"));
685 XmlElement::Attribute(warn, "command") << "xsad";
686 XmlElement::Attribute(warn, "text")
687 << "specified blocksize=0, choosing blocksize=1";
688
689 blocksize = 1;
690 }
691
692 foundBlocksize = true;
693 }
694
695 if (!cmd.getUnsigned("precision", precision))
696 precision = 16;
697
698 for (std::list<unsigned int>::const_iterator it = indexList.begin();
699 it != indexList.end(); it++) {
700 if (*it >= channel.size())
701 continue;
702
703 const PdServ::Signal *mainSignal = channel[*it]->signal;
704
705 if (event) {
706 if (!foundReduction)
707 // If user did not supply a reduction, limit to a
708 // max of 10Hz automatically
709 reduction = static_cast<unsigned>(
710 0.1 / mainSignal->sampleTime
711 / mainSignal->decimation + 0.5);
712 }
713 else if (!foundReduction or !foundBlocksize) {
714 // Quite possibly user input; choose reduction for 1Hz
715
716 if (!foundBlocksize)
717 blocksize = 1;
718
719 if (!foundReduction)
720 reduction = static_cast<unsigned>(
721 1.0 / mainSignal->sampleTime
722 / mainSignal->decimation
723 / blocksize + 0.5);
724 }
725
726 double ts = mainSignal->sampleTime;
727 //log_debug("Subscribe to signal %s %i %f", channel[*it]->path().c_str(),
728 //*it, ts);
729 subscriptionManager[main->getTask(ts)]->subscribe( channel[*it], event,
730 reduction * mainSignal->decimation,
731 blocksize, base64, precision);
732 }
733 }
734
735 /////////////////////////////////////////////////////////////////////////////
736 void Session::xsod(const XmlParser::Element& cmd)
737 {
738 std::list<unsigned int> intList;
739
740 //cout << __LINE__ << "xsod: " << endl;
741
742 if (cmd.getUnsignedList("channels", intList)) {
743 const Server::Channels& channel = server->getChannels();
744 for (std::list<unsigned int>::const_iterator it = intList.begin();
745 it != intList.end(); it++) {
746 if (*it < channel.size()) {
747 double ts = channel[*it]->signal->sampleTime;
748 subscriptionManager[main->getTask(ts)]->unsubscribe(
749 channel[*it]);
750 }
751 }
752 }
753 else
754 for (SubscriptionManagerMap::iterator it = subscriptionManager.begin();
755 it != subscriptionManager.end(); ++it)
756 it->second->clear();
757 }
758
759 /////////////////////////////////////////////////////////////////////////////
760 /////////////////////////////////////////////////////////////////////////////
761 Session::TCPStream::TCPStream( ost::TCPSocket *server):
762 Socket(::accept(server->getSocket(), NULL, NULL)), os(this)
763 {
764 file = 0;
765
766 ost::tpport_t port;
767 ost::IPV4Host peer = getPeer(&port);
768
769 std::ostringstream os;
770 os << peer << ':' << port;
771 this->peer = os.str();
772
773 if (!server->onAccept(peer, port)) {
774 error(errConnectRejected);
775 return;
776 }
777
778 file = ::fdopen(so, "w");
779 if (!file) {
780 error(errOutput, 0, errno);
781 return;
782 }
783
784 // Set socket to non-blocking mode
785 //setCompletion(false);
786
787 Socket::state = CONNECTED;
788 inBytes = 0;
789 outBytes = 0;
790 }
791
792 /////////////////////////////////////////////////////////////////////////////
793 Session::TCPStream::~TCPStream()
794 {
795 ::fclose(file);
796 }
797
798 /////////////////////////////////////////////////////////////////////////////
799 XmlElement Session::TCPStream::createElement(const char* name)
800 {
801 return XmlElement(name, os, 0, &commandId);
802 }
803
804 /////////////////////////////////////////////////////////////////////////////
805 int Session::TCPStream::read(char *buf, size_t count, timeout_t timeout)
806 {
807 ssize_t n;
808 try {
809 n = readData(buf, count, 0, timeout);
810 }
811 catch (Socket *s) {
812 if (s->getErrorNumber() != errTimeout)
813 throw(s);
814 setError(true);
815 return -1;
816 }
817
818 inBytes += n;
819
820 return n;
821 }
822
823 /////////////////////////////////////////////////////////////////////////////
824 int Session::TCPStream::overflow ( int c )
825 {
826 if (::fputc(c, file) == EOF)
827 return EOF;
828
829 ++outBytes;
830
831 return c;
832 }
833
834 /////////////////////////////////////////////////////////////////////////////
835 std::streamsize Session::TCPStream::xsputn (
836 const char * s, std::streamsize count)
837 {
838 if (::fwrite(s, 1, count, file) != static_cast<size_t>(count))
839 return 0;
840
841 outBytes += count;
842
843 return count;
844 }
845
846 /////////////////////////////////////////////////////////////////////////////
847 int Session::TCPStream::sync ()
848 {
849 return ::fflush(file);
850 3 }
851