GCC Code Coverage Report


Directory: ./
File: pdcom5/src/Process.h
Date: 2025-01-19 04:08:20
Exec Total Coverage
Lines: 69 71 97.2%
Branches: 22 40 55.0%

Line Branch Exec Source
1 /*****************************************************************************
2 * vim:tw=78
3 *
4 * Copyright (C) 2021 Richard Hacker (lerichi at gmx dot net),
5 * Florian Pose (fp at igh dot de),
6 * Bjarne von Horn (vh at igh dot de).
7 *
8 * This file is part of the PdCom library.
9 *
10 * The PdCom library is free software: you can redistribute it and/or modify
11 * it under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation, either version 3 of the License, or (at your
13 * option) any later version.
14 *
15 * The PdCom library is distributed in the hope that it will be useful, but
16 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
17 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
18 * License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
22 *
23 *****************************************************************************/
24
25
26 #ifndef PDCOM5_PROCESS_IMPL_H
27 #define PDCOM5_PROCESS_IMPL_H
28
29 #include "IOLayer.h"
30 #include "ProtocolHandler.h"
31 #include "Subscription.h"
32
33 #include <list>
34 #include <memory>
35 #include <pdcom5/Exception.h>
36 #include <pdcom5/MessageManagerBase.h>
37 #include <pdcom5/Process.h>
38 #include <pdcom5/Sasl.h>
39 #include <pdcom5/Subscription.h>
40 #include <set>
41
42 namespace PdCom { namespace impl {
43 class AuthManager;
44 class Variable;
45 class Subscription;
46 struct Selector;
47
48 /** Base class for PdCom protocol handler
49 *
50 * This is the base class to interact with a real-time process server. The
51 * PdCom protocol is implemented using this class.
52 *
53 * For socket input and output, the library completely relies on a
54 * derived class where read(), write(), flush() and connected() methods
55 * are reimplemented.
56 *
57 * When data is available for reading, call asyncData() which in turn
58 * calls the reimplemented read() method.
59 *
60 * When the protocol is initialized, the reimplemented connected()
61 * method is called. Other than startTLS(), login(), none of the command
62 * methods listed below may be called prior to connected().
63 *
64 * After connected(), the following commands can be issued:
65 * - list(): list a directory; returns result in listReply()
66 * - find(): find a variable; returns result in findReply()
67 * - login(): start a SASL login interaction; returns result in loginReply()
68 * - ping(): ping the server; returns result in pingReply()
69 * - getMessage(): request a specific message; may result in multiple calls to processMessage() if required
70 * - Variable interactions
71 * - Variable::Subscription interactions
72 *
73 * startTLS() and login() may only be called prior to connected() when
74 * the library has called startTLSReply() or loginReply() previously.
75 *
76 * All these commands are non-blocking asynchronous calls and either
77 * return the result immediately with the corresponding reply methods or
78 * issue a command to the server using excessive (!) calls to write().
79 * Data should be written to a buffer to optimize network communication.
80 * To flush the buffer to wire, flush() is issued by the library when
81 * required.
82 *
83 * The server may query presence of the user by issuing an alive() call.
84 * Using this call, certain actions could be undertaken by the server if
85 * the user is not active any more.
86 *
87 * Certain actions in the real time process, such as alarms, trigger a
88 * call to processMessage(). Messages are kept in a ring buffer in the
89 * process.
90 *
91 * Calling activeMessages() will return a list of all active messages as
92 * well as the latest message in activeMessagesReply().
93 *
94 * Any message can be recalled with getMessage(), the result returned in
95 * getMessageReply(). If the message does not exist, the Message
96 * returned will have the requested \a seqNo with \a path empty.
97 *
98 * The subscribe() method subscribes a variable using the path only.
99 */
100
101
102 // we need to export this class to allow SecureProcess::Impl to derive from
103 // us
104 class PDCOM5_PUBLIC Process :
105 public std::enable_shared_from_this<Process>,
106 public IOLayer
107 {
108 std::unique_ptr<impl::ProtocolHandler> protocolHandler_;
109
110 330 struct RegisteredSubscription
111 {
112 std::weak_ptr<impl::Subscription> subscription_;
113 std::string path_;
114 std::shared_ptr<const PdCom::impl::Selector> selector_;
115 };
116
117 struct RSComparator
118 {
119 138 bool operator()(
120 RegisteredSubscription const &lhs,
121 RegisteredSubscription const &rhs) const noexcept
122 {
123 138 return lhs.subscription_.owner_before(rhs.subscription_);
124 }
125 };
126
127 std::set<RegisteredSubscription, RSComparator> all_subscriptions_;
128
129 public:
130 Process(PdCom::Process *This);
131 28 static Process &fromUApi(PdCom::Process &p) { return *p.pimpl; }
132
133 /** Destructor
134 *
135 * The destructor cleans up all internally allocated structures
136 */
137 158 virtual ~Process() = default;
138
139
140 /** Reset communications and clean up internal buffers */
141 void reset();
142
143 /** Name of the user application.
144 *
145 * The application name is transferred to the server to be able to
146 * identify the clients more easily.
147 *
148 * \return a descriptive name of your application.
149 */
150 150 std::string applicationName() const { return This->applicationName(); }
151
152 /** Host name of remote server.
153 *
154 * Reimplement this method to return the remote server host name
155 * this library connects to. This is especially important in
156 * multi-hosted TLS environments, where multiple hosts resolve to
157 * the same IP address. TLS needs to know the original server host
158 * name.
159 *
160 * \return server host name
161 */
162 150 std::string hostname() const { return This->hostname(); }
163
164 /** Read data from server
165 *
166 * Reimplement this method to transfer data from the server to
167 * the library. This method is called within the call to
168 * asyncData().
169 *
170 * Essentially this method is a little wrapper around your
171 * socket's `%read()` function:
172 * \code
173 * int MyProcess::read(char *buf, size_t count)
174 * {
175 * return ::read(this->socket_fd, buf, count);
176 * }
177 * \endcode
178 *
179 * The method must return the number of bytes read, which may of
180 * course be less than \p count or even 0. Return values of
181 * &lt;=&nbsp;0 are not interpreted by the protocol handler.
182 *
183 * @param buf data destination
184 * @param count buffer size
185 *
186 * \return
187 * return value of `%read()` function, which in turn will be
188 * returned by asyncData()
189 */
190 1353 int read(char *buf, int count) override { return This->read(buf, count); }
191
192
193 void asyncData();
194
195 /** Write data to server
196 *
197 * Reimplement this method to transfer data from the library to
198 * the server. This method is called when any library
199 * operation requires data to be sent to the server.
200 *
201 * Note: the library makes many calls to write(), so use
202 * buffered output otherwise you're in for performance problems!
203 *
204 * Essentially this method is a little wrapper around your
205 * socket's `%write()` function:
206 * \code
207 * void MyProcess::write(const char *buf, size_t count)
208 * {
209 * if (count != ::fwrite(buf, 1, count, this->socket_file)) {
210 * // react to errors, set flags, etc
211 * }
212 * }
213 * \endcode
214 *
215 * Note: the library does not have an internal buffer and expects
216 * that all data is sent. If your implementation might send less
217 * than \p count, it is your responsibility to buffer the data and
218 * send it later.
219 *
220 * @param buf data to be sent
221 * @param count number of bytes to send
222 */
223 15717 void write(const char *buf, size_t count) override
224 {
225 15717 This->write(buf, count);
226 15717 }
227
228 /** Flush unsent data in output buffer
229 *
230 * Reimplement this method to flush data in the output buffer.
231 *
232 * This method tells the user that it is time to flush the
233 * output buffer to the wire. The library only expects that data
234 * is sent to the server within this call.
235 *
236 * Essentially this method is a little wrapper around your
237 * socket's `fflush()` function:
238 * \code
239 * void MyProcess::flush()
240 * {
241 * if (::fflush(this->socket_file)) {
242 * // react to errors
243 * }
244 * }
245 * \endcode
246 */
247 514 void flush() override { This->flush(); }
248
249 /** Protocol initialization completed
250 *
251 * This is a signal emitted by the library to indicate that
252 * protocol initialization has been completed and that library
253 * operations can be performed thereafter.
254 *
255 * Reimplement this method to get the signal.
256 *
257 * Absolutely NO process operations other than asyncData(),
258 * startTLS() and login() (and then only due to a previous
259 * loginReply()) are permitted before this signal has been
260 * emitted.
261 */
262 void connected();
263
264 /** Reply to list() call
265 *
266 * You must reimplement this method to receive replies to list()
267 * calls.
268 *
269 * Note that a variable can have the same path as a directory!
270 * An example is a vector variable with atomized elements.
271 *
272 * Replies are in strict order of list() calls.
273 *
274 * @param variables list of variables
275 * @param directories string list of directories
276 */
277 18 void listReply(
278 std::vector<PdCom::Variable> variables,
279 std::vector<std::string> directories)
280 {
281
1/2
✓ Branch 32 taken 18 times.
✗ Branch 33 not taken.
18 This->listReply(std::move(variables), std::move(directories));
282 18 }
283
284 /** Reply to find()
285 *
286 * This virtual method is called within the context of asyncData()
287 * when the server's reply to a variable discovery is processed.
288 *
289 * findReply() calls are made in strict order of the find() calls.
290 *
291 * @param variable pointer to Variable; NULL if not found
292 */
293 83 void findReply(const PdCom::Variable &variable)
294 {
295 83 This->findReply(variable);
296 83 }
297
298 /** Ping reply
299 *
300 * You must reimplement this method to receive the reply to a
301 * ping() call.
302 */
303 3 void pingReply() { This->pingReply(); }
304
305 /** Test from process whether client is alive.
306 *
307 * In some cases the server may want to know whether the client
308 * is still alive. Default implementation is to return true.
309 * Reimplement this if you wish to control presence
310 *
311 * \return true to indicate user presence
312 */
313 bool alive() { return This->alive(); }
314
315 /** Client to server communication semaphore
316 *
317 * Reimplement this method to intercept the transmission
318 * semaphore. The default implementation always returns success.
319 *
320 * Every interaction from client to server (change a parameter,
321 * ping, etc) starts a message tag. If the client is
322 * multi-threaded, more than one message tag could be started
323 * which would lead to corruption. This semaphore can be used to
324 * serialize all client to server interactions.
325 *
326 * @param state true = lock; false = release
327 */
328 1324 void transmitSemaphore(bool /* state */) {}
329 54 void setAuthManager(Sasl *am)
330 {
331
2/2
✓ Branch 3 taken 25 times.
✓ Branch 4 taken 29 times.
54 if (auth_manager_) {
332 25 auth_manager_->process_ = {};
333 }
334 54 auth_manager_ = am;
335
2/2
✓ Branch 3 taken 29 times.
✓ Branch 4 taken 25 times.
54 if (auth_manager_)
336 29 auth_manager_->process_ = shared_from_this();
337 54 }
338
339 134 void setMessageManager(MessageManagerBase *mm)
340 {
341
2/2
✓ Branch 3 taken 25 times.
✓ Branch 4 taken 109 times.
134 if (message_manager_) {
342 25 message_manager_->process_ = {};
343 }
344 134 message_manager_ = mm;
345
2/2
✓ Branch 3 taken 109 times.
✓ Branch 4 taken 25 times.
134 if (message_manager_)
346 109 message_manager_->process_ = shared_from_this();
347 134 }
348
349 90 void loginReply(const char *mechlist, const char *serverData, int finished)
350 {
351
1/2
✓ Branch 3 taken 90 times.
✗ Branch 4 not taken.
90 if (auth_manager_)
352 90 auth_manager_->loginReply(mechlist, serverData, finished);
353 90 }
354
355 26 void processMessage(Message message)
356 {
357
1/2
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
26 if (message_manager_)
358
1/2
✓ Branch 19 taken 26 times.
✗ Branch 20 not taken.
26 message_manager_->processMessage(std::move(message));
359 26 }
360
361 16 void getMessageReply(Message msg)
362 {
363
1/2
✓ Branch 3 taken 16 times.
✗ Branch 4 not taken.
16 if (message_manager_)
364
1/2
✓ Branch 19 taken 16 times.
✗ Branch 20 not taken.
16 message_manager_->getMessageReply(std::move(msg));
365 16 }
366
367 22 void activeMessagesReply(std::vector<Message> msgs)
368 {
369
1/2
✓ Branch 3 taken 22 times.
✗ Branch 4 not taken.
22 if (message_manager_)
370
1/2
✓ Branch 19 taken 22 times.
✗ Branch 20 not taken.
22 message_manager_->activeMessagesReply(std::move(msgs));
371 22 }
372
373 22 void broadcastReply(
374 const std::string &message,
375 const std::string &attr,
376 std::chrono::nanoseconds time_ns,
377 const std::string &user)
378 {
379 22 This->broadcastReply(message, attr, time_ns, user);
380 22 }
381
382 5 void clientStatisticsReply(std::vector<PdCom::ClientStatistics> statistics)
383 {
384
1/2
✓ Branch 28 taken 5 times.
✗ Branch 29 not taken.
5 This->clientStatisticsReply(std::move(statistics));
385 5 }
386
387 template <typename P = ProtocolHandler>
388 334 P &protocolHandler() const
389 {
390
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 334 times.
334 if (!protocolHandler_)
391 throw NotConnected();
392 334 return static_cast<P &>(*protocolHandler_);
393 }
394
395 template <typename P = ProtocolHandler>
396 320 P *castProtocolHandler() const noexcept
397 {
398
1/2
✓ Branch 2 taken 79 times.
✗ Branch 3 not taken.
320 return static_cast<P *>(protocolHandler_.get());
399 }
400
401 std::shared_ptr<Subscription> subscribe(
402 PdCom::Subscription *subscription,
403 const std::string &path,
404 PdCom::Subscriber &subscriber,
405 const PdCom::Selector &selector);
406 std::shared_ptr<Subscription> subscribe(
407 PdCom::Subscription *subscription,
408 std::shared_ptr<const Variable> var,
409 PdCom::Subscriber &subscriber,
410 const PdCom::Selector &selector);
411
412 // replace one subscriber pointer entry in all_subscriptions
413 // this is needed when impl::Subscription is replaced
414 void
415 replace(std::shared_ptr<Subscription> const &old_s,
416 std::shared_ptr<Subscription> const &new_s);
417
418 PdCom::Process *const This;
419 IOLayer *io;
420 Sasl *auth_manager_ = nullptr;
421 MessageManagerBase *message_manager_ = nullptr;
422
423 317 struct PendingCallbackQueue :
424 std::list<PdCom::impl::Subscription::PendingStateChange>
425 {
426 void flush();
427 } pending_callbacks_;
428
429
430 159 class AsyncDataMutex
431 {
432 bool locked_ = false;
433
434 public:
435 1259 void unlock() { locked_ = false; }
436 1259 void lock()
437 {
438
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1259 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1259 times.
1259 if (locked_)
439 throw PdCom::InvalidArgument("This function must not be called "
440 "from PdCom5 Callback");
441 1259 locked_ = true;
442 1259 }
443
444
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
184 bool isLocked() const noexcept { return locked_; }
445 } async_data_mutex_;
446 };
447 }} // namespace PdCom::impl
448
449 #endif // PDCOM5_PROCESS_IMPL_H
450