Bredbandskollen CLI  1.2
Asynchronous network task engine
task.h
1 // Copyright (c) 2019 Internetstiftelsen
2 // Written by Göran Andersson <initgoran@gmail.com>
3 
4 #pragma once
5 
6 #include <set>
7 #include <fstream>
8 #include "logger.h"
9 #include "taskconfig.h"
10 #include "eventloop.h"
11 
12 class Socket;
13 class ServerSocket;
14 class SocketConnection;
15 enum class PollState;
16 class SocketReceiver;
17 class WorkerProcess;
18 
39 class Task : public Logger {
40 public:
49  Task(const std::string &task_name);
50 
53  virtual ~Task();
54 
62  virtual double start() {
63  dbg_log() << "Task starting. No timer.";
64  return 0;
65  }
66 
69  virtual double timerEvent() {
70  dbg_log() << "Default timerEvent: will kill task.";
71  setTimeout();
72  return 0;
73  }
74 
76  void resetTimer(double s) {
77  supervisor->resetTimer(this, s);
78  }
79 
84  bool finishedOK() const {
85  return (has_started && is_finished && !was_killed &&
86  !was_error && !was_timeout);
87  }
88 
93  bool wasKilled() const {
94  return was_killed;
95  }
96 
101  bool wasError() const {
102  return was_error;
103  }
104 
109  bool wasTimeout() const {
110  return was_timeout;
111  }
112 
115  bool hasStarted() const {
116  return has_started;
117  }
118 
122  kill_children = true;
123  }
124 
133  virtual SocketConnection *newClient(int, const char *, uint16_t,
134  ServerSocket *) {
135  return nullptr;
136  }
137 
140  virtual bool adoptConnection(Socket *conn);
141 
145  virtual void connAdded(SocketConnection *) {
146  }
147 
151  virtual void connRemoved(SocketConnection *) {
152  }
153 
157  virtual void serverAdded(ServerSocket *) {
158  }
159 
163  virtual void serverRemoved(ServerSocket *) {
164  }
165 
167  std::string result() const {
168  return the_result;
169  }
170 
172  std::set<Socket *> getMyConnections() const;
173 
175  bool isActive(Socket *conn) const {
176  return supervisor->isActive(conn);
177  }
178 
180  void wakeUp();
181 
184  return supervisor->wakeUpConnection(s);
185  }
186 
189  supervisor->cancelConnection(s);
190  }
191 
193  std::string message() const {
194  return the_message;
195  }
196 
200  bool startObserving(Task *to) {
201  return supervisor->startObserving(this, to);
202  }
203 
222  void executeHandler(Task *receiver, const std::string &message) {
223  if (supervisor->isObserving(this, receiver) && !receiver->terminated())
224  receiver->handleExecution(this, message);
225  else
226  log() << "Will not call handleExecution since task isn't observed.";
227  }
228 
234  double elapsed() const {
235  return secondsSince(start_time);
236  }
237 #ifndef _WIN32
242  virtual Task *createWorkerTask(unsigned int wno) {
243  log() << "missing createWorkerTask, cannot create worker " << wno;
244  return nullptr;
245  }
246 
249  virtual void finishWorkerTask(unsigned int ) {
250  }
251 
256  virtual void newWorkerChannel(SocketReceiver *, unsigned int ) {
257  }
258 
260  virtual void workerMessage(SocketReceiver *, const char *buf, size_t len) {
261  log() << "Worker message: " << std::string(buf, len);
262  }
263 #endif
264 
274  virtual PollState connectionReady(SocketConnection * /* conn */);
275 
276 
285  virtual PollState msgFromConnection(SocketConnection * /* conn */,
286  const std::string & /* msg */);
287 
289  uint64_t bytesSent() const {
290  return tot_bytes_sent;
291  }
292 
294  uint64_t bytesReceived() const {
295  return tot_bytes_received;
296  }
297 
301  void resetByteCount() {
302  tot_bytes_sent = 0;
303  tot_bytes_received = 0;
304  }
305 
312  void notifyBytesSent(uint64_t n) {
313  tot_bytes_sent += n;
314  }
315 
322  void notifyBytesReceived(uint64_t n) {
323  tot_bytes_received += n;
324  }
325 
326 protected:
327 
339  bool addConnection(SocketConnection *conn);
340 
345  bool addConnected(SocketConnection *conn);
346 
348  bool addServer(ServerSocket *conn);
349 
363  bool parseListen(const TaskConfig &tc, const std::string &log_label);
364 
365 #ifdef USE_GNUTLS
367  virtual bool tlsSetKey(ServerSocket *conn, const std::string &crt_path,
368  const std::string &key_path, const std::string &password) {
369  return supervisor->tlsSetKey(conn, crt_path, key_path, password);
370  }
371 #endif
372 
379  void setResult(const std::string &res);
380 
383  virtual void setError(const std::string &msg) {
384  log() << "Task failure: " << msg;
385  was_error = true;
386  setResult("");
387  }
388 
391  virtual void setTimeout() {
392  was_timeout = true;
393  setResult("");
394  }
395 
399  void setMessage(const std::string &msg);
400 
403  virtual void taskFinished(Task *task) {
404  dbg_log() << "Task " << task->label() << " died, no handler defined.";
405  }
406 
409  virtual void taskMessage(Task *task) {
410  dbg_log() << "No taskMessage handler implemented for " << task->label();
411  }
412 
414  virtual void handleExecution(Task *sender, const std::string &message) {
415  dbg_log() << "Event " << message << " from "
416  << (sender ? sender->label() : "unknown")
417  << ", no handler implemented";
418  }
419 
429  bool terminated() const {
430  return is_finished;
431  }
432 
434  void addNewTask(Task *task, Task *parent = nullptr) {
435  supervisor->addTask(task, parent);
436  }
437 
438 #ifdef USE_THREADS
440  void addNewThread(Task *task, const std::string &name="ThreadLoop",
441  std::ostream *log_file = nullptr,
442  Task *parent = nullptr) {
443  supervisor->spawnThread(task, name, log_file, parent);
444  }
445 #endif
446 
448  void getMyTasks(std::set<Task *> &tset) {
449  supervisor->getChildTasks(tset, this);
450  }
451 
453  void abortMyTasks() {
454  supervisor->abortChildTasks(this);
455  }
456 
458  void abortTask(Task *task) {
459  supervisor->abortTask(task);
460  }
461 
463  void abortAllTasks() {
464  supervisor->abort();
465  }
466 
469  int runProcess(const char *const argv[]);
470 
472  virtual void processFinished(int pid, int wstatus);
473 
474 #ifndef _WIN32
478  WorkerProcess *createWorker(std::ostream *log_file = nullptr,
479  unsigned int channels = 1,
480  unsigned int wno = 0) {
481  return supervisor->createWorker(this, log_file, channels, wno);
482  }
483 
487  WorkerProcess *createWorker(const std::string &log_file_name,
488  unsigned int channels = 1,
489  unsigned int wno = 0) {
490  return supervisor->createWorker(this, log_file_name, channels, wno);
491  }
492 #endif
493 
494 private:
495  friend class EventLoop;
496  void setTerminated() {
497  is_finished = true;
498  }
499  // Called by EventLoop when task is scheduled for execution:
500  double begin() {
501  has_started = true;
502  start_time = timeNow();
503  return this->start();
504  }
505 
506 #ifdef USE_THREADS
507  thread_local
508 #endif
509  static EventLoop *supervisor;
510  TimePoint start_time;
511  std::string the_result;
512  std::string the_message;
513  uint64_t tot_bytes_sent = 0, tot_bytes_received = 0;
514  bool is_finished = false;
515  bool has_started = false;
516  bool was_killed = false, was_error = false, was_timeout = false;
517  bool kill_children = false;
518  bool is_child_thread = false;
519 };
Manage timers and tasks.
Definition: eventloop.h:31
void abort()
Remove all tasks.
Definition: eventloop.h:84
bool wakeUpConnection(SocketConnection *s)
Restart an idle connection.
Definition: eventloop.h:99
bool isObserving(Task *observer, Task *task) const
Return true if observer is observing task.
Definition: eventloop.h:242
void addTask(Task *task, Task *parent=nullptr)
Definition: eventloop.cpp:271
bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: eventloop.h:138
void cancelConnection(SocketConnection *s)
Remove a connection.
Definition: eventloop.h:104
void spawnThread(Task *task, const std::string &name="ThreadLoop", std::ostream *log_file=nullptr, Task *parent=nullptr)
Create a new thread and run task in its own loop in that thread.
Definition: eventloop.cpp:589
WorkerProcess * createWorker(Task *parent, std::ostream *log_file, unsigned int channels, unsigned int wno)
Definition: eventloop.cpp:117
void resetTimer(Task *task, double s)
Definition: eventloop.cpp:416
void getChildTasks(std::set< Task * > &tset, Task *parent) const
Get all tasks with the given parent.
Definition: eventloop.cpp:376
void abortChildTasks(Task *parent)
Remove all tasks with the given parent.
Definition: eventloop.cpp:388
bool startObserving(Task *from, Task *to)
Definition: eventloop.cpp:348
void abortTask(Task *task)
Remove the given task.
Definition: eventloop.cpp:398
bool isActive(const Socket *conn) const
Return true if conn still exists.
Definition: eventloop.h:112
A simple logger. All classes that want to write to the global log file should inherit from this class...
Definition: logger.h:86
std::string label() const
Return the object's log label.
Definition: logger.h:251
static TimePoint timeNow()
Return current time.
Definition: logger.h:218
std::ostream & log() const
Write a line of info log.
Definition: logger.h:328
std::ostream & dbg_log() const
Write a line of debug log.
Definition: logger.h:345
static double secondsSince(const TimePoint &t)
Definition: logger.cpp:16
Listen on a single socket for incoming connections.
Definition: serversocket.h:15
This class implements low-level socket connection operations. Inherit from it to implement protocols ...
Definition: socketconnection.h:47
Pass sockets and messages between processes.
Definition: socketreceiver.h:21
This is a slave to the Engine class. You can't use it directly, only through its subclasses,...
Definition: socket.h:18
Read configuration from file or string.
Definition: taskconfig.h:44
The purpose of a task is to manage socket connections, and/or to execute timers.
Definition: task.h:39
WorkerProcess * createWorker(const std::string &log_file_name, unsigned int channels=1, unsigned int wno=0)
Run task returned by this->createWorkerTask in a child process. Return nullptr on failure.
Definition: task.h:487
bool parseListen(const TaskConfig &tc, const std::string &log_label)
Definition: task.cpp:98
void setResult(const std::string &res)
Definition: task.cpp:18
uint64_t bytesReceived() const
Number of bytes received through SocketConnection objects owned by me.
Definition: task.h:294
virtual bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: task.h:367
std::string message() const
Return the current (outgoing) message.
Definition: task.h:193
virtual PollState connectionReady(SocketConnection *)
Definition: task.cpp:35
bool wasError() const
Return true if the task terminated with an error.
Definition: task.h:101
virtual bool adoptConnection(Socket *conn)
Definition: task.cpp:64
virtual void finishWorkerTask(unsigned int)
Definition: task.h:249
bool terminated() const
Return true if task is finished.
Definition: task.h:429
void wakeUp()
Restart all idle connections.
Definition: task.cpp:73
virtual void taskFinished(Task *task)
Definition: task.h:403
virtual void serverRemoved(ServerSocket *)
Definition: task.h:163
virtual void connRemoved(SocketConnection *)
Definition: task.h:151
virtual ~Task()
Definition: task.cpp:77
bool wakeUpConnection(SocketConnection *s)
If s is idle, restart it and return true. Otherwise return false.
Definition: task.h:183
virtual void handleExecution(Task *sender, const std::string &message)
Callback to execute code on behalf of another Task.
Definition: task.h:414
virtual void connAdded(SocketConnection *)
Definition: task.h:145
void notifyBytesSent(uint64_t n)
Notify the task that data has been sent on its behalf.
Definition: task.h:312
void resetByteCount()
Reset the values for the methods Task::bytesSent and Task::bytesReceived.
Definition: task.h:301
virtual void taskMessage(Task *task)
Definition: task.h:409
void executeHandler(Task *receiver, const std::string &message)
Definition: task.h:222
bool wasKilled() const
Return true if the task is finished and was aborted by another task.
Definition: task.h:93
Task(const std::string &task_name)
Create a task with the given name.
Definition: task.cpp:13
void abortMyTasks()
Terminate all my child tasks.
Definition: task.h:453
void resetTimer(double s)
Run timerEvent after s seconds instead of previous value.
Definition: task.h:76
void abortAllTasks()
Terminate all tasks and exit the EventLoop.
Definition: task.h:463
double elapsed() const
Return number of seconds since the task was started.
Definition: task.h:234
bool hasStarted() const
Definition: task.h:115
virtual void newWorkerChannel(SocketReceiver *, unsigned int)
Definition: task.h:256
virtual double start()
Definition: task.h:62
virtual void setTimeout()
Definition: task.h:391
bool wasTimeout() const
Return true if the task terminated with a timeout.
Definition: task.h:109
WorkerProcess * createWorker(std::ostream *log_file=nullptr, unsigned int channels=1, unsigned int wno=0)
Run task returned by this->createWorkerTask in a child process. Return nullptr on failure.
Definition: task.h:478
bool isActive(Socket *conn) const
Return true if the connection still exists.
Definition: task.h:175
virtual void setError(const std::string &msg)
Definition: task.h:383
void setMessage(const std::string &msg)
Definition: task.cpp:30
std::set< Socket * > getMyConnections() const
Return all current connections.
Definition: task.cpp:69
virtual void serverAdded(ServerSocket *)
Definition: task.h:157
void notifyBytesReceived(uint64_t n)
Notify the task that data has been received on its behalf.
Definition: task.h:322
bool addServer(ServerSocket *conn)
As Task::addConnected, but with a server connection.
Definition: task.cpp:81
void cancelConnection(SocketConnection *s)
Terminate and remove a connection.
Definition: task.h:188
virtual void processFinished(int pid, int wstatus)
Will be called to notify when an external process has terminated.
Definition: task.cpp:94
void getMyTasks(std::set< Task * > &tset)
Add all my child tasks to the given set.
Definition: task.h:448
int runProcess(const char *const argv[])
Definition: task.cpp:90
void killChildTaskWhenFinished()
Definition: task.h:121
void addNewTask(Task *task, Task *parent=nullptr)
Insert another Task for execution by the EventLoop.
Definition: task.h:434
bool addConnected(SocketConnection *conn)
Definition: task.cpp:55
std::string result() const
To get the "result" of the task after it has finished.
Definition: task.h:167
virtual double timerEvent()
Definition: task.h:69
virtual SocketConnection * newClient(int, const char *, uint16_t, ServerSocket *)
Definition: task.h:133
virtual void workerMessage(SocketReceiver *, const char *buf, size_t len)
Called if parent/worker sends a message through a SocketReceiver:
Definition: task.h:260
bool startObserving(Task *to)
Definition: task.h:200
virtual PollState msgFromConnection(SocketConnection *, const std::string &)
Definition: task.cpp:40
void addNewThread(Task *task, const std::string &name="ThreadLoop", std::ostream *log_file=nullptr, Task *parent=nullptr)
Run task in a new thread.
Definition: task.h:440
uint64_t bytesSent() const
Number of bytes sent through SocketConnection objects owned by me.
Definition: task.h:289
void abortTask(Task *task)
Terminate a task.
Definition: task.h:458
bool addConnection(SocketConnection *conn)
Definition: task.cpp:46
virtual Task * createWorkerTask(unsigned int wno)
Definition: task.h:242
bool finishedOK() const
Return true if the task has finished normally.
Definition: task.h:84
Measure elapsed time during execution, for example by timer events.
Used by LoadBalancer to manage child processes.
Definition: workerprocess.h:21
PollState
Definition: pollstate.h:11