| Directory: | ./ |
|---|---|
| File: | pdcom5/src/Process.h |
| Date: | 2025-10-26 04:10:09 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 69 | 71 | 97.2% |
| Branches: | 22 | 40 | 55.0% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /***************************************************************************** | ||
| 2 | * vim:tw=78 | ||
| 3 | * | ||
| 4 | * Copyright (C) 2021 Richard Hacker (lerichi at gmx dot net), | ||
| 5 | * Florian Pose (fp at igh dot de), | ||
| 6 | * Bjarne von Horn (vh at igh dot de). | ||
| 7 | * | ||
| 8 | * This file is part of the PdCom library. | ||
| 9 | * | ||
| 10 | * The PdCom library is free software: you can redistribute it and/or modify | ||
| 11 | * it under the terms of the GNU Lesser General Public License as published by | ||
| 12 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
| 13 | * option) any later version. | ||
| 14 | * | ||
| 15 | * The PdCom library is distributed in the hope that it will be useful, but | ||
| 16 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
| 17 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
| 18 | * License for more details. | ||
| 19 | * | ||
| 20 | * You should have received a copy of the GNU Lesser General Public License | ||
| 21 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
| 22 | * | ||
| 23 | *****************************************************************************/ | ||
| 24 | |||
| 25 | |||
| 26 | #ifndef PDCOM5_PROCESS_IMPL_H | ||
| 27 | #define PDCOM5_PROCESS_IMPL_H | ||
| 28 | |||
| 29 | #include "IOLayer.h" | ||
| 30 | #include "ProtocolHandler.h" | ||
| 31 | #include "Subscription.h" | ||
| 32 | |||
| 33 | #include <list> | ||
| 34 | #include <memory> | ||
| 35 | #include <pdcom5/Exception.h> | ||
| 36 | #include <pdcom5/MessageManagerBase.h> | ||
| 37 | #include <pdcom5/Process.h> | ||
| 38 | #include <pdcom5/Sasl.h> | ||
| 39 | #include <pdcom5/Subscription.h> | ||
| 40 | #include <set> | ||
| 41 | |||
| 42 | namespace PdCom { namespace impl { | ||
| 43 | class AuthManager; | ||
| 44 | class Variable; | ||
| 45 | class Subscription; | ||
| 46 | struct Selector; | ||
| 47 | |||
| 48 | /** Base class for PdCom protocol handler | ||
| 49 | * | ||
| 50 | * This is the base class to interact with real time process server. The | ||
| 51 | * PdCom protocol ist implemented using this class. | ||
| 52 | * | ||
| 53 | * For socket input and output, the library completely relies on a | ||
| 54 | * derived class where read(), write(), flush() and connected() methods | ||
| 55 | * are reimplemented. | ||
| 56 | * | ||
| 57 | * When data is available for reading, call asyncData() which in turn | ||
| 58 | * calls the reimplemented read() method. | ||
| 59 | * | ||
| 60 | * When the protocol is initialized, the reimplemented connected() | ||
| 61 | * method is called. Other than startTLS(), login(), none of the command | ||
| 62 | * methods listed below may be called prior to connected(). | ||
| 63 | * | ||
| 64 | * After connected(), the following commands can be issued: * list(): | ||
| 65 | * list a directory; returns result in listReply() * find(): find a | ||
| 66 | * variable; returns result in findReply() * login(): start a SASL login | ||
| 67 | * interaction; returns result in loginReply() * ping(): ping the | ||
| 68 | * server; returns result in pingReply() * getMessage(): request a | ||
| 69 | * specific message result in multiple calls to processMessage() if | ||
| 70 | * required * Variable interactions * Variable::Subscription | ||
| 71 | * interactions | ||
| 72 | * | ||
| 73 | * startTLS() and login() may only be called prior to connected() when | ||
| 74 | * the library has called startTLSReply() or loginReply() previously. | ||
| 75 | * | ||
| 76 | * All these commands are non-blocking asynchronous calls and either | ||
| 77 | * return the result immediately with the corresponding reply methods or | ||
| 78 | * issue a command to the server using excessive (!) calls to write(). | ||
| 79 | * Data should be written to a buffer to optimize network communication. | ||
| 80 | * To flush the buffer to wire, flush() is issued by the library when | ||
| 81 | * required. | ||
| 82 | * | ||
| 83 | * The server may query presence of the user by issuing an alive() call. | ||
| 84 | * Using this call, certain actions could be undertaken by the server if | ||
| 85 | * the user is not active any more. | ||
| 86 | * | ||
| 87 | * Certain actions in the real time process, such as alarms, triggers a | ||
| 88 | * call to processMessage(). Messages are kept in a ring buffer in the | ||
| 89 | * process. | ||
| 90 | * | ||
| 91 | * Calling activeMessages() will return a list of all active messages as | ||
| 92 | * well as the latest message in activeMessagesReply(). | ||
| 93 | * | ||
| 94 | * Any message can be recalled with getMessage(), the result returned in | ||
| 95 | * getMessageReply(). If the message does not exist, the Message | ||
| 96 | * returned will have the requested \a seqNo with \a path empty. | ||
| 97 | * | ||
| 98 | * The subscribe() method subscribes a variable using the path only. | ||
| 99 | */ | ||
| 100 | |||
| 101 | |||
| 102 | // we need to export this class to allow SecureProcess::Impl to derive from | ||
| 103 | // us | ||
| 104 | class PDCOM5_PUBLIC Process : | ||
| 105 | public std::enable_shared_from_this<Process>, | ||
| 106 | public IOLayer | ||
| 107 | { | ||
| 108 | std::unique_ptr<impl::ProtocolHandler> protocolHandler_; | ||
| 109 | |||
| 110 | 330 | struct RegisteredSubscription | |
| 111 | { | ||
| 112 | std::weak_ptr<impl::Subscription> subscription_; | ||
| 113 | std::string path_; | ||
| 114 | std::shared_ptr<const PdCom::impl::Selector> selector_; | ||
| 115 | }; | ||
| 116 | |||
| 117 | struct RSComparator | ||
| 118 | { | ||
| 119 | 138 | bool operator()( | |
| 120 | RegisteredSubscription const &lhs, | ||
| 121 | RegisteredSubscription const &rhs) const noexcept | ||
| 122 | { | ||
| 123 | 138 | return lhs.subscription_.owner_before(rhs.subscription_); | |
| 124 | } | ||
| 125 | }; | ||
| 126 | |||
| 127 | std::set<RegisteredSubscription, RSComparator> all_subscriptions_; | ||
| 128 | |||
| 129 | public: | ||
| 130 | Process(PdCom::Process *This); | ||
| 131 | 28 | static Process &fromUApi(PdCom::Process &p) { return *p.pimpl; } | |
| 132 | |||
| 133 | /** Destructor | ||
| 134 | * | ||
| 135 | * The destructor cleans up all internally allocated structures | ||
| 136 | */ | ||
| 137 | 158 | virtual ~Process() = default; | |
| 138 | |||
| 139 | |||
| 140 | /** Reset communications and clean up internal buffers */ | ||
| 141 | void reset(); | ||
| 142 | |||
| 143 | /** Name of application user application. | ||
| 144 | * | ||
| 145 | * The application name is transferred to the server to be able to | ||
| 146 | * identify the clients more easily. | ||
| 147 | * | ||
| 148 | * \return a descriptive name of your application. | ||
| 149 | */ | ||
| 150 | 150 | std::string applicationName() const { return This->applicationName(); } | |
| 151 | |||
| 152 | /** Host name of remote server. | ||
| 153 | * | ||
| 154 | * Reimplement this method to return the remote server host name | ||
| 155 | * this library connects to. This is especially important in | ||
| 156 | * multi-hosted TLS environments, where multiple hosts resolv to | ||
| 157 | * the same IP address. TLS needs to know the original server host | ||
| 158 | * name. | ||
| 159 | * | ||
| 160 | * \return server host name | ||
| 161 | */ | ||
| 162 | 150 | std::string hostname() const { return This->hostname(); } | |
| 163 | |||
| 164 | /** Read data from server | ||
| 165 | * | ||
| 166 | * Reimplement this method to transfer data from the server to | ||
| 167 | * the library. This method is called within the call to | ||
| 168 | * asyncData(). | ||
| 169 | * | ||
| 170 | * Essentially this method is a little wrapper around your | ||
| 171 | * socket's `%read()` function: | ||
| 172 | * \code | ||
| 173 | * int MyProcess::read(char *buf, size_t count) | ||
| 174 | * { | ||
| 175 | * return ::read(this->socket_fd, buf, count); | ||
| 176 | * } | ||
| 177 | * \endcode | ||
| 178 | * | ||
| 179 | * The method must return the number of bytes read, which may of | ||
| 180 | * coarse be less than \p count or even 0. Return values of | ||
| 181 | * <= 0 are not interpreted by the protocol handler. | ||
| 182 | * | ||
| 183 | * @param buf data destination | ||
| 184 | * @param count buffer size | ||
| 185 | * | ||
| 186 | * \return | ||
| 187 | * return value of `%read()` function, which in turn will be | ||
| 188 | * returned by asyncData() | ||
| 189 | */ | ||
| 190 | 1353 | int read(char *buf, int count) override { return This->read(buf, count); } | |
| 191 | |||
| 192 | |||
| 193 | void asyncData(); | ||
| 194 | |||
| 195 | /** Write data to server | ||
| 196 | * | ||
| 197 | * Reimplement this method to transfer data from the library to | ||
| 198 | * the server. This method is called when any library | ||
| 199 | * operation requires data to be sent to the server. | ||
| 200 | * | ||
| 201 | * Note: the library makes many calls to write(), so use | ||
| 202 | * buffered output otherwise you're in for performance problems! | ||
| 203 | * | ||
| 204 | * Essentially this method is a little wrapper around your | ||
| 205 | * socket's `%write()` function: | ||
| 206 | * \code | ||
| 207 | * void MyProcess::write(const char *buf, size_t count) | ||
| 208 | * { | ||
| 209 | * if (count != ::fwrite(buf, 1, count, this->socket_file)) { | ||
| 210 | * // react to errors, set flags, etc | ||
| 211 | * } | ||
| 212 | * } | ||
| 213 | * \endcode | ||
| 214 | * | ||
| 215 | * Note: the library does not have an internal buffer and expects | ||
| 216 | * that all data is sent. If your implementation might send less | ||
| 217 | * than \p count, it is your responsibility to buffer the data and | ||
| 218 | * send it later. | ||
| 219 | * | ||
| 220 | * @param buf data to be sent | ||
| 221 | * @param count number of bytes to send | ||
| 222 | */ | ||
| 223 | 15731 | void write(const char *buf, size_t count) override | |
| 224 | { | ||
| 225 | 15731 | This->write(buf, count); | |
| 226 | 15731 | } | |
| 227 | |||
| 228 | /** Flush unsent data in output buffer | ||
| 229 | * | ||
| 230 | * Reimplement this method to flush data in the output buffer. | ||
| 231 | * | ||
| 232 | * This method tells the user that it is time to flush the | ||
| 233 | * output buffer to the wire. The library only expects that data | ||
| 234 | * is sent to the server within this call. | ||
| 235 | * | ||
| 236 | * Essentially this method is a little wrapper around your | ||
| 237 | * socket's `fflush()` function: | ||
| 238 | * \code | ||
| 239 | * void MyProcess::flush() | ||
| 240 | * { | ||
| 241 | * if (::fflush(this->socket_file)) { | ||
| 242 | * // react to errors | ||
| 243 | * } | ||
| 244 | * } | ||
| 245 | * \endcode | ||
| 246 | */ | ||
| 247 | 514 | void flush() override { This->flush(); } | |
| 248 | |||
| 249 | /** Protocol initialization completed | ||
| 250 | * | ||
| 251 | * This is a signal emitted by the library to indicate that | ||
| 252 | * protocol initialization has been completed and that library | ||
| 253 | * operations can be performed thereafter. | ||
| 254 | * | ||
| 255 | * Reimplement this method to get the signal. | ||
| 256 | * | ||
| 257 | * Absolutely NO process operations other than asyncData(), | ||
| 258 | * startTLS() and login() (and then only due to a previous | ||
| 259 | * loginReply() are permitted before this signal has been | ||
| 260 | * emitted. | ||
| 261 | */ | ||
| 262 | void connected(); | ||
| 263 | |||
| 264 | /** Reply to list() call | ||
| 265 | * | ||
| 266 | * You must reimplement this method to receive replies to list() | ||
| 267 | * calls. | ||
| 268 | * | ||
| 269 | * Note that a variable can have the same path as a directory! | ||
| 270 | * An example is a vector variable with atomized elements. | ||
| 271 | * | ||
| 272 | * Replies are in strict order of list() calls. | ||
| 273 | * | ||
| 274 | * @param variables list of variables | ||
| 275 | * @param directories string list of directories | ||
| 276 | */ | ||
| 277 | 18 | void listReply( | |
| 278 | std::vector<PdCom::Variable> variables, | ||
| 279 | std::vector<std::string> directories) | ||
| 280 | { | ||
| 281 |
1/2✓ Branch 32 taken 18 times.
✗ Branch 33 not taken.
|
18 | This->listReply(std::move(variables), std::move(directories)); |
| 282 | 18 | } | |
| 283 | |||
| 284 | /** Reply to find() | ||
| 285 | * | ||
| 286 | * This virtual method is called within the context of asyncData() | ||
| 287 | * when the server's reply to a variable discovery is processed. | ||
| 288 | * | ||
| 289 | * findReply()ies are called in strict order of find() | ||
| 290 | * | ||
| 291 | * @param variable pointer to Variable; NULL if not found | ||
| 292 | */ | ||
| 293 | 83 | void findReply(const PdCom::Variable &variable) | |
| 294 | { | ||
| 295 | 83 | This->findReply(variable); | |
| 296 | 83 | } | |
| 297 | |||
| 298 | /** Ping reply | ||
| 299 | * | ||
| 300 | * You must reimplement this method to receive the reply to a | ||
| 301 | * ping() call. | ||
| 302 | */ | ||
| 303 | 3 | void pingReply() { This->pingReply(); } | |
| 304 | |||
| 305 | /** Test from process whether client is alive. | ||
| 306 | * | ||
| 307 | * In some cases the server may want to know whether the client | ||
| 308 | * is still alive. Default implementation is to return true. | ||
| 309 | * Reimplement this if you wish to control presence | ||
| 310 | * | ||
| 311 | * \return true to indicate user presence | ||
| 312 | */ | ||
| 313 | bool alive() { return This->alive(); } | ||
| 314 | |||
| 315 | /** Client to server communication semaphore | ||
| 316 | * | ||
| 317 | * Reimplement this method to intercept the transmission | ||
| 318 | * semaphore. The default implementation always returns success. | ||
| 319 | * | ||
| 320 | * Every interaction from client to server (change a parameter, | ||
| 321 | * ping, etc) starts a message tag. If the client is | ||
| 322 | * multi-threaded, more than one message tag could be started | ||
| 323 | * which would lead to corruption. This semaphore can be used to | ||
| 324 | * serialize all client to server interactions. | ||
| 325 | * | ||
| 326 | * @param state true = lock; false = release | ||
| 327 | */ | ||
| 328 | 1324 | void transmitSemaphore(bool /* state */) {} | |
| 329 | 54 | void setAuthManager(Sasl *am) | |
| 330 | { | ||
| 331 |
2/2✓ Branch 3 taken 25 times.
✓ Branch 4 taken 29 times.
|
54 | if (auth_manager_) { |
| 332 | 25 | auth_manager_->process_ = {}; | |
| 333 | } | ||
| 334 | 54 | auth_manager_ = am; | |
| 335 |
2/2✓ Branch 3 taken 29 times.
✓ Branch 4 taken 25 times.
|
54 | if (auth_manager_) |
| 336 | 29 | auth_manager_->process_ = shared_from_this(); | |
| 337 | 54 | } | |
| 338 | |||
| 339 | 134 | void setMessageManager(MessageManagerBase *mm) | |
| 340 | { | ||
| 341 |
2/2✓ Branch 3 taken 25 times.
✓ Branch 4 taken 109 times.
|
134 | if (message_manager_) { |
| 342 | 25 | message_manager_->process_ = {}; | |
| 343 | } | ||
| 344 | 134 | message_manager_ = mm; | |
| 345 |
2/2✓ Branch 3 taken 109 times.
✓ Branch 4 taken 25 times.
|
134 | if (message_manager_) |
| 346 | 109 | message_manager_->process_ = shared_from_this(); | |
| 347 | 134 | } | |
| 348 | |||
| 349 | 90 | void loginReply(const char *mechlist, const char *serverData, int finished) | |
| 350 | { | ||
| 351 |
1/2✓ Branch 3 taken 90 times.
✗ Branch 4 not taken.
|
90 | if (auth_manager_) |
| 352 | 90 | auth_manager_->loginReply(mechlist, serverData, finished); | |
| 353 | 90 | } | |
| 354 | |||
| 355 | 26 | void processMessage(Message message) | |
| 356 | { | ||
| 357 |
1/2✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
|
26 | if (message_manager_) |
| 358 |
1/2✓ Branch 19 taken 26 times.
✗ Branch 20 not taken.
|
26 | message_manager_->processMessage(std::move(message)); |
| 359 | 26 | } | |
| 360 | |||
| 361 | 16 | void getMessageReply(Message msg) | |
| 362 | { | ||
| 363 |
1/2✓ Branch 3 taken 16 times.
✗ Branch 4 not taken.
|
16 | if (message_manager_) |
| 364 |
1/2✓ Branch 19 taken 16 times.
✗ Branch 20 not taken.
|
16 | message_manager_->getMessageReply(std::move(msg)); |
| 365 | 16 | } | |
| 366 | |||
| 367 | 22 | void activeMessagesReply(std::vector<Message> msgs) | |
| 368 | { | ||
| 369 |
1/2✓ Branch 3 taken 22 times.
✗ Branch 4 not taken.
|
22 | if (message_manager_) |
| 370 |
1/2✓ Branch 19 taken 22 times.
✗ Branch 20 not taken.
|
22 | message_manager_->activeMessagesReply(std::move(msgs)); |
| 371 | 22 | } | |
| 372 | |||
| 373 | 22 | void broadcastReply( | |
| 374 | const std::string &message, | ||
| 375 | const std::string &attr, | ||
| 376 | std::chrono::nanoseconds time_ns, | ||
| 377 | const std::string &user) | ||
| 378 | { | ||
| 379 | 22 | This->broadcastReply(message, attr, time_ns, user); | |
| 380 | 22 | } | |
| 381 | |||
| 382 | 5 | void clientStatisticsReply(std::vector<PdCom::ClientStatistics> statistics) | |
| 383 | { | ||
| 384 |
1/2✓ Branch 28 taken 5 times.
✗ Branch 29 not taken.
|
5 | This->clientStatisticsReply(std::move(statistics)); |
| 385 | 5 | } | |
| 386 | |||
| 387 | template <typename P = ProtocolHandler> | ||
| 388 | 334 | P &protocolHandler() const | |
| 389 | { | ||
| 390 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 334 times.
|
334 | if (!protocolHandler_) |
| 391 | ✗ | throw NotConnected(); | |
| 392 | 334 | return static_cast<P &>(*protocolHandler_); | |
| 393 | } | ||
| 394 | |||
| 395 | template <typename P = ProtocolHandler> | ||
| 396 | 320 | P *castProtocolHandler() const noexcept | |
| 397 | { | ||
| 398 |
1/2✓ Branch 2 taken 79 times.
✗ Branch 3 not taken.
|
320 | return static_cast<P *>(protocolHandler_.get()); |
| 399 | } | ||
| 400 | |||
| 401 | std::shared_ptr<Subscription> subscribe( | ||
| 402 | PdCom::Subscription *subscription, | ||
| 403 | const std::string &path, | ||
| 404 | PdCom::Subscriber &subscriber, | ||
| 405 | const PdCom::Selector &selector); | ||
| 406 | std::shared_ptr<Subscription> subscribe( | ||
| 407 | PdCom::Subscription *subscription, | ||
| 408 | std::shared_ptr<const Variable> var, | ||
| 409 | PdCom::Subscriber &subscriber, | ||
| 410 | const PdCom::Selector &selector); | ||
| 411 | |||
| 412 | // replace one subscriber pointer entry in all_subscriptions | ||
| 413 | // this is needed when impl::Subscription is replaced | ||
| 414 | void | ||
| 415 | replace(std::shared_ptr<Subscription> const &old_s, | ||
| 416 | std::shared_ptr<Subscription> const &new_s); | ||
| 417 | |||
| 418 | PdCom::Process *const This; | ||
| 419 | IOLayer *io; | ||
| 420 | Sasl *auth_manager_ = nullptr; | ||
| 421 | MessageManagerBase *message_manager_ = nullptr; | ||
| 422 | |||
| 423 | 317 | struct PendingCallbackQueue : | |
| 424 | std::list<PdCom::impl::Subscription::PendingStateChange> | ||
| 425 | { | ||
| 426 | void flush(); | ||
| 427 | } pending_callbacks_; | ||
| 428 | |||
| 429 | |||
| 430 | 159 | class AsyncDataMutex | |
| 431 | { | ||
| 432 | bool locked_ = false; | ||
| 433 | |||
| 434 | public: | ||
| 435 | 1259 | void unlock() { locked_ = false; } | |
| 436 | 1259 | void lock() | |
| 437 | { | ||
| 438 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 1259 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1259 times.
|
1259 | if (locked_) |
| 439 | throw PdCom::InvalidArgument("This function must not be called " | ||
| 440 | ✗ | "from PdCom5 Callback"); | |
| 441 | 1259 | locked_ = true; | |
| 442 | 1259 | } | |
| 443 | |||
| 444 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
|
184 | bool isLocked() const noexcept { return locked_; } |
| 445 | } async_data_mutex_; | ||
| 446 | }; | ||
| 447 | }} // namespace PdCom::impl | ||
| 448 | |||
| 449 | #endif // PDCOM5_PROCESS_IMPL_H | ||
| 450 |