Bredbandskollen CLI  1.2
Asynchronous network task engine
eventloop.h
1 // Copyright (c) 2018 IIS (The Internet Foundation in Sweden)
2 // Written by Göran Andersson <initgoran@gmail.com>
3 
4 #pragma once
5 
6 #include <set>
7 #include <map>
8 
9 #include "logger.h"
10 #include "engine.h"
11 #ifdef USE_THREADS
12 #include "msgqueue.h"
13 #endif
14 
15 class Task;
16 class SocketConnection;
17 class ServerSocket;
18 class WorkerProcess;
19 
31 class EventLoop : public Logger {
32 public:
35  EventLoop(std::string log_label = "MainLoop") :
36  Logger(log_label),
37  engine("NetworkEngine"),
38  name(log_label) {
39 #ifdef USE_THREADS
40  do_init(nullptr);
41 #else
42  do_init();
43 #endif
44  }
45 
46  ~EventLoop();
47 
54  void addTask(Task *task, Task *parent = nullptr);
55 
58  bool run(double timeout_s);
59 
61  void runUntilComplete();
62 
63 #ifdef USE_THREADS
69  static void runTask(Task *task, const std::string &name = "MainLoop",
70  std::ostream *log_file = nullptr,
71  EventLoop *parent = nullptr);
72 #else
73  static void runTask(Task *task, const std::string &name = "MainLoop",
74  std::ostream *log_file = nullptr);
75 #endif
76 
79 
81  void abortTask(Task *task);
82 
84  void abort() {
85  interrupt();
86  do_abort = true;
87  }
88 
90  void getChildTasks(std::set<Task *> &tset, Task *parent) const;
91 
93  void abortChildTasks(Task *parent);
94 
96  void wakeUpTask(Task *t);
97 
100  return engine.wakeUpConnection(s);
101  }
102 
105  engine.cancelConnection(s);
106  }
107 
109  std::set<Socket *> findConnByTask(const Task *t) const;
110 
112  bool isActive(const Socket *conn) const {
113  return engine.connActive(conn);
114  }
115 
118  void resetTimer(Task *task, double s);
119 
125  bool addConnection(SocketConnection *conn);
126 
131  bool addConnected(SocketConnection *conn);
132 
135  bool addServer(ServerSocket *conn);
136 #ifdef USE_GNUTLS
138  bool tlsSetKey(ServerSocket *conn, const std::string &crt_path,
139  const std::string &key_path, const std::string &password) {
140  return engine.tlsSetKey(conn, crt_path, key_path, password);
141  }
142 
144  bool setCABundle(const std::string &path) {
145  return engine.setCABundle(path);
146  }
147 #endif
148 
150  bool running(Task *task);
151 
157  void taskDeleted(Task *task);
158 
161  static void interrupt() {
162  Engine::yield();
163  }
164 
165 #ifdef USE_THREADS
167  void spawnThread(Task *task, const std::string &name="ThreadLoop",
168  std::ostream *log_file = nullptr,
169  Task *parent = nullptr);
170 #endif
171 #ifdef _WIN32
172  int externalCommand(Task *owner, const char *const argv[]) {
173  // Not implemented
174  exit(1);
175  }
176 #else
179  int externalCommand(Task *owner, const char *const argv[]);
180 
182  static void daemonize();
183 
186  WorkerProcess *createWorker(Task *parent, std::ostream *log_file,
187  unsigned int channels, unsigned int wno);
188 
191  WorkerProcess *createWorker(Task *parent, const std::string &log_file_name,
192  unsigned int channels, unsigned int wno);
193 
195  void killChildProcesses(int signum);
196 
202  static void setLogFilename(const std::string &filename) {
203  openFileOnSIGHUP = filename;
204  }
205 #endif
206 
211  void notifyTaskFinished(Task *task) {
212  auto ret = finishedTasks.insert(task);
213  if (ret.second)
214  engine.yield();
215  }
216 
221  void notifyTaskMessage(Task *task) {
222  auto ret = messageTasks.insert(task);
223  if (ret.second)
224  engine.yield();
225  }
226 
228  bool aborted() const {
229  return do_abort;
230  }
231 
233  void addSignalHandler(int signum, void (*handler)(int, EventLoop &));
234 
239  bool startObserving(Task *from, Task *to);
240 
242  bool isObserving(Task *observer, Task *task) const {
243  auto p = observed_by.find(observer);
244  return p != observed_by.end() &&
245  p->second.find(task) != p->second.end();
246  }
247 
248 private:
249 #ifndef _WIN32
250 #ifdef USE_THREADS
251  thread_local
252 #endif
253  static std::map<int, int> terminatedPIDs;
254  std::map<int, Task *> pidOwner;
255 #ifdef USE_THREADS
256  thread_local
257 #endif
258  static std::string openFileOnSIGHUP;
259 #endif
260 #ifdef USE_THREADS
261  void do_init(EventLoop *parent);
262 
263  EventLoop(std::string log_label, EventLoop *parent) :
264  Logger(log_label),
265  engine("NetworkEngine"),
266  name(log_label) {
267  do_init(parent);
268  }
269  std::map<Task *, std::thread> threads;
270  static MsgQueue<Task *> finished_threads;
271  std::map<Task *, Task *> threadTaskObserver;
272  void collect_thread(Task *t);
273  EventLoop *parent_loop;
274 #else
275  void do_init();
276 #endif
277 
278 #ifdef USE_THREADS
279  //thread_local
280 #endif
281  static volatile int got_signal;
282 #ifdef USE_THREADS
283  thread_local
284 #endif
285  static volatile int terminatedPIDtmp[100];
286 
287  static void signalHandler(int signum);
288  void removeAllTasks();
289  void check_finished();
290  Task *nextTimerToExecute();
291  void _removeTimer(Task *task);
292  void _removeTask(Task *task, bool killed = false);
293  Engine engine;
294 
295  // Map each task to its parent (or, if it has no parent, to nullptr)
296  std::map<Task *, Task *> tasks;
297 
298  // These are used to keep track of "observation" between tasks.
299  // E.g. a parent task is observing its children.
300  std::map<Task *, std::set<Task *> > observed_by;
301  std::map<Task *, std::set<Task *> > observing;
302 
303  std::set<Task *> finishedTasks, messageTasks;
304  std::multimap<int, void (*)(int, EventLoop &)> userSignalHandler;
305  std::multimap<TimePoint, Task *> timer_queue;
306  std::string name;
307  bool do_abort = false;
308 };
The network engine.
Definition: engine.h:33
bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: engine.cpp:73
bool connActive(const Socket *conn) const
Definition: engine.h:89
bool wakeUpConnection(SocketConnection *s)
Wake up connection s if it is idle, return false otherwise.
Definition: engine.cpp:587
bool setCABundle(const std::string &path)
Set path to file containing chain of trust for SSL certificate.
Definition: engine.cpp:63
static void yield()
Call this to make the Engine::run method return prematurely.
Definition: engine.h:67
Manage timers and tasks.
Definition: eventloop.h:31
bool run(double timeout_s)
Definition: eventloop.cpp:505
void addSignalHandler(int signum, void(*handler)(int, EventLoop &))
Add handler for the given OS signal.
Definition: eventloop.cpp:249
bool setCABundle(const std::string &path)
Set path to file containing chain of trust for SSL certificate.
Definition: eventloop.h:144
bool addServer(ServerSocket *conn)
Definition: eventloop.cpp:322
void abort()
Remove all tasks.
Definition: eventloop.h:84
bool wakeUpConnection(SocketConnection *s)
Restart an idle connection.
Definition: eventloop.h:99
void taskDeleted(Task *task)
Notify EventLoop that a task object it ows has been deleted.
Definition: eventloop.cpp:339
void runUntilComplete()
Run until all task are done.
Definition: eventloop.cpp:554
bool isObserving(Task *observer, Task *task) const
Return true if observer is observing task.
Definition: eventloop.h:242
void notifyTaskMessage(Task *task)
Notify the EventLoop that the given task has a message to deliver.
Definition: eventloop.h:221
void addTask(Task *task, Task *parent=nullptr)
Definition: eventloop.cpp:271
void wakeUpTask(Task *t)
Restart idle connections owned by the given task.
Definition: eventloop.cpp:404
bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: eventloop.h:138
void notifyTaskFinished(Task *task)
Mark the given task as finished.
Definition: eventloop.h:211
bool addConnected(SocketConnection *conn)
Definition: eventloop.cpp:308
void cancelConnection(SocketConnection *s)
Remove a connection.
Definition: eventloop.h:104
bool addConnection(SocketConnection *conn)
Definition: eventloop.cpp:294
void killChildProcesses(int signum)
Send signal to all child processes.
Definition: eventloop.cpp:196
void spawnThread(Task *task, const std::string &name="ThreadLoop", std::ostream *log_file=nullptr, Task *parent=nullptr)
Create a new thread and run task in its own loop in that thread.
Definition: eventloop.cpp:589
static void setLogFilename(const std::string &filename)
Set path to log file.
Definition: eventloop.h:202
bool running(Task *task)
Return true if task is running.
Definition: eventloop.cpp:412
std::set< Socket * > findConnByTask(const Task *t) const
Return all connetcions owned by the given task.
Definition: eventloop.cpp:408
bool aborted() const
Return true if the EventLoop is about to be terminated.
Definition: eventloop.h:228
WorkerProcess * createWorker(Task *parent, std::ostream *log_file, unsigned int channels, unsigned int wno)
Definition: eventloop.cpp:117
static void interrupt()
Definition: eventloop.h:161
static void daemonize()
Fork into background, detach from shell.
Definition: eventloop.cpp:90
void resetTimer(Task *task, double s)
Definition: eventloop.cpp:416
int externalCommand(Task *owner, const char *const argv[])
Definition: eventloop.cpp:73
void waitForThreadsToFinish()
Block current thread until all spawned threads have finished.
Definition: eventloop.cpp:629
void getChildTasks(std::set< Task * > &tset, Task *parent) const
Get all tasks with the given parent.
Definition: eventloop.cpp:376
static void runTask(Task *task, const std::string &name="MainLoop", std::ostream *log_file=nullptr, EventLoop *parent=nullptr)
Definition: eventloop.cpp:567
void abortChildTasks(Task *parent)
Remove all tasks with the given parent.
Definition: eventloop.cpp:388
bool startObserving(Task *from, Task *to)
Definition: eventloop.cpp:348
void abortTask(Task *task)
Remove the given task.
Definition: eventloop.cpp:398
EventLoop(std::string log_label="MainLoop")
Definition: eventloop.h:35
bool isActive(const Socket *conn) const
Return true if conn still exists.
Definition: eventloop.h:112
A simple logger. All classes that want to write to the global log file should inherit from this class...
Definition: logger.h:86
Listen on a single socket for incoming connections.
Definition: serversocket.h:15
This class implements low-level socket connection operations. Inherit from it to implement protocols ...
Definition: socketconnection.h:47
This is a slave to the Engine class. You can't use it directly, only through its subclasses,...
Definition: socket.h:18
The purpose of a task is to manage socket connections, and/or to execute timers.
Definition: task.h:39
Used by LoadBalancer to manage child processes.
Definition: workerprocess.h:21