PdCom  5.3
Process data communication client
advanced_example.cpp

Base class for receiving notifications.This class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.

The Subscriber class is also used to group subscriptions with the same transmission mode. The newValues method is called everytime, after all active subscriptions got their values updated for one realtime cycle.

/*****************************************************************************
*
* Copyright (C) 2021 Bjarne von Horn (vh at igh dot de).
*
* This file is part of the PdCom library.
*
* The PdCom library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* The PdCom library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
*
*****************************************************************************/
/* Advanced PdCom example.
*
* This is an interactive sample application with two signals and one parameter.
* The parameter can be changed via keyboard. If you press 'q', the application
* exits.
*
* The Process basically has the following states:
* -# waiting for connected
* -# waiting for both subscriptions and the parameter
* -# active
* -# finished
*
* The connected() callback switches from state 1 to 2. asyncData() is then
* called until all subscriptions and the parameter are available. After that,
* keyboard inputs are processed and subscription updates are shown until the
* 'q' key is pressed. Then, everything is torn down.
*/
#include <cassert>
#include <cstdio>
#include <iostream>
#include <pdcom5/Process.h>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
// RAII helper to configure the shell. Nothing to do with PdCom.
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
// Two subscriptions, they will be destructed by ~MyProcess before the
// Subscriber part of MyProcess goes away.
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(char *buf, int count) override
{
const int ans = posixRead(buf, count);
// stop on EOF
if (ans == 0)
running_ = false;
return ans;
}
void write(const char *buf, size_t count) override
{
posixWriteBuffered(buf, count);
}
void flush() override { posixFlush(); }
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::Process(),
PdCom::PosixProcess(host, port),
PdCom::Subscriber(
PdCom::event_mode) //{std::chrono::milliseconds(100)})
{}
void execute();
void connected() override;
void findReply(const PdCom::Variable &var) override
{
// the parameter has arrived
p1_ = var;
assert(!p1_.empty());
std::cout << "Found parameter!" << std::endl;
}
void stateChanged(PdCom::Subscription const &s) override
{
// the state of one of our subscriptions has changed.
using State = PdCom::Subscription::State;
if (s.getState() == State::Active) {
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds /* time_ns */) override
{
std::cout << "New Data: ";
s1_.print(std::cout, ',');
std::cout << " and ";
s2_.print(std::cout, ',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
// connection is established, start subscriptions and query the server for
// the parameter.
std::cout << "Connected!" << std::endl;
s1_ = PdCom::Subscription(*this, *this, "/osc/cos");
s2_ = PdCom::Subscription(*this, *this, "/osc/sin");
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
// wait until everything is set up
while (running_ and !(s1_active_ and s2_active_ and !p1_.empty()))
asyncData();
// now we're ready
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
// process input
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.setValue(static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
this](PdCom::Exception const &ex) {
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
// ask PdCom to process incoming data
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
// usage: example2 <host> <port>
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}