From f6e6dc12cd533b2d8bb6413a4b5f875ddfd3e6e3 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Fri, 18 May 2018 18:38:23 +0200 Subject: [PATCH] MAJOR: tasks: Create a per-thread runqueue. A lot of tasks are run on one thread only, so instead of having them all in the global runqueue, create a per-thread runqueue which doesn't require any locking, and add all tasks belonging to only one thread to the corresponding runqueue. The global runqueue is still used for non-local tasks, and is visited by each thread when checking its own runqueue. The nice parameter is thus used both in the global runqueue and in the local ones. The rare tasks that are bound to multiple threads will have their nice value used twice (once for the global queue, once for the thread-local one). --- include/proto/task.h | 49 +++++---- src/task.c | 229 ++++++++++++++++++++++++++----------------- 2 files changed, 167 insertions(+), 111 deletions(-) diff --git a/include/proto/task.h b/include/proto/task.h index c1c4c07ec..59ac38258 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -92,6 +92,8 @@ extern struct pool_head *pool_head_task; extern struct pool_head *pool_head_notification; extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */ extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */ +extern struct eb_root rqueue; /* tree constituting the run queue */ +extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */ __decl_hathreads(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */ __decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait queue */ @@ -109,25 +111,28 @@ static inline int task_in_wq(struct task *t) } /* puts the task in run queue with reason flags , and returns */ -struct task *__task_wakeup(struct task *t); -static inline struct task *task_wakeup(struct task *t, unsigned int f) +/* This will put the task in the local runqueue if the task is only runnable + * by the current thread, in the global runqueue otherwies. + */ +void __task_wakeup(struct task *t, struct eb_root *); +static inline void task_wakeup(struct task *t, unsigned int f) { - HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); + unsigned short state; - /* If task is running, we postpone the call - * and backup the state. - */ - if (unlikely(t->state & TASK_RUNNING)) { - t->pending_state |= f; - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); - return t; - } - if (likely(!task_in_rq(t))) - __task_wakeup(t); - t->state |= f; - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); +#ifdef USE_THREAD + struct eb_root *root; - return t; + if (t->thread_mask == tid_bit && global.nbthread > 1) + root = &rqueue_local[tid]; + else + root = &rqueue; +#else + struct eb_root *root = &rqueue; +#endif + + state = HA_ATOMIC_OR(&t->state, f); + if (!(state & TASK_RUNNING)) + __task_wakeup(t, root); } /* change the thread affinity of a task to */ @@ -167,9 +172,9 @@ static inline struct task *task_unlink_wq(struct task *t) static inline struct task *__task_unlink_rq(struct task *t) { eb32sc_delete(&t->rq); - tasks_run_queue--; + HA_ATOMIC_SUB(&tasks_run_queue, 1); if (likely(t->nice)) - niced_tasks--; + HA_ATOMIC_SUB(&niced_tasks, 1); return t; } @@ -178,13 +183,15 @@ static inline struct task *__task_unlink_rq(struct task *t) */ static inline struct task *task_unlink_rq(struct task *t) { - HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); + if (t->thread_mask != tid_bit) + HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); if (likely(task_in_rq(t))) { if (&t->rq == rq_next) rq_next = eb32sc_next(rq_next, tid_bit); __task_unlink_rq(t); } - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); + if (t->thread_mask != tid_bit) + HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); return t; } @@ -208,7 +215,7 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask) { t->wq.node.leaf_p = NULL; t->rq.node.leaf_p = NULL; - t->pending_state = t->state = TASK_SLEEPING; + t->state = TASK_SLEEPING; t->thread_mask = thread_mask; t->nice = 0; t->calls = 0; diff --git a/src/task.c b/src/task.c index 23e310b09..876b837e8 100644 --- a/src/task.c +++ b/src/task.c @@ -45,7 +45,10 @@ __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lo __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */ static struct eb_root timers; /* sorted timers tree */ -static struct eb_root rqueue; /* tree constituting the run queue */ +struct eb_root rqueue; /* tree constituting the run queue */ +struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */ +static int global_rqueue_size; /* Number of element sin the global runqueue */ +static int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */ static unsigned int rqueue_ticks; /* insertion count */ /* Puts the task in run queue at a position depending on t->nice. is @@ -56,30 +59,76 @@ static unsigned int rqueue_ticks; /* insertion count */ * The task must not already be in the run queue. If unsure, use the safer * task_wakeup() function. */ -struct task *__task_wakeup(struct task *t) +void __task_wakeup(struct task *t, struct eb_root *root) { - tasks_run_queue++; + void *expected = NULL; + int *rq_size; + + if (root == &rqueue) { + rq_size = &global_rqueue_size; + HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); + } else { + int nb = root - &rqueue_local[0]; + rq_size = &rqueue_size[nb]; + } + /* Make sure if the task isn't in the runqueue, nobody inserts it + * in the meanwhile. + */ +redo: + if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1))) { + if (root == &rqueue) + HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); + return; + } + /* There's a small race condition, when running a task, the thread + * first sets TASK_RUNNING, and then unlink the task. + * If an another thread calls task_wakeup() for the same task, + * it may set t->state before TASK_RUNNING was set, and then try + * to set t->rq.nod.leaf_p after it was unlinked. + * To make sure it is not a problem, we check if TASK_RUNNING is set + * again. If it is, we unset t->rq.node.leaf_p. + * We then check for TASK_RUNNING a third time. If it is still there, + * then we can give up, the task will be re-queued later if it needs + * to be. If it's not there, and there is still something in t->state, + * then we have to requeue. + */ + if (((volatile unsigned short)(t->state)) & TASK_RUNNING) { + unsigned short state; + t->rq.node.leaf_p = NULL; + __ha_barrier_store(); + + state = (volatile unsigned short)(t->state); + if (unlikely(state != 0 && !(state & TASK_RUNNING))) + goto redo; + if (root == &rqueue) + HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); + return; + } + HA_ATOMIC_ADD(&tasks_run_queue, 1); active_tasks_mask |= t->thread_mask; - t->rq.key = ++rqueue_ticks; + t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1); if (likely(t->nice)) { int offset; - niced_tasks++; + HA_ATOMIC_ADD(&niced_tasks, 1); if (likely(t->nice > 0)) - offset = (unsigned)((tasks_run_queue * (unsigned int)t->nice) / 32U); + offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U); else - offset = -(unsigned)((tasks_run_queue * (unsigned int)-t->nice) / 32U); + offset = -(unsigned)((*rq_size * (unsigned int)-t->nice) / 32U); t->rq.key += offset; } - /* reset flag to pending ones - * Note: __task_wakeup must not be called - * if task is running - */ - t->state = t->pending_state; - eb32sc_insert(&rqueue, &t->rq, t->thread_mask); - return t; + eb32sc_insert(root, &t->rq, t->thread_mask); + if (root == &rqueue) { + global_rqueue_size++; + HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); + } else { + int nb = root - &rqueue_local[0]; + + rqueue_size[nb]++; + } + return; } /* @@ -185,11 +234,8 @@ int wake_expired_tasks() void process_runnable_tasks() { struct task *t; - int i; int max_processed; - struct task *local_tasks[16]; - int local_tasks_count; - int final_tasks_count; + uint64_t average = 0; tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */ nb_tasks_cur = nb_tasks; @@ -216,9 +262,11 @@ void process_runnable_tasks() t = eb32sc_entry(rq_next, struct task, rq); rq_next = eb32sc_next(rq_next, tid_bit); + global_rqueue_size--; + + /* detach the task from the queue */ __task_unlink_rq(t); t->state |= TASK_RUNNING; - t->pending_state = 0; t->calls++; curr_task = t; @@ -244,8 +292,8 @@ void process_runnable_tasks() * immediatly, else we defer * it into wait queue */ - if (t->pending_state) - __task_wakeup(t); + if (t->state) + __task_wakeup(t, &rqueue); else task_queue(t); } @@ -267,104 +315,105 @@ void process_runnable_tasks() return; } + average = tasks_run_queue / global.nbthread; + + /* Get some elements from the global run queue and put it in the + * local run queue. To try to keep a bit of fairness, just get as + * much elements from the global list as to have a bigger local queue + * than the average. + */ + while (rqueue_size[tid] <= average) { + + /* we have to restart looking up after every batch */ + rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit); + if (unlikely(!rq_next)) { + /* either we just started or we reached the end + * of the tree, typically because + * is in the first half and we're first scanning + * the last half. Let's loop back to the beginning + * of the tree now. + */ + rq_next = eb32sc_first(&rqueue, tid_bit); + if (!rq_next) + break; + } + + t = eb32sc_entry(rq_next, struct task, rq); + rq_next = eb32sc_next(rq_next, tid_bit); + + /* detach the task from the queue */ + __task_unlink_rq(t); + __task_wakeup(t, &rqueue_local[tid]); + } + + HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); active_tasks_mask &= ~tid_bit; while (1) { + unsigned short state; /* Note: this loop is one of the fastest code path in * the whole program. It should not be re-arranged * without a good reason. */ /* we have to restart looking up after every batch */ - rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit); - for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) { - if (unlikely(!rq_next)) { - /* either we just started or we reached the end - * of the tree, typically because - * is in the first half and we're first scanning - * the last half. Let's loop back to the beginning - * of the tree now. - */ - rq_next = eb32sc_first(&rqueue, tid_bit); - if (!rq_next) - break; - } - - t = eb32sc_entry(rq_next, struct task, rq); - rq_next = eb32sc_next(rq_next, tid_bit); - - /* detach the task from the queue */ - __task_unlink_rq(t); - local_tasks[local_tasks_count] = t; - t->state |= TASK_RUNNING; - t->pending_state = 0; - t->calls++; - max_processed--; - } - - if (!local_tasks_count) - break; - - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); - - final_tasks_count = 0; - for (i = 0; i < local_tasks_count ; i++) { - t = local_tasks[i]; - /* This is an optimisation to help the processor's branch - * predictor take this most common call. + rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit); + if (unlikely(!rq_next)) { + /* either we just started or we reached the end + * of the tree, typically because + * is in the first half and we're first scanning + * the last half. Let's loop back to the beginning + * of the tree now. */ - curr_task = t; - if (likely(t->process == process_stream)) - t = process_stream(t, t->context, t->state); - else { - if (t->process != NULL) - t = t->process(t, t->context, t->state); - else { - __task_free(t); - t = NULL; - } - } - curr_task = NULL; - if (t) - local_tasks[final_tasks_count++] = t; + rq_next = eb32sc_first(&rqueue_local[tid], tid_bit); + if (!rq_next) + break; } + t = eb32sc_entry(rq_next, struct task, rq); + rq_next = eb32sc_next(rq_next, tid_bit); - for (i = 0; i < final_tasks_count ; i++) { - t = local_tasks[i]; - /* If there is a pending state - * we have to wake up the task - * immediatly, else we defer - * it into wait queue - */ - HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); - t->state &= ~TASK_RUNNING; - if (t->pending_state) { - __task_wakeup(t); - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); - } - else { - /* we must never hold the RQ lock before the WQ lock */ - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); + state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING); + /* detach the task from the queue */ + __task_unlink_rq(t); + t->calls++; + max_processed--; + rqueue_size[tid]--; + curr_task = t; + if (likely(t->process == process_stream)) + t = process_stream(t, t->context, state); + else + t = t->process(t, t->context, state); + curr_task = NULL; + /* If there is a pending state we have to wake up the task + * immediatly, else we defer it into wait queue + */ + if (t != NULL) { + state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING); + if (state) + __task_wakeup(t, (t->thread_mask == tid_bit) ? + &rqueue_local[tid] : &rqueue); + else task_queue(t); - } } - HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); if (max_processed <= 0) { active_tasks_mask |= tid_bit; activity[tid].long_rq++; break; } } - HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_task() { + int i; + memset(&timers, 0, sizeof(timers)); memset(&rqueue, 0, sizeof(rqueue)); HA_SPIN_INIT(&wq_lock); HA_SPIN_INIT(&rq_lock); + for (i = 0; i < MAX_THREADS; i++) + memset(&rqueue_local[i], 0, sizeof(rqueue_local[i])); pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED); if (!pool_head_task) return 0;