MAJOR: applets: Use tasks, instead of rolling our own scheduler.

There's no real reason to keep a dedicated scheduler for applets anymore, so
nuke it and just use tasks. This comes with several benefits. The first is
that applets can no longer induce high latencies, since they now share nice
values with other tasks; later it will be possible to configure an applet's
nice value. The second is that the applet scheduler was not very
thread-friendly, keeping a big lock around its run queue in anticipation of
this change, so applet-intensive workloads should now scale much better with
threads.
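
Concretely, the new wiring boils down to the condensed sketch below, assembled
from the hunks in this patch; it is a summary rather than the literal change,
and most appctx field initialization is omitted:

    /* Every appctx now owns a regular task instead of an entry in a
     * private applet run queue (assumes the proto/applet.h and
     * proto/task.h context of this tree).
     */
    static inline struct appctx *appctx_new(struct applet *applet,
                                            unsigned long thread_mask)
    {
        struct appctx *appctx = pool_alloc(pool_head_connection);

        if (unlikely(!appctx))
            return NULL;
        appctx->applet = applet;
        appctx->t = task_new(thread_mask);     /* a plain task, scheduled like any other */
        if (unlikely(!appctx->t)) {
            pool_free(pool_head_connection, appctx);
            return NULL;
        }
        appctx->t->process = task_run_applet;  /* generic handler added by this patch */
        appctx->t->context = appctx;
        return appctx;
    }

    /* Waking an applet is now just waking its task; the private run queue,
     * applet_active_lock and APPLET_RUNNING/WOKEN_UP states are gone.
     */
    static inline void appctx_wakeup(struct appctx *appctx)
    {
        task_wakeup(appctx->t, TASK_WOKEN_OTHER);
    }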

Some more improvement is now possible: some applets also use a separate task
to handle timers and timeouts; these could now be simplified to use a single
task, as sketched below.
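
A purely hypothetical sketch of what that simplification could look like,
assuming the task API of this tree (tick_add(), MS_TO_TICKS(), now_ms from
common/ticks.h and proto/task.h) and that the applet's task is requeued by the
scheduler according to its expire date, as regular tasks are; the handler name
and timeout value are made up:

    /* Hypothetical applet I/O handler: instead of keeping a second task
     * dedicated to timeouts, arm the expiration date of the applet's own
     * task and let the scheduler wake it up when the timer fires.
     */
    static void example_applet_io_handler(struct appctx *appctx)
    {
        /* rearm a 5 second timeout on the applet's own task */
        appctx->t->expire = tick_add(now_ms, MS_TO_TICKS(5000));

        /* ... normal applet processing ... */
    }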
Author: Olivier Houchard, 2018-05-25 16:58:52 +02:00 (committed by Willy Tarreau)
Parent: 1599b80360
Commit: 673867c357
11 changed files with 65 additions and 154 deletions

View File

@@ -28,14 +28,11 @@
#include <common/mini-clist.h>
#include <types/applet.h>
#include <proto/connection.h>
#include <proto/task.h>
extern unsigned int nb_applets;
extern unsigned long active_applets_mask;
extern unsigned int applets_active_queue;
__decl_hathreads(extern HA_SPINLOCK_T applet_active_lock);
extern struct list applet_active_queue;
void applet_run_active();
struct task *task_run_applet(struct task *t, void *context, unsigned short state);
static int inline appctx_res_wakeup(struct appctx *appctx);
@@ -52,7 +49,7 @@ static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask)
appctx->chunk = NULL;
appctx->io_release = NULL;
appctx->thread_mask = thread_mask;
appctx->state = APPLET_SLEEPING;
appctx->state = 0;
}
/* Tries to allocate a new appctx and initialize its main fields. The appctx
@@ -69,7 +66,13 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask)
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
appctx_init(appctx, thread_mask);
LIST_INIT(&appctx->runq);
appctx->t = task_new(thread_mask);
if (unlikely(appctx->t == NULL)) {
pool_free(pool_head_connection, appctx);
return NULL;
}
appctx->t->process = task_run_applet;
appctx->t->context = appctx;
LIST_INIT(&appctx->buffer_wait.list);
appctx->buffer_wait.target = appctx;
appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
@@ -83,11 +86,8 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask)
*/
static inline void __appctx_free(struct appctx *appctx)
{
if (!LIST_ISEMPTY(&appctx->runq)) {
LIST_DEL(&appctx->runq);
applets_active_queue--;
}
task_delete(appctx->t);
task_free(appctx->t);
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&appctx->buffer_wait.list);
@@ -98,38 +98,27 @@ static inline void __appctx_free(struct appctx *appctx)
pool_free(pool_head_connection, appctx);
HA_ATOMIC_SUB(&nb_applets, 1);
}
static inline void appctx_free(struct appctx *appctx)
{
HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
/* The task is supposed to be run on this thread, so we can just
* check if it's running already (or about to run) or not
*/
if (!(appctx->t->state & TASK_RUNNING))
__appctx_free(appctx);
else {
/* if it's running, or about to run, defer the freeing
* until the callback is called.
*/
appctx->state |= APPLET_WANT_DIE;
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return;
task_wakeup(appctx->t, TASK_WOKEN_OTHER);
}
__appctx_free(appctx);
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
/* wakes up an applet when conditions have changed */
static inline void __appctx_wakeup(struct appctx *appctx)
{
if (LIST_ISEMPTY(&appctx->runq)) {
LIST_ADDQ(&applet_active_queue, &appctx->runq);
applets_active_queue++;
active_applets_mask |= appctx->thread_mask;
}
}
static inline void appctx_wakeup(struct appctx *appctx)
{
HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
appctx->state |= APPLET_WOKEN_UP;
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return;
}
__appctx_wakeup(appctx);
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
task_wakeup(appctx->t, TASK_WOKEN_OTHER);
}
/* Callback used to wake up an applet when a buffer is available. The applet
@@ -139,19 +128,17 @@ static inline void appctx_wakeup(struct appctx *appctx)
* requested */
static inline int appctx_res_wakeup(struct appctx *appctx)
{
HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
if (appctx->state & APPLET_WOKEN_UP) {
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 0;
}
appctx->state |= APPLET_WOKEN_UP;
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 1;
}
__appctx_wakeup(appctx);
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 1;
int ret;
/* To detect whether we have already been woken up, we check that the
* state contains TASK_RUNNING but is not exactly TASK_RUNNING.
* This is racy, but that's OK: at worst we will wake a few more
* tasks than necessary when a buffer becomes available.
*/
ret = ((appctx->state & TASK_RUNNING) != 0) &&
((appctx->state != TASK_RUNNING));
task_wakeup(appctx->t, TASK_WOKEN_OTHER);
return ret;
}

View File

@@ -36,7 +36,6 @@
#include <types/stream.h>
#include <types/stream_interface.h>
#include <proto/applet.h>
#include <proto/task.h>
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -456,7 +455,7 @@ static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait)
{
if (chn->buf->size && buffer_empty(chn->buf)) {
b_free(&chn->buf);
offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
offer_buffers(wait->target, tasks_run_queue);
}
}

View File

@@ -45,17 +45,13 @@ struct applet {
unsigned int timeout; /* execution timeout. */
};
#define APPLET_SLEEPING 0x00 /* applet is currently sleeping or pending in active queue */
#define APPLET_RUNNING 0x01 /* applet is currently running */
#define APPLET_WOKEN_UP 0x02 /* applet was running and requested to woken up again */
#define APPLET_WANT_DIE 0x04 /* applet was running and requested to die */
#define APPLET_WANT_DIE 0x01 /* applet was running and requested to die */
#define APPCTX_CLI_ST1_PROMPT (1 << 0)
#define APPCTX_CLI_ST1_PAYLOAD (1 << 1)
/* Context of a running applet. */
struct appctx {
struct list runq; /* chaining in the applet run queue */
enum obj_type obj_type; /* OBJ_TYPE_APPCTX */
/* 3 unused bytes here */
unsigned short state; /* Internal appctx state */
@@ -72,6 +68,7 @@ struct appctx {
int cli_severity_output; /* used within the cli_io_handler to format severity output of informational feedback */
struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
unsigned long thread_mask; /* mask of thread IDs authorized to process the applet */
struct task *t; /* task associated to the applet */
union {
struct {

View File

@@ -183,7 +183,6 @@ struct activity {
unsigned int loops; // complete loops in run_poll_loop()
unsigned int wake_cache; // active fd_cache prevented poll() from sleeping
unsigned int wake_tasks; // active tasks prevented poll() from sleeping
unsigned int wake_applets; // active applets prevented poll() from sleeping
unsigned int wake_signal; // pending signal prevented poll() from sleeping
unsigned int poll_exp; // number of times poll() sees an expired timeout (includes wake_*)
unsigned int poll_drop; // poller dropped a dead FD from the update list

View File

@@ -19,100 +19,36 @@
#include <proto/channel.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
unsigned int nb_applets = 0;
unsigned long active_applets_mask = 0;
unsigned int applets_active_queue = 0;
__decl_hathreads(HA_SPINLOCK_T applet_active_lock); /* spin lock related to applet active queue */
struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
void applet_run_active()
struct task *task_run_applet(struct task *t, void *context, unsigned short state)
{
struct appctx *curr, *next;
struct stream_interface *si;
struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
int max_processed;
struct appctx *app = context;
struct stream_interface *si = app->owner;
max_processed = applets_active_queue;
if (max_processed > 200)
max_processed = 200;
HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (!(active_applets_mask & tid_bit)) {
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return;
if (app->state & APPLET_WANT_DIE) {
__appctx_free(app);
return NULL;
}
active_applets_mask &= ~tid_bit;
curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
while (&curr->runq != &applet_active_queue) {
next = LIST_NEXT(&curr->runq, typeof(next), runq);
if (curr->thread_mask & tid_bit) {
LIST_DEL(&curr->runq);
curr->state = APPLET_RUNNING;
LIST_ADDQ(&applet_cur_queue, &curr->runq);
applets_active_queue--;
max_processed--;
}
curr = next;
if (max_processed <= 0) {
active_applets_mask |= tid_bit;
break;
}
}
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
/* Now we'll try to allocate the input buffer. We wake up the
* applet in all cases. So this is the applet responsibility to
* check if this buffer was allocated or not. This let a chance
* for applets to do some other processing if needed. */
if (!channel_alloc_buffer(si_ic(si), &app->buffer_wait))
si_applet_cant_put(si);
/* The list is only scanned from the head. This guarantees that if any
* applet removes another one, there is no side effect while walking
* through the list.
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
* that one applet which ignores any event will not spin.
*/
while (!LIST_ISEMPTY(&applet_cur_queue)) {
curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
si = curr->owner;
si_applet_cant_get(si);
si_applet_stop_put(si);
/* Now we'll try to allocate the input buffer. We wake up the
* applet in all cases. So this is the applet responsibility to
* check if this buffer was allocated or not. This let a chance
* for applets to do some other processing if needed. */
if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
si_applet_cant_put(si);
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
* that one applet which ignores any event will not spin.
*/
si_applet_cant_get(si);
si_applet_stop_put(si);
curr->applet->fct(curr);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &curr->buffer_wait);
if (applet_cur_queue.n == &curr->runq) {
/* curr was left in the list, move it back to the active list */
LIST_DEL(&curr->runq);
LIST_INIT(&curr->runq);
HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (curr->state & APPLET_WANT_DIE) {
curr->state = APPLET_SLEEPING;
__appctx_free(curr);
}
else {
if (curr->state & APPLET_WOKEN_UP) {
curr->state = APPLET_SLEEPING;
__appctx_wakeup(curr);
}
else {
curr->state = APPLET_SLEEPING;
}
}
HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
}
app->applet->fct(app);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &app->buffer_wait);
return t;
}
__attribute__((constructor))
static void __applet_init(void)
{
HA_SPIN_INIT(&applet_active_lock);
}

View File

@@ -84,6 +84,7 @@
#include <proto/stick_table.h>
#include <proto/task.h>
#include <proto/tcp_rules.h>
#include <proto/connection.h>
/* This is the SSLv3 CLIENT HELLO packet used in conjunction with the

View File

@@ -953,7 +953,6 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
chunk_appendf(&trash, "\nloops:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].loops);
chunk_appendf(&trash, "\nwake_cache:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_cache);
chunk_appendf(&trash, "\nwake_tasks:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_tasks);
chunk_appendf(&trash, "\nwake_applets:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_applets);
chunk_appendf(&trash, "\nwake_signal:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_signal);
chunk_appendf(&trash, "\npoll_exp:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_exp);
chunk_appendf(&trash, "\npoll_drop:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_drop);

View File

@@ -2840,8 +2840,7 @@ spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
/* Release the buffer if needed */
if ((*buf)->size) {
b_free(buf);
offer_buffers(buffer_wait->target,
tasks_run_queue + applets_active_queue);
offer_buffers(buffer_wait->target, tasks_run_queue);
}
}

View File

@@ -90,7 +90,6 @@
#include <types/peers.h>
#include <proto/acl.h>
#include <proto/applet.h>
#include <proto/arg.h>
#include <proto/auth.h>
#include <proto/backend.h>
@@ -2419,8 +2418,6 @@ static void run_poll_loop()
activity[tid].wake_cache++;
else if (active_tasks_mask & tid_bit)
activity[tid].wake_tasks++;
else if (active_applets_mask & tid_bit)
activity[tid].wake_applets++;
else if (signal_queue_len)
activity[tid].wake_signal++;
else
@@ -2429,7 +2426,6 @@ static void run_poll_loop()
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, exp);
fd_process_cached_events();
applet_run_active();
/* Synchronize all polling loops */

View File

@@ -17,7 +17,6 @@
#include <common/hpack-enc.h>
#include <common/hpack-tbl.h>
#include <common/net_helper.h>
#include <proto/applet.h>
#include <proto/connection.h>
#include <proto/h1.h>
#include <proto/stream.h>
@@ -303,8 +302,7 @@ static inline void h2_release_buf(struct h2c *h2c, struct buffer **bptr)
{
if ((*bptr)->size) {
b_free(bptr);
offer_buffers(h2c->buf_wait.target,
tasks_run_queue + applets_active_queue);
offer_buffers(h2c->buf_wait.target, tasks_run_queue);
}
}

View File

@@ -339,7 +339,7 @@ static void stream_free(struct stream *s)
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
b_drop(&s->res.buf);
offer_buffers(NULL, tasks_run_queue + applets_active_queue);
offer_buffers(NULL, tasks_run_queue);
}
hlua_ctx_destroy(s->hlua);
@@ -469,7 +469,7 @@ void stream_release_buffers(struct stream *s)
* someone waiting, we can wake up a waiter and offer them.
*/
if (offer)
offer_buffers(s, tasks_run_queue + applets_active_queue);
offer_buffers(s, tasks_run_queue);
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */