Base class for receiving notifications.This class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.
The Subscriber class is also used to group subscriptions with the same transmission mode. The newValues method is called everytime, after all active subscriptions got their values updated for one realtime cycle.
#include <cassert>
#include <cstdio>
#include <iostream>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(
char *buf,
int count)
override {
if (ans == 0)
running_ = false;
return ans;
}
void write(
const char *buf,
size_t count)
override {
}
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::PosixProcess(host, port),
{}
void execute();
{
p1_ = var;
std::cout << "Found parameter!" << std::endl;
}
{
using State = PdCom::Subscription::State;
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.
getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds ) override
{
std::cout << "New Data: ";
s1_.
print(std::cout,
',');
std::cout << " and ";
s2_.
print(std::cout,
',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
std::cout << "Connected!" << std::endl;
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
while (running_ and !(s1_active_ and s2_active_ and !p1_.
empty()))
asyncData();
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.
empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.
setValue(static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}