9 #include "taskconfig.h"
10 #include "eventloop.h"
49 Task(
const std::string &task_name);
63 dbg_log() <<
"Task starting. No timer.";
70 dbg_log() <<
"Default timerEvent: will kill task.";
85 return (has_started && is_finished && !was_killed &&
86 !was_error && !was_timeout);
122 kill_children =
true;
226 log() <<
"Will not call handleExecution since task isn't observed.";
243 log() <<
"missing createWorkerTask, cannot create worker " << wno;
261 log() <<
"Worker message: " << std::string(buf, len);
286 const std::string & );
290 return tot_bytes_sent;
295 return tot_bytes_received;
303 tot_bytes_received = 0;
323 tot_bytes_received += n;
368 const std::string &key_path,
const std::string &password) {
369 return supervisor->
tlsSetKey(conn, crt_path, key_path, password);
384 log() <<
"Task failure: " << msg;
404 dbg_log() <<
"Task " << task->
label() <<
" died, no handler defined.";
410 dbg_log() <<
"No taskMessage handler implemented for " << task->
label();
416 << (sender ? sender->
label() :
"unknown")
417 <<
", no handler implemented";
435 supervisor->
addTask(task, parent);
441 std::ostream *log_file =
nullptr,
442 Task *parent =
nullptr) {
443 supervisor->
spawnThread(task, name, log_file, parent);
479 unsigned int channels = 1,
480 unsigned int wno = 0) {
481 return supervisor->
createWorker(
this, log_file, channels, wno);
488 unsigned int channels = 1,
489 unsigned int wno = 0) {
490 return supervisor->
createWorker(
this, log_file_name, channels, wno);
496 void setTerminated() {
503 return this->
start();
511 std::string the_result;
512 std::string the_message;
513 uint64_t tot_bytes_sent = 0, tot_bytes_received = 0;
514 bool is_finished =
false;
515 bool has_started =
false;
516 bool was_killed =
false, was_error =
false, was_timeout =
false;
517 bool kill_children =
false;
518 bool is_child_thread =
false;
Manage timers and tasks.
Definition: eventloop.h:31
void abort()
Remove all tasks.
Definition: eventloop.h:84
bool wakeUpConnection(SocketConnection *s)
Restart an idle connection.
Definition: eventloop.h:99
bool isObserving(Task *observer, Task *task) const
Return true if observer is observing task.
Definition: eventloop.h:242
void addTask(Task *task, Task *parent=nullptr)
Definition: eventloop.cpp:271
bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: eventloop.h:138
void cancelConnection(SocketConnection *s)
Remove a connection.
Definition: eventloop.h:104
void spawnThread(Task *task, const std::string &name="ThreadLoop", std::ostream *log_file=nullptr, Task *parent=nullptr)
Create a new thread and run task in its own loop in that thread.
Definition: eventloop.cpp:589
WorkerProcess * createWorker(Task *parent, std::ostream *log_file, unsigned int channels, unsigned int wno)
Definition: eventloop.cpp:117
void resetTimer(Task *task, double s)
Definition: eventloop.cpp:416
void getChildTasks(std::set< Task * > &tset, Task *parent) const
Get all tasks with the given parent.
Definition: eventloop.cpp:376
void abortChildTasks(Task *parent)
Remove all tasks with the given parent.
Definition: eventloop.cpp:388
bool startObserving(Task *from, Task *to)
Definition: eventloop.cpp:348
void abortTask(Task *task)
Remove the given task.
Definition: eventloop.cpp:398
bool isActive(const Socket *conn) const
Return true if conn still exists.
Definition: eventloop.h:112
A simple logger. All classes that want to write to the global log file should inherit from this class...
Definition: logger.h:86
std::string label() const
Return the object's log label.
Definition: logger.h:251
static TimePoint timeNow()
Return current time.
Definition: logger.h:218
std::ostream & log() const
Write a line of info log.
Definition: logger.h:328
std::ostream & dbg_log() const
Write a line of debug log.
Definition: logger.h:345
static double secondsSince(const TimePoint &t)
Definition: logger.cpp:16
Listen on a single socket for incoming connections.
Definition: serversocket.h:15
This class implements low-level socket connection operations. Inherit from it to implement protocols ...
Definition: socketconnection.h:47
Pass sockets and messages between processes.
Definition: socketreceiver.h:21
This is a slave to the Engine class. You can't use it directly, only through its subclasses,...
Definition: socket.h:18
Read configuration from file or string.
Definition: taskconfig.h:44
The purpose of a task is to manage socket connections, and/or to execute timers.
Definition: task.h:39
WorkerProcess * createWorker(const std::string &log_file_name, unsigned int channels=1, unsigned int wno=0)
Run the task returned by this->createWorkerTask in a child process. Return nullptr on failure.
Definition: task.h:487
bool parseListen(const TaskConfig &tc, const std::string &log_label)
Definition: task.cpp:98
void setResult(const std::string &res)
Definition: task.cpp:18
uint64_t bytesReceived() const
Number of bytes received through SocketConnection objects owned by me.
Definition: task.h:294
virtual bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, const std::string &key_path, const std::string &password)
Use SSL certificate for a listening socket.
Definition: task.h:367
std::string message() const
Return the current (outgoing) message.
Definition: task.h:193
virtual PollState connectionReady(SocketConnection *)
Definition: task.cpp:35
bool wasError() const
Return true if the task terminated with an error.
Definition: task.h:101
virtual bool adoptConnection(Socket *conn)
Definition: task.cpp:64
virtual void finishWorkerTask(unsigned int)
Definition: task.h:249
bool terminated() const
Return true if task is finished.
Definition: task.h:429
void wakeUp()
Restart all idle connections.
Definition: task.cpp:73
virtual void taskFinished(Task *task)
Definition: task.h:403
virtual void serverRemoved(ServerSocket *)
Definition: task.h:163
virtual void connRemoved(SocketConnection *)
Definition: task.h:151
virtual ~Task()
Definition: task.cpp:77
bool wakeUpConnection(SocketConnection *s)
If s is idle, restart it and return true. Otherwise return false.
Definition: task.h:183
virtual void handleExecution(Task *sender, const std::string &message)
Callback to execute code on behalf of another Task.
Definition: task.h:414
virtual void connAdded(SocketConnection *)
Definition: task.h:145
void notifyBytesSent(uint64_t n)
Notify the task that data has been sent on its behalf.
Definition: task.h:312
void resetByteCount()
Reset the values for the methods Task::bytesSent and Task::bytesReceived.
Definition: task.h:301
virtual void taskMessage(Task *task)
Definition: task.h:409
void executeHandler(Task *receiver, const std::string &message)
Definition: task.h:222
bool wasKilled() const
Return true if the task is finished and was aborted by another task.
Definition: task.h:93
Task(const std::string &task_name)
Create a task with the given name.
Definition: task.cpp:13
void abortMyTasks()
Terminate all my child tasks.
Definition: task.h:453
void resetTimer(double s)
Run timerEvent after s seconds instead of previous value.
Definition: task.h:76
void abortAllTasks()
Terminate all tasks and exit the EventLoop.
Definition: task.h:463
double elapsed() const
Return number of seconds since the task was started.
Definition: task.h:234
bool hasStarted() const
Definition: task.h:115
virtual void newWorkerChannel(SocketReceiver *, unsigned int)
Definition: task.h:256
virtual double start()
Definition: task.h:62
virtual void setTimeout()
Definition: task.h:391
bool wasTimeout() const
Return true if the task terminated with a timeout.
Definition: task.h:109
WorkerProcess * createWorker(std::ostream *log_file=nullptr, unsigned int channels=1, unsigned int wno=0)
Run the task returned by this->createWorkerTask in a child process. Return nullptr on failure.
Definition: task.h:478
bool isActive(Socket *conn) const
Return true if the connection still exists.
Definition: task.h:175
virtual void setError(const std::string &msg)
Definition: task.h:383
void setMessage(const std::string &msg)
Definition: task.cpp:30
std::set< Socket * > getMyConnections() const
Return all current connections.
Definition: task.cpp:69
virtual void serverAdded(ServerSocket *)
Definition: task.h:157
void notifyBytesReceived(uint64_t n)
Notify the task that data has been received on its behalf.
Definition: task.h:322
bool addServer(ServerSocket *conn)
Like Task::addConnected, but with a server connection.
Definition: task.cpp:81
void cancelConnection(SocketConnection *s)
Terminate and remove a connection.
Definition: task.h:188
virtual void processFinished(int pid, int wstatus)
Will be called to notify when an external process has terminated.
Definition: task.cpp:94
void getMyTasks(std::set< Task * > &tset)
Add all my child tasks to the given set.
Definition: task.h:448
int runProcess(const char *const argv[])
Definition: task.cpp:90
void killChildTaskWhenFinished()
Definition: task.h:121
void addNewTask(Task *task, Task *parent=nullptr)
Insert another Task for execution by the EventLoop.
Definition: task.h:434
bool addConnected(SocketConnection *conn)
Definition: task.cpp:55
std::string result() const
To get the "result" of the task after it has finished.
Definition: task.h:167
virtual double timerEvent()
Definition: task.h:69
virtual SocketConnection * newClient(int, const char *, uint16_t, ServerSocket *)
Definition: task.h:133
virtual void workerMessage(SocketReceiver *, const char *buf, size_t len)
Called if the parent/worker sends a message through a SocketReceiver.
Definition: task.h:260
bool startObserving(Task *to)
Definition: task.h:200
virtual PollState msgFromConnection(SocketConnection *, const std::string &)
Definition: task.cpp:40
void addNewThread(Task *task, const std::string &name="ThreadLoop", std::ostream *log_file=nullptr, Task *parent=nullptr)
Run task in a new thread.
Definition: task.h:440
uint64_t bytesSent() const
Number of bytes sent through SocketConnection objects owned by me.
Definition: task.h:289
void abortTask(Task *task)
Terminate a task.
Definition: task.h:458
bool addConnection(SocketConnection *conn)
Definition: task.cpp:46
virtual Task * createWorkerTask(unsigned int wno)
Definition: task.h:242
bool finishedOK() const
Return true if the task has finished normally.
Definition: task.h:84
Measure elapsed time during execution, for example by timer events.
Used by LoadBalancer to manage child processes.
Definition: workerprocess.h:21
PollState
Definition: pollstate.h:11