MAJOR: tasks: Introduce tasklets.

Introduce tasklets, a lightweight kind of task. They have no notion of
priority; they are simply run as soon as possible, and will probably be
used for I/O later.

For the moment they are used to replace the temporary thread-local list
that was used in the scheduler. The first part of the struct is common
with struct task so that a task can be cast to a tasklet and queued in
this list. Once a task is in the tasklet list, its leaf_p is set to 0x1
so that it cannot accidentally be mistaken for a task that is not queued.

Pure tasklets are identifiable by their nice value of -32768 (which is
not a valid nice value for a regular task).
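
To illustrate the intended use, here is a minimal, hypothetical sketch built
only on the API added by this patch (tasklet_new(), tasklet_wakeup() and the
common ->process/->context fields). The my_conn structure and the function
names are invented for the example and are not part of the commit; note that
process_runnable_tasks() resets the task pointer to NULL before calling a
pure tasklet's handler, so the handler should only rely on its context.

#include <proto/task.h>

/* hypothetical per-connection context, used only for this example */
struct my_conn {
	int fd;
	struct tasklet *tl;
};

/* Handler using the common ->process prototype. For a pure tasklet the
 * scheduler passes t == NULL, so only <ctx> and <state> are meaningful.
 * Returning NULL tells the scheduler there is nothing left to requeue.
 */
static struct task *my_io_handler(struct task *t, void *ctx, unsigned short state)
{
	struct my_conn *conn = ctx;

	/* ... perform the pending I/O on conn->fd here ... */
	return NULL;
}

/* Example setup: allocate a tasklet, attach the handler and its context,
 * then schedule it to run as soon as possible on the current thread.
 */
static int my_conn_start(struct my_conn *conn)
{
	struct tasklet *tl = tasklet_new();

	if (!tl)
		return 0;
	tl->process = my_io_handler;
	tl->context = conn;
	conn->tl = tl;
	tasklet_wakeup(tl);
	return 1;
}
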
Olivier Houchard 2018-05-18 18:45:28 +02:00 committed by Willy Tarreau
parent f6e6dc12cd
commit b0bdae7b88
3 changed files with 163 additions and 105 deletions


@@ -89,11 +89,14 @@ extern unsigned int tasks_run_queue_cur;
extern unsigned int nb_tasks_cur;
extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool_head_task;
extern struct pool_head *pool_head_tasklet;
extern struct pool_head *pool_head_notification;
extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
extern struct eb_root rqueue; /* tree constituting the run queue */
extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
extern int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
__decl_hathreads(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
__decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait queue */
@@ -101,7 +104,10 @@ __decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait qu
/* return 0 if task is in run queue, otherwise non-zero */
static inline int task_in_rq(struct task *t)
{
return t->rq.node.leaf_p != NULL;
/* Check that leaf_p is neither NULL, which would mean the task is not
* in the run queue, nor 0x1, which would mean it's in the tasklet list.
*/
return t->rq.node.leaf_p != NULL && t->rq.node.leaf_p != (void *)0x1;
}
/* return 0 if task is in wait queue, otherwise non-zero */
@@ -122,7 +128,7 @@ static inline void task_wakeup(struct task *t, unsigned int f)
#ifdef USE_THREAD
struct eb_root *root;
if (t->thread_mask == tid_bit && global.nbthread > 1)
if (t->thread_mask == tid_bit || global.nbthread == 1)
root = &rqueue_local[tid];
else
root = &rqueue;
@@ -172,7 +178,6 @@ static inline struct task *task_unlink_wq(struct task *t)
static inline struct task *__task_unlink_rq(struct task *t)
{
eb32sc_delete(&t->rq);
HA_ATOMIC_SUB(&tasks_run_queue, 1);
if (likely(t->nice))
HA_ATOMIC_SUB(&niced_tasks, 1);
return t;
@@ -195,6 +200,41 @@ static inline struct task *task_unlink_rq(struct task *t)
return t;
}
static inline void tasklet_wakeup(struct tasklet *tl)
{
LIST_ADDQ(&task_list[tid], &tl->list);
task_list_size[tid]++;
HA_ATOMIC_ADD(&tasks_run_queue, 1);
}
static inline void task_insert_into_tasklet_list(struct task *t)
{
struct tasklet *tl;
void *expected = NULL;
/* Protect ourselves against anybody trying to insert the task into
* another runqueue. We set leaf_p to 0x1 to indicate that the node is
* not in a tree but that it's in the tasklet list. See task_in_rq().
*/
if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1)))
return;
task_list_size[tid]++;
tl = (struct tasklet *)t;
LIST_ADDQ(&task_list[tid], &tl->list);
}
static inline void task_remove_from_task_list(struct task *t)
{
LIST_DEL(&((struct tasklet *)t)->list);
task_list_size[tid]--;
HA_ATOMIC_SUB(&tasks_run_queue, 1);
if (!TASK_IS_TASKLET(t)) {
t->rq.node.leaf_p = NULL; // was 0x1
__ha_barrier_store();
}
}
/*
* Unlinks the task and adjusts run queue stats.
* A pointer to the task itself is returned.
@@ -223,6 +263,24 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
return t;
}
static inline void tasklet_init(struct tasklet *t)
{
t->nice = -32768;
t->calls = 0;
t->state = 0;
t->list.p = t->list.n = NULL;
}
static inline struct tasklet *tasklet_new(void)
{
struct tasklet *t = pool_alloc(pool_head_tasklet);
if (t) {
tasklet_init(t);
}
return t;
}
/*
* Allocate and initialise a new task. The new task is returned, or NULL in
* case of lack of memory. The task count is incremented. Tasks should only
@@ -262,6 +320,13 @@ static inline void task_free(struct task *t)
}
static inline void tasklet_free(struct tasklet *tl)
{
pool_free(pool_head_tasklet, tl);
if (unlikely(stopping))
pool_flush(pool_head_tasklet);
}
/* Place <task> into the wait queue, where it may already be. If the expiration
* timer is infinite, do nothing and rely on wake_expired_task to clean up.
*/


@@ -60,20 +60,35 @@ struct notification {
__decl_hathreads(HA_SPINLOCK_T lock);
};
/* This part is common between struct task and struct tasklet so that tasks
* can be used as-is as tasklets.
*/
#define TASK_COMMON \
struct { \
unsigned short state; /* task state : bitfield of TASK_ */ \
short nice; /* task prio from -1024 to +1024, or -32768 for tasklets */ \
unsigned int calls; /* number of times process was called */ \
struct task *(*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */ \
void *context; /* the task's context */ \
}
/* The base for all tasks */
struct task {
TASK_COMMON; /* must be at the beginning! */
struct eb32sc_node rq; /* ebtree node used to hold the task in the run queue */
unsigned short state; /* task state : bit field of TASK_* */
unsigned short pending_state; /* pending states for running task */
short nice; /* the task's current nice value from -1024 to +1024 */
unsigned int calls; /* number of times ->process() was called */
struct task * (*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */
void *context; /* the task's context */
struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */
int expire; /* next expiration date for this task, in ticks */
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
};
/* lightweight tasks, without priority, mainly used for I/Os */
struct tasklet {
TASK_COMMON; /* must be at the beginning! */
struct list list;
};
#define TASK_IS_TASKLET(t) ((t)->nice == -32768)
/*
* The task callback (->process) is responsible for updating ->expire. It must
* return a pointer to the task itself, except if the task has been deleted, in


@@ -25,6 +25,7 @@
#include <proto/task.h>
struct pool_head *pool_head_task;
struct pool_head *pool_head_tasklet;
/* This is the memory pool containing all the signal structs. These
* structs are used to store each required signal between two tasks.
@@ -41,6 +42,9 @@ unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
THREAD_LOCAL struct task *curr_task = NULL; /* task currently running or NULL */
THREAD_LOCAL struct eb32sc_node *rq_next = NULL; /* Next task to be potentially run */
struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
@@ -240,20 +244,32 @@ void process_runnable_tasks()
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
max_processed = 200;
if (unlikely(global.nbthread <= 1)) {
/* when no lock is needed, this loop is much faster */
if (likely(global.nbthread > 1)) {
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (!(active_tasks_mask & tid_bit)) {
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
activity[tid].empty_rq++;
return;
}
active_tasks_mask &= ~tid_bit;
rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
while (1) {
if (!rq_next) {
/* we might have reached the end of the tree, typically because
* <rqueue_ticks> is in the first half and we're first scanning
* the last half. Let's loop back to the beginning of the tree now.
average = tasks_run_queue / global.nbthread;
/* Get some elements from the global run queue and put them in the
* local run queue. To try to keep a bit of fairness, just take as
* many elements from the global list as needed to make the local
* queue bigger than the average.
*/
while (rqueue_size[tid] <= average) {
/* we have to restart looking up after every batch */
rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
if (unlikely(!rq_next)) {
/* either we just started or we reached the end
* of the tree, typically because <rqueue_ticks>
* is in the first half and we're first scanning
* the last half. Let's loop back to the beginning
* of the tree now.
*/
rq_next = eb32sc_first(&rqueue, tid_bit);
if (!rq_next)
@@ -266,90 +282,22 @@ void process_runnable_tasks()
/* detach the task from the queue */
__task_unlink_rq(t);
t->state |= TASK_RUNNING;
t->calls++;
curr_task = t;
/* This is an optimisation to help the processor's branch
* predictor take this most common call.
*/
if (likely(t->process == process_stream))
t = process_stream(t, t->context, t->state);
else {
if (t->process != NULL)
t = t->process(t, t->context, t->state);
else {
__task_free(t);
t = NULL;
}
}
curr_task = NULL;
if (likely(t != NULL)) {
t->state &= ~TASK_RUNNING;
/* If there is a pending state
* we have to wake up the task
* immediately, else we defer
* it to the wait queue
*/
if (t->state)
__task_wakeup(t, &rqueue);
else
task_queue(t);
}
max_processed--;
if (max_processed <= 0) {
active_tasks_mask |= tid_bit;
activity[tid].long_rq++;
break;
}
__task_wakeup(t, &rqueue_local[tid]);
}
return;
}
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (!(active_tasks_mask & tid_bit)) {
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
activity[tid].empty_rq++;
return;
}
average = tasks_run_queue / global.nbthread;
/* Get some elements from the global run queue and put them in the
* local run queue. To try to keep a bit of fairness, just take as
* many elements from the global list as needed to make the local
* queue bigger than the average.
*/
while (rqueue_size[tid] <= average) {
/* we have to restart looking up after every batch */
rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
if (unlikely(!rq_next)) {
/* either we just started or we reached the end
* of the tree, typically because <rqueue_ticks>
* is in the first half and we're first scanning
* the last half. Let's loop back to the beginning
* of the tree now.
*/
rq_next = eb32sc_first(&rqueue, tid_bit);
if (!rq_next)
break;
} else {
if (!(active_tasks_mask & tid_bit)) {
activity[tid].empty_rq++;
return;
}
t = eb32sc_entry(rq_next, struct task, rq);
rq_next = eb32sc_next(rq_next, tid_bit);
/* detach the task from the queue */
__task_unlink_rq(t);
__task_wakeup(t, &rqueue_local[tid]);
}
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
active_tasks_mask &= ~tid_bit;
while (1) {
unsigned short state;
/* Get some tasks from the run queue, make sure we don't
* get too many in the task list, but put a bit more than
* the max that will be run, to give a bit more fairness
*/
while (max_processed + 20 > task_list_size[tid]) {
/* Note: this loop is one of the fastest code path in
* the whole program. It should not be re-arranged
* without a good reason.
@@ -370,18 +318,42 @@ void process_runnable_tasks()
}
t = eb32sc_entry(rq_next, struct task, rq);
rq_next = eb32sc_next(rq_next, tid_bit);
/* Make sure nobody re-adds the task in the runqueue */
HA_ATOMIC_OR(&t->state, TASK_RUNNING);
state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
/* detach the task from the queue */
__task_unlink_rq(t);
t->calls++;
max_processed--;
/* And add it to the local task list */
task_insert_into_tasklet_list(t);
}
while (max_processed > 0 && !LIST_ISEMPTY(&task_list[tid])) {
struct task *t;
unsigned short state;
void *ctx;
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
t = (struct task *)LIST_ELEM(task_list[tid].n, struct tasklet *, list);
state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
__ha_barrier_store();
task_remove_from_task_list(t);
ctx = t->context;
process = t->process;
rqueue_size[tid]--;
curr_task = t;
if (likely(t->process == process_stream))
t = process_stream(t, t->context, state);
else
t = t->process(t, t->context, state);
t->calls++;
curr_task = (struct task *)t;
if (TASK_IS_TASKLET(t))
t = NULL;
if (likely(process == process_stream))
t = process_stream(t, ctx, state);
else {
if (t->process != NULL)
t = process(t, ctx, state);
else {
__task_free(t);
t = NULL;
}
}
curr_task = NULL;
/* If there is a pending state we have to wake up the task
* immediately, else we defer it to the wait queue
@@ -412,11 +384,17 @@ int init_task()
memset(&rqueue, 0, sizeof(rqueue));
HA_SPIN_INIT(&wq_lock);
HA_SPIN_INIT(&rq_lock);
for (i = 0; i < MAX_THREADS; i++)
for (i = 0; i < MAX_THREADS; i++) {
memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
LIST_INIT(&task_list[i]);
task_list_size[i] = 0;
}
pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
if (!pool_head_task)
return 0;
pool_head_tasklet = create_pool("tasklet", sizeof(struct tasklet), MEM_F_SHARED);
if (!pool_head_tasklet)
return 0;
pool_head_notification = create_pool("notification", sizeof(struct notification), MEM_F_SHARED);
if (!pool_head_notification)
return 0;