Directory: | ./ |
---|---|
File: | pdcom5/src/Process.h |
Date: | 2025-01-19 04:08:20 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 69 | 71 | 97.2% |
Branches: | 22 | 40 | 55.0% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /***************************************************************************** | ||
2 | * vim:tw=78 | ||
3 | * | ||
4 | * Copyright (C) 2021 Richard Hacker (lerichi at gmx dot net), | ||
5 | * Florian Pose (fp at igh dot de), | ||
6 | * Bjarne von Horn (vh at igh dot de). | ||
7 | * | ||
8 | * This file is part of the PdCom library. | ||
9 | * | ||
10 | * The PdCom library is free software: you can redistribute it and/or modify | ||
11 | * it under the terms of the GNU Lesser General Public License as published by | ||
12 | * the Free Software Foundation, either version 3 of the License, or (at your | ||
13 | * option) any later version. | ||
14 | * | ||
15 | * The PdCom library is distributed in the hope that it will be useful, but | ||
16 | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY | ||
17 | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
18 | * License for more details. | ||
19 | * | ||
20 | * You should have received a copy of the GNU Lesser General Public License | ||
21 | * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>. | ||
22 | * | ||
23 | *****************************************************************************/ | ||
24 | |||
25 | |||
26 | #ifndef PDCOM5_PROCESS_IMPL_H | ||
27 | #define PDCOM5_PROCESS_IMPL_H | ||
28 | |||
29 | #include "IOLayer.h" | ||
30 | #include "ProtocolHandler.h" | ||
31 | #include "Subscription.h" | ||
32 | |||
33 | #include <list> | ||
34 | #include <memory> | ||
35 | #include <pdcom5/Exception.h> | ||
36 | #include <pdcom5/MessageManagerBase.h> | ||
37 | #include <pdcom5/Process.h> | ||
38 | #include <pdcom5/Sasl.h> | ||
39 | #include <pdcom5/Subscription.h> | ||
40 | #include <set> | ||
41 | |||
42 | namespace PdCom { namespace impl { | ||
43 | class AuthManager; | ||
44 | class Variable; | ||
45 | class Subscription; | ||
46 | struct Selector; | ||
47 | |||
48 | /** Base class for PdCom protocol handler | ||
49 | * | ||
50 | * This is the base class to interact with a real time process server. The | ||
51 | * PdCom protocol is implemented using this class. | ||
52 | * | ||
53 | * For socket input and output, the library completely relies on a | ||
54 | * derived class where read(), write(), flush() and connected() methods | ||
55 | * are reimplemented. | ||
56 | * | ||
57 | * When data is available for reading, call asyncData() which in turn | ||
58 | * calls the reimplemented read() method. | ||
59 | * | ||
60 | * When the protocol is initialized, the reimplemented connected() | ||
61 | * method is called. Other than startTLS() and login(), none of the command | ||
62 | * methods listed below may be called prior to connected(). | ||
63 | * | ||
64 | * After connected(), the following commands can be issued: | ||
65 | * - list(): list a directory; returns result in listReply() | ||
66 | * - find(): find a variable; returns result in findReply() | ||
67 | * - login(): start a SASL login interaction; returns result in loginReply() | ||
68 | * - ping(): ping the server; returns result in pingReply() | ||
69 | * - getMessage(): request a specific message; results in multiple calls to processMessage() if required | ||
70 | * - Variable interactions | ||
71 | * - Variable::Subscription interactions | ||
72 | * | ||
73 | * startTLS() and login() may only be called prior to connected() when | ||
74 | * the library has called startTLSReply() or loginReply() previously. | ||
75 | * | ||
76 | * All these commands are non-blocking asynchronous calls and either | ||
77 | * return the result immediately with the corresponding reply methods or | ||
78 | * issue a command to the server using excessive (!) calls to write(). | ||
79 | * Data should be written to a buffer to optimize network communication. | ||
80 | * To flush the buffer to wire, flush() is issued by the library when | ||
81 | * required. | ||
82 | * | ||
83 | * The server may query presence of the user by issuing an alive() call. | ||
84 | * Using this call, certain actions could be undertaken by the server if | ||
85 | * the user is not active any more. | ||
86 | * | ||
87 | * Certain actions in the real time process, such as alarms, trigger a | ||
88 | * call to processMessage(). Messages are kept in a ring buffer in the | ||
89 | * process. | ||
90 | * | ||
91 | * Calling activeMessages() will return a list of all active messages as | ||
92 | * well as the latest message in activeMessagesReply(). | ||
93 | * | ||
94 | * Any message can be recalled with getMessage(), the result returned in | ||
95 | * getMessageReply(). If the message does not exist, the Message | ||
96 | * returned will have the requested \a seqNo with \a path empty. | ||
97 | * | ||
98 | * The subscribe() method subscribes a variable using the path only. | ||
99 | */ | ||
100 | |||
101 | |||
102 | // we need to export this class to allow SecureProcess::Impl to derive from | ||
103 | // us | ||
104 | class PDCOM5_PUBLIC Process : | ||
105 | public std::enable_shared_from_this<Process>, | ||
106 | public IOLayer | ||
107 | { | ||
108 | std::unique_ptr<impl::ProtocolHandler> protocolHandler_; | ||
109 | |||
110 | 330 | struct RegisteredSubscription | |
111 | { | ||
112 | std::weak_ptr<impl::Subscription> subscription_; | ||
113 | std::string path_; | ||
114 | std::shared_ptr<const PdCom::impl::Selector> selector_; | ||
115 | }; | ||
116 | |||
117 | struct RSComparator | ||
118 | { | ||
119 | 138 | bool operator()( | |
120 | RegisteredSubscription const &lhs, | ||
121 | RegisteredSubscription const &rhs) const noexcept | ||
122 | { | ||
123 | 138 | return lhs.subscription_.owner_before(rhs.subscription_); | |
124 | } | ||
125 | }; | ||
126 | |||
127 | std::set<RegisteredSubscription, RSComparator> all_subscriptions_; | ||
128 | |||
129 | public: | ||
130 | Process(PdCom::Process *This); | ||
131 | 28 | static Process &fromUApi(PdCom::Process &p) { return *p.pimpl; } | |
132 | |||
133 | /** Destructor | ||
134 | * | ||
135 | * The destructor cleans up all internally allocated structures | ||
136 | */ | ||
137 | 158 | virtual ~Process() = default; | |
138 | |||
139 | |||
140 | /** Reset communications and clean up internal buffers */ | ||
141 | void reset(); | ||
142 | |||
143 | /** Name of the user application. | ||
144 | * | ||
145 | * The application name is transferred to the server to be able to | ||
146 | * identify the clients more easily. | ||
147 | * | ||
148 | * \return a descriptive name of your application. | ||
149 | */ | ||
150 | 150 | std::string applicationName() const { return This->applicationName(); } | |
151 | |||
152 | /** Host name of remote server. | ||
153 | * | ||
154 | * Reimplement this method to return the remote server host name | ||
155 | * this library connects to. This is especially important in | ||
156 | * multi-hosted TLS environments, where multiple hosts resolve to | ||
157 | * the same IP address. TLS needs to know the original server host | ||
158 | * name. | ||
159 | * | ||
160 | * \return server host name | ||
161 | */ | ||
162 | 150 | std::string hostname() const { return This->hostname(); } | |
163 | |||
164 | /** Read data from server | ||
165 | * | ||
166 | * Reimplement this method to transfer data from the server to | ||
167 | * the library. This method is called within the call to | ||
168 | * asyncData(). | ||
169 | * | ||
170 | * Essentially this method is a little wrapper around your | ||
171 | * socket's `%read()` function: | ||
172 | * \code | ||
173 | * int MyProcess::read(char *buf, size_t count) | ||
174 | * { | ||
175 | * return ::read(this->socket_fd, buf, count); | ||
176 | * } | ||
177 | * \endcode | ||
178 | * | ||
179 | * The method must return the number of bytes read, which may of | ||
180 | * course be less than \p count or even 0. Return values of | ||
181 | * <= 0 are not interpreted by the protocol handler. | ||
182 | * | ||
183 | * @param buf data destination | ||
184 | * @param count buffer size | ||
185 | * | ||
186 | * \return | ||
187 | * return value of `%read()` function, which in turn will be | ||
188 | * returned by asyncData() | ||
189 | */ | ||
190 | 1353 | int read(char *buf, int count) override { return This->read(buf, count); } | |
191 | |||
192 | |||
193 | void asyncData(); | ||
194 | |||
195 | /** Write data to server | ||
196 | * | ||
197 | * Reimplement this method to transfer data from the library to | ||
198 | * the server. This method is called when any library | ||
199 | * operation requires data to be sent to the server. | ||
200 | * | ||
201 | * Note: the library makes many calls to write(), so use | ||
202 | * buffered output otherwise you're in for performance problems! | ||
203 | * | ||
204 | * Essentially this method is a little wrapper around your | ||
205 | * socket's `%write()` function: | ||
206 | * \code | ||
207 | * void MyProcess::write(const char *buf, size_t count) | ||
208 | * { | ||
209 | * if (count != ::fwrite(buf, 1, count, this->socket_file)) { | ||
210 | * // react to errors, set flags, etc | ||
211 | * } | ||
212 | * } | ||
213 | * \endcode | ||
214 | * | ||
215 | * Note: the library does not have an internal buffer and expects | ||
216 | * that all data is sent. If your implementation might send less | ||
217 | * than \p count, it is your responsibility to buffer the data and | ||
218 | * send it later. | ||
219 | * | ||
220 | * @param buf data to be sent | ||
221 | * @param count number of bytes to send | ||
222 | */ | ||
223 | 15717 | void write(const char *buf, size_t count) override | |
224 | { | ||
225 | 15717 | This->write(buf, count); | |
226 | 15717 | } | |
227 | |||
228 | /** Flush unsent data in output buffer | ||
229 | * | ||
230 | * Reimplement this method to flush data in the output buffer. | ||
231 | * | ||
232 | * This method tells the user that it is time to flush the | ||
233 | * output buffer to the wire. The library only expects that data | ||
234 | * is sent to the server within this call. | ||
235 | * | ||
236 | * Essentially this method is a little wrapper around your | ||
237 | * socket's `fflush()` function: | ||
238 | * \code | ||
239 | * void MyProcess::flush() | ||
240 | * { | ||
241 | * if (::fflush(this->socket_file)) { | ||
242 | * // react to errors | ||
243 | * } | ||
244 | * } | ||
245 | * \endcode | ||
246 | */ | ||
247 | 514 | void flush() override { This->flush(); } | |
248 | |||
249 | /** Protocol initialization completed | ||
250 | * | ||
251 | * This is a signal emitted by the library to indicate that | ||
252 | * protocol initialization has been completed and that library | ||
253 | * operations can be performed thereafter. | ||
254 | * | ||
255 | * Reimplement this method to get the signal. | ||
256 | * | ||
257 | * Absolutely NO process operations other than asyncData(), | ||
258 | * startTLS() and login() (and then only due to a previous | ||
259 | * loginReply()) are permitted before this signal has been | ||
260 | * emitted. | ||
261 | */ | ||
262 | void connected(); | ||
263 | |||
264 | /** Reply to list() call | ||
265 | * | ||
266 | * You must reimplement this method to receive replies to list() | ||
267 | * calls. | ||
268 | * | ||
269 | * Note that a variable can have the same path as a directory! | ||
270 | * An example is a vector variable with atomized elements. | ||
271 | * | ||
272 | * Replies are in strict order of list() calls. | ||
273 | * | ||
274 | * @param variables list of variables | ||
275 | * @param directories string list of directories | ||
276 | */ | ||
277 | 18 | void listReply( | |
278 | std::vector<PdCom::Variable> variables, | ||
279 | std::vector<std::string> directories) | ||
280 | { | ||
281 |
1/2✓ Branch 32 taken 18 times.
✗ Branch 33 not taken.
|
18 | This->listReply(std::move(variables), std::move(directories)); |
282 | 18 | } | |
283 | |||
284 | /** Reply to find() | ||
285 | * | ||
286 | * This virtual method is called within the context of asyncData() | ||
287 | * when the server's reply to a variable discovery is processed. | ||
288 | * | ||
289 | * findReply() calls are made in strict order of the find() calls | ||
290 | * | ||
291 | * @param variable pointer to Variable; NULL if not found | ||
292 | */ | ||
293 | 83 | void findReply(const PdCom::Variable &variable) | |
294 | { | ||
295 | 83 | This->findReply(variable); | |
296 | 83 | } | |
297 | |||
298 | /** Ping reply | ||
299 | * | ||
300 | * You must reimplement this method to receive the reply to a | ||
301 | * ping() call. | ||
302 | */ | ||
303 | 3 | void pingReply() { This->pingReply(); } | |
304 | |||
305 | /** Test from process whether client is alive. | ||
306 | * | ||
307 | * In some cases the server may want to know whether the client | ||
308 | * is still alive. Default implementation is to return true. | ||
309 | * Reimplement this if you wish to control presence | ||
310 | * | ||
311 | * \return true to indicate user presence | ||
312 | */ | ||
313 | bool alive() { return This->alive(); } | ||
314 | |||
315 | /** Client to server communication semaphore | ||
316 | * | ||
317 | * Reimplement this method to intercept the transmission | ||
318 | * semaphore. The default implementation always returns success. | ||
319 | * | ||
320 | * Every interaction from client to server (change a parameter, | ||
321 | * ping, etc) starts a message tag. If the client is | ||
322 | * multi-threaded, more than one message tag could be started | ||
323 | * which would lead to corruption. This semaphore can be used to | ||
324 | * serialize all client to server interactions. | ||
325 | * | ||
326 | * @param state true = lock; false = release | ||
327 | */ | ||
328 | 1324 | void transmitSemaphore(bool /* state */) {} | |
329 | 54 | void setAuthManager(Sasl *am) | |
330 | { | ||
331 |
2/2✓ Branch 3 taken 25 times.
✓ Branch 4 taken 29 times.
|
54 | if (auth_manager_) { |
332 | 25 | auth_manager_->process_ = {}; | |
333 | } | ||
334 | 54 | auth_manager_ = am; | |
335 |
2/2✓ Branch 3 taken 29 times.
✓ Branch 4 taken 25 times.
|
54 | if (auth_manager_) |
336 | 29 | auth_manager_->process_ = shared_from_this(); | |
337 | 54 | } | |
338 | |||
339 | 134 | void setMessageManager(MessageManagerBase *mm) | |
340 | { | ||
341 |
2/2✓ Branch 3 taken 25 times.
✓ Branch 4 taken 109 times.
|
134 | if (message_manager_) { |
342 | 25 | message_manager_->process_ = {}; | |
343 | } | ||
344 | 134 | message_manager_ = mm; | |
345 |
2/2✓ Branch 3 taken 109 times.
✓ Branch 4 taken 25 times.
|
134 | if (message_manager_) |
346 | 109 | message_manager_->process_ = shared_from_this(); | |
347 | 134 | } | |
348 | |||
349 | 90 | void loginReply(const char *mechlist, const char *serverData, int finished) | |
350 | { | ||
351 |
1/2✓ Branch 3 taken 90 times.
✗ Branch 4 not taken.
|
90 | if (auth_manager_) |
352 | 90 | auth_manager_->loginReply(mechlist, serverData, finished); | |
353 | 90 | } | |
354 | |||
355 | 26 | void processMessage(Message message) | |
356 | { | ||
357 |
1/2✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
|
26 | if (message_manager_) |
358 |
1/2✓ Branch 19 taken 26 times.
✗ Branch 20 not taken.
|
26 | message_manager_->processMessage(std::move(message)); |
359 | 26 | } | |
360 | |||
361 | 16 | void getMessageReply(Message msg) | |
362 | { | ||
363 |
1/2✓ Branch 3 taken 16 times.
✗ Branch 4 not taken.
|
16 | if (message_manager_) |
364 |
1/2✓ Branch 19 taken 16 times.
✗ Branch 20 not taken.
|
16 | message_manager_->getMessageReply(std::move(msg)); |
365 | 16 | } | |
366 | |||
367 | 22 | void activeMessagesReply(std::vector<Message> msgs) | |
368 | { | ||
369 |
1/2✓ Branch 3 taken 22 times.
✗ Branch 4 not taken.
|
22 | if (message_manager_) |
370 |
1/2✓ Branch 19 taken 22 times.
✗ Branch 20 not taken.
|
22 | message_manager_->activeMessagesReply(std::move(msgs)); |
371 | 22 | } | |
372 | |||
373 | 22 | void broadcastReply( | |
374 | const std::string &message, | ||
375 | const std::string &attr, | ||
376 | std::chrono::nanoseconds time_ns, | ||
377 | const std::string &user) | ||
378 | { | ||
379 | 22 | This->broadcastReply(message, attr, time_ns, user); | |
380 | 22 | } | |
381 | |||
382 | 5 | void clientStatisticsReply(std::vector<PdCom::ClientStatistics> statistics) | |
383 | { | ||
384 |
1/2✓ Branch 28 taken 5 times.
✗ Branch 29 not taken.
|
5 | This->clientStatisticsReply(std::move(statistics)); |
385 | 5 | } | |
386 | |||
387 | template <typename P = ProtocolHandler> | ||
388 | 334 | P &protocolHandler() const | |
389 | { | ||
390 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 334 times.
|
334 | if (!protocolHandler_) |
391 | ✗ | throw NotConnected(); | |
392 | 334 | return static_cast<P &>(*protocolHandler_); | |
393 | } | ||
394 | |||
395 | template <typename P = ProtocolHandler> | ||
396 | 320 | P *castProtocolHandler() const noexcept | |
397 | { | ||
398 |
1/2✓ Branch 2 taken 79 times.
✗ Branch 3 not taken.
|
320 | return static_cast<P *>(protocolHandler_.get()); |
399 | } | ||
400 | |||
401 | std::shared_ptr<Subscription> subscribe( | ||
402 | PdCom::Subscription *subscription, | ||
403 | const std::string &path, | ||
404 | PdCom::Subscriber &subscriber, | ||
405 | const PdCom::Selector &selector); | ||
406 | std::shared_ptr<Subscription> subscribe( | ||
407 | PdCom::Subscription *subscription, | ||
408 | std::shared_ptr<const Variable> var, | ||
409 | PdCom::Subscriber &subscriber, | ||
410 | const PdCom::Selector &selector); | ||
411 | |||
412 | // Replace one subscription pointer entry in all_subscriptions_. | ||
413 | // This is needed when an impl::Subscription is replaced. | ||
414 | void | ||
415 | replace(std::shared_ptr<Subscription> const &old_s, | ||
416 | std::shared_ptr<Subscription> const &new_s); | ||
417 | |||
418 | PdCom::Process *const This; | ||
419 | IOLayer *io; | ||
420 | Sasl *auth_manager_ = nullptr; | ||
421 | MessageManagerBase *message_manager_ = nullptr; | ||
422 | |||
423 | 317 | struct PendingCallbackQueue : | |
424 | std::list<PdCom::impl::Subscription::PendingStateChange> | ||
425 | { | ||
426 | void flush(); | ||
427 | } pending_callbacks_; | ||
428 | |||
429 | |||
430 | 159 | class AsyncDataMutex | |
431 | { | ||
432 | bool locked_ = false; | ||
433 | |||
434 | public: | ||
435 | 1259 | void unlock() { locked_ = false; } | |
436 | 1259 | void lock() | |
437 | { | ||
438 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 1259 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1259 times.
|
1259 | if (locked_) |
439 | throw PdCom::InvalidArgument("This function must not be called " | ||
440 | ✗ | "from PdCom5 Callback"); | |
441 | 1259 | locked_ = true; | |
442 | 1259 | } | |
443 | |||
444 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
|
184 | bool isLocked() const noexcept { return locked_; } |
445 | } async_data_mutex_; | ||
446 | }; | ||
447 | }} // namespace PdCom::impl | ||
448 | |||
449 | #endif // PDCOM5_PROCESS_IMPL_H | ||
450 |