MAJOR: tasks: Create a per-thread runqueue.

A lot of tasks are run on one thread only, so instead of having them all
in the global runqueue, create a per-thread runqueue which doesn't require
any locking, and add all tasks belonging to only one thread to the
corresponding runqueue.

The global runqueue is still used for non-local tasks, and is visited
by each thread when checking its own runqueue. The nice parameter is
thus used both in the global runqueue and in the local ones. The rare
tasks that are bound to multiple threads will have their nice value
used twice (once for the global queue, once for the thread-local one).
Author: Olivier Houchard, 2018-05-18 18:38:23 +02:00 (committed by Willy Tarreau)
commit f6e6dc12cd, parent 9f6af33222
2 changed files with 167 additions and 111 deletions
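
Editor's note: the core of the patch is the routing decision in task_wakeup(): a task bound to exactly one thread goes into that thread's lockless tree, everything else still goes through the shared, lock-protected tree. The stand-alone sketch below models only that rule; the struct, array size and helper name are illustrative stand-ins, not the HAProxy API.

    #include <stdio.h>

    /* Simplified stand-ins for the types involved; only the routing
     * decision from task_wakeup() is modelled here.
     */
    struct eb_root { int dummy; };

    static struct eb_root rqueue;           /* shared, lock-protected   */
    static struct eb_root rqueue_local[4];  /* one per thread, lockless */

    /* Return the tree a wakeup should target: the thread-local tree when
     * the task may only run on the calling thread, the global tree
     * otherwise (or when only one thread is running).
     */
    static struct eb_root *pick_runqueue(unsigned long thread_mask,
                                         unsigned long tid_bit,
                                         int tid, int nbthread)
    {
        if (thread_mask == tid_bit && nbthread > 1)
            return &rqueue_local[tid];
        return &rqueue;
    }

    int main(void)
    {
        /* thread 1 (tid_bit = 1 << 1) waking a task bound to itself only */
        printf("local?  %d\n", pick_runqueue(1UL << 1, 1UL << 1, 1, 4) == &rqueue_local[1]);
        /* same thread waking a task allowed to run anywhere */
        printf("global? %d\n", pick_runqueue(~0UL, 1UL << 1, 1, 4) == &rqueue);
        return 0;
    }

With a single thread, or a task bound to several threads, the wakeup still targets the shared tree, which is why the global run queue and its spinlock remain in the patch.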

Changed file 1 of 2:

@@ -92,6 +92,8 @@ extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
+extern struct eb_root rqueue; /* tree constituting the run queue */
+extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 __decl_hathreads(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait queue */
@@ -109,25 +111,28 @@ static inline int task_in_wq(struct task *t)
 }
 
 /* puts the task <t> in run queue with reason flags <f>, and returns <t> */
-struct task *__task_wakeup(struct task *t);
-static inline struct task *task_wakeup(struct task *t, unsigned int f)
+/* This will put the task in the local runqueue if the task is only runnable
+ * by the current thread, in the global runqueue otherwies.
+ */
+void __task_wakeup(struct task *t, struct eb_root *);
+static inline void task_wakeup(struct task *t, unsigned int f)
 {
-    HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+    unsigned short state;
 
-    /* If task is running, we postpone the call
-     * and backup the state.
-     */
-    if (unlikely(t->state & TASK_RUNNING)) {
-        t->pending_state |= f;
-        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-        return t;
-    }
-    if (likely(!task_in_rq(t)))
-        __task_wakeup(t);
-    t->state |= f;
-    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-    return t;
+#ifdef USE_THREAD
+    struct eb_root *root;
+
+    if (t->thread_mask == tid_bit && global.nbthread > 1)
+        root = &rqueue_local[tid];
+    else
+        root = &rqueue;
+#else
+    struct eb_root *root = &rqueue;
+#endif
+
+    state = HA_ATOMIC_OR(&t->state, f);
+    if (!(state & TASK_RUNNING))
+        __task_wakeup(t, root);
 }
 
 /* change the thread affinity of a task to <thread_mask> */
@@ -167,9 +172,9 @@ static inline struct task *task_unlink_wq(struct task *t)
 static inline struct task *__task_unlink_rq(struct task *t)
 {
     eb32sc_delete(&t->rq);
-    tasks_run_queue--;
+    HA_ATOMIC_SUB(&tasks_run_queue, 1);
     if (likely(t->nice))
-        niced_tasks--;
+        HA_ATOMIC_SUB(&niced_tasks, 1);
     return t;
 }
@ -178,13 +183,15 @@ static inline struct task *__task_unlink_rq(struct task *t)
*/ */
static inline struct task *task_unlink_rq(struct task *t) static inline struct task *task_unlink_rq(struct task *t)
{ {
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); if (t->thread_mask != tid_bit)
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (likely(task_in_rq(t))) { if (likely(task_in_rq(t))) {
if (&t->rq == rq_next) if (&t->rq == rq_next)
rq_next = eb32sc_next(rq_next, tid_bit); rq_next = eb32sc_next(rq_next, tid_bit);
__task_unlink_rq(t); __task_unlink_rq(t);
} }
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); if (t->thread_mask != tid_bit)
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
return t; return t;
} }
@@ -208,7 +215,7 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
     t->wq.node.leaf_p = NULL;
     t->rq.node.leaf_p = NULL;
-    t->pending_state = t->state = TASK_SLEEPING;
+    t->state = TASK_SLEEPING;
     t->thread_mask = thread_mask;
     t->nice = 0;
     t->calls = 0;
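
Editor's note on the header changes above: the task_wakeup() inline no longer touches the run queue lock itself. It ORs the wakeup reason into t->state and only calls __task_wakeup() when TASK_RUNNING was not already set (the global-queue path still locks inside __task_wakeup()). A minimal sketch of that fast path, using C11 atomics instead of the HA_ATOMIC_* macros; the flag value and function name are made up for the example.

    #include <stdatomic.h>
    #include <stdio.h>

    #define TASK_RUNNING  0x0001
    #define TASK_WOKEN_IO 0x0002   /* illustrative flag value, not HAProxy's */

    /* Model of the new wakeup fast path: OR the wakeup reason into the
     * state and only enqueue if the task was not already running. A task
     * caught while TASK_RUNNING keeps the flags in its state, and the
     * thread running it re-queues it when the handler returns.
     */
    static int wakeup_needs_enqueue(_Atomic unsigned short *state, unsigned short flags)
    {
        unsigned short prev = atomic_fetch_or(state, flags);
        return !(prev & TASK_RUNNING);
    }

    int main(void)
    {
        _Atomic unsigned short sleeping = 0;
        _Atomic unsigned short running  = TASK_RUNNING;

        printf("sleeping task -> enqueue now: %d\n",
               wakeup_needs_enqueue(&sleeping, TASK_WOKEN_IO));   /* 1 */
        printf("running task  -> defer:       %d\n",
               !wakeup_needs_enqueue(&running, TASK_WOKEN_IO));   /* 1 */
        return 0;
    }

This atomic check on the shared state is what replaces the old pending_state field removed from task_init() and __task_wakeup().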

Changed file 2 of 2:

@@ -45,7 +45,10 @@ __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
 static struct eb_root timers;      /* sorted timers tree */
-static struct eb_root rqueue;      /* tree constituting the run queue */
+struct eb_root rqueue;             /* tree constituting the run queue */
+struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+static int global_rqueue_size; /* Number of element sin the global runqueue */
+static int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
@@ -56,30 +59,76 @@ static unsigned int rqueue_ticks;  /* insertion count */
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * [...]
  * The task must not already be in the run queue. If unsure, use the safer
  * task_wakeup() function.
  */
-struct task *__task_wakeup(struct task *t)
+void __task_wakeup(struct task *t, struct eb_root *root)
 {
-    tasks_run_queue++;
+    void *expected = NULL;
+    int *rq_size;
+
+    if (root == &rqueue) {
+        rq_size = &global_rqueue_size;
+        HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+    } else {
+        int nb = root - &rqueue_local[0];
+        rq_size = &rqueue_size[nb];
+    }
+    /* Make sure if the task isn't in the runqueue, nobody inserts it
+     * in the meanwhile.
+     */
+redo:
+    if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1))) {
+        if (root == &rqueue)
+            HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+        return;
+    }
+    /* There's a small race condition, when running a task, the thread
+     * first sets TASK_RUNNING, and then unlink the task.
+     * If an another thread calls task_wakeup() for the same task,
+     * it may set t->state before TASK_RUNNING was set, and then try
+     * to set t->rq.nod.leaf_p after it was unlinked.
+     * To make sure it is not a problem, we check if TASK_RUNNING is set
+     * again. If it is, we unset t->rq.node.leaf_p.
+     * We then check for TASK_RUNNING a third time. If it is still there,
+     * then we can give up, the task will be re-queued later if it needs
+     * to be. If it's not there, and there is still something in t->state,
+     * then we have to requeue.
+     */
+    if (((volatile unsigned short)(t->state)) & TASK_RUNNING) {
+        unsigned short state;
+        t->rq.node.leaf_p = NULL;
+        __ha_barrier_store();
+
+        state = (volatile unsigned short)(t->state);
+        if (unlikely(state != 0 && !(state & TASK_RUNNING)))
+            goto redo;
+        if (root == &rqueue)
+            HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+        return;
+    }
+    HA_ATOMIC_ADD(&tasks_run_queue, 1);
     active_tasks_mask |= t->thread_mask;
-    t->rq.key = ++rqueue_ticks;
+    t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
 
     if (likely(t->nice)) {
         int offset;
 
-        niced_tasks++;
+        HA_ATOMIC_ADD(&niced_tasks, 1);
         if (likely(t->nice > 0))
-            offset = (unsigned)((tasks_run_queue * (unsigned int)t->nice) / 32U);
+            offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
         else
-            offset = -(unsigned)((tasks_run_queue * (unsigned int)-t->nice) / 32U);
+            offset = -(unsigned)((*rq_size * (unsigned int)-t->nice) / 32U);
         t->rq.key += offset;
     }
 
-    /* reset flag to pending ones
-     * Note: __task_wakeup must not be called
-     * if task is running
-     */
-    t->state = t->pending_state;
-    eb32sc_insert(&rqueue, &t->rq, t->thread_mask);
-    return t;
+    eb32sc_insert(root, &t->rq, t->thread_mask);
+    if (root == &rqueue) {
+        global_rqueue_size++;
+        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+    } else {
+        int nb = root - &rqueue_local[0];
+        rqueue_size[nb]++;
+    }
+    return;
 }
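
Editor's note: the long comment in the new __task_wakeup() above describes how concurrent wakers are serialized. The first caller to CAS t->rq.node.leaf_p from NULL to a dummy value owns the insertion; everyone else backs off, since the node pointer is only NULL while the task sits outside any tree. A reduced model of that claim step with C11 atomics; the sentinel value and names are illustrative, not the ebtree internals.

    #include <stdatomic.h>
    #include <stdio.h>

    #define CLAIM_SENTINEL ((void *)0x1)

    /* A successful CAS from NULL to the sentinel means this caller owns
     * the insertion; a failed CAS means someone else is already queueing
     * the task and we can simply back off.
     */
    static int try_claim(_Atomic(void *) *leaf_p)
    {
        void *expected = NULL;
        return atomic_compare_exchange_strong(leaf_p, &expected, CLAIM_SENTINEL);
    }

    int main(void)
    {
        _Atomic(void *) leaf_p = NULL;

        printf("first waker claims: %d\n", try_claim(&leaf_p));  /* 1 */
        printf("second waker bails: %d\n", try_claim(&leaf_p));  /* 0 */
        return 0;
    }

The extra TASK_RUNNING re-checks in the patch handle the window where the running thread unlinks the task just after setting the flag; the loser of that race either gives up or retries the claim.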
@@ -185,11 +234,8 @@ int wake_expired_tasks()
 void process_runnable_tasks()
 {
     struct task *t;
-    int i;
     int max_processed;
-    struct task *local_tasks[16];
-    int local_tasks_count;
-    int final_tasks_count;
+    uint64_t average = 0;
 
     tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
     nb_tasks_cur = nb_tasks;
@@ -216,9 +262,11 @@ void process_runnable_tasks()
             t = eb32sc_entry(rq_next, struct task, rq);
             rq_next = eb32sc_next(rq_next, tid_bit);
+            global_rqueue_size--;
 
+            /* detach the task from the queue */
             __task_unlink_rq(t);
             t->state |= TASK_RUNNING;
-            t->pending_state = 0;
 
             t->calls++;
             curr_task = t;
@@ -244,8 +292,8 @@ void process_runnable_tasks()
              * immediatly, else we defer
              * it into wait queue
              */
-            if (t->pending_state)
-                __task_wakeup(t);
+            if (t->state)
+                __task_wakeup(t, &rqueue);
             else
                 task_queue(t);
         }
@@ -267,104 +315,105 @@ void process_runnable_tasks()
         return;
     }
 
+    average = tasks_run_queue / global.nbthread;
+
+    /* Get some elements from the global run queue and put it in the
+     * local run queue. To try to keep a bit of fairness, just get as
+     * much elements from the global list as to have a bigger local queue
+     * than the average.
+     */
+    while (rqueue_size[tid] <= average) {
+        /* we have to restart looking up after every batch */
+        rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+        if (unlikely(!rq_next)) {
+            /* either we just started or we reached the end
+             * of the tree, typically because <rqueue_ticks>
+             * is in the first half and we're first scanning
+             * the last half. Let's loop back to the beginning
+             * of the tree now.
+             */
+            rq_next = eb32sc_first(&rqueue, tid_bit);
+            if (!rq_next)
+                break;
+        }
+        t = eb32sc_entry(rq_next, struct task, rq);
+        rq_next = eb32sc_next(rq_next, tid_bit);
+
+        /* detach the task from the queue */
+        __task_unlink_rq(t);
+        __task_wakeup(t, &rqueue_local[tid]);
+    }
+
+    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
     active_tasks_mask &= ~tid_bit;
     while (1) {
+        unsigned short state;
         /* Note: this loop is one of the fastest code path in
          * the whole program. It should not be re-arranged
          * without a good reason.
          */
 
         /* we have to restart looking up after every batch */
-        rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-        for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) {
-            if (unlikely(!rq_next)) {
-                /* either we just started or we reached the end
-                 * of the tree, typically because <rqueue_ticks>
-                 * is in the first half and we're first scanning
-                 * the last half. Let's loop back to the beginning
-                 * of the tree now.
-                 */
-                rq_next = eb32sc_first(&rqueue, tid_bit);
-                if (!rq_next)
-                    break;
-            }
-            t = eb32sc_entry(rq_next, struct task, rq);
-            rq_next = eb32sc_next(rq_next, tid_bit);
-
-            /* detach the task from the queue */
-            __task_unlink_rq(t);
-            local_tasks[local_tasks_count] = t;
-            t->state |= TASK_RUNNING;
-            t->pending_state = 0;
-            t->calls++;
-            max_processed--;
-        }
-
-        if (!local_tasks_count)
-            break;
-
-        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-
-        final_tasks_count = 0;
-        for (i = 0; i < local_tasks_count ; i++) {
-            t = local_tasks[i];
-            /* This is an optimisation to help the processor's branch
-             * predictor take this most common call.
-             */
-            curr_task = t;
-            if (likely(t->process == process_stream))
-                t = process_stream(t, t->context, t->state);
-            else {
-                if (t->process != NULL)
-                    t = t->process(t, t->context, t->state);
-                else {
-                    __task_free(t);
-                    t = NULL;
-                }
-            }
-            curr_task = NULL;
-            if (t)
-                local_tasks[final_tasks_count++] = t;
-        }
-
-        for (i = 0; i < final_tasks_count ; i++) {
-            t = local_tasks[i];
-            /* If there is a pending state
-             * we have to wake up the task
-             * immediatly, else we defer
-             * it into wait queue
-             */
-            HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-            t->state &= ~TASK_RUNNING;
-            if (t->pending_state) {
-                __task_wakeup(t);
-                HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-            }
-            else {
-                /* we must never hold the RQ lock before the WQ lock */
-                HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-                task_queue(t);
-            }
-        }
-        HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+        rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+        if (unlikely(!rq_next)) {
+            /* either we just started or we reached the end
+             * of the tree, typically because <rqueue_ticks>
+             * is in the first half and we're first scanning
+             * the last half. Let's loop back to the beginning
+             * of the tree now.
+             */
+            rq_next = eb32sc_first(&rqueue_local[tid], tid_bit);
+            if (!rq_next)
+                break;
+        }
+        t = eb32sc_entry(rq_next, struct task, rq);
+        rq_next = eb32sc_next(rq_next, tid_bit);
+
+        state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+        /* detach the task from the queue */
+        __task_unlink_rq(t);
+        t->calls++;
+        max_processed--;
+        rqueue_size[tid]--;
+
+        curr_task = t;
+        if (likely(t->process == process_stream))
+            t = process_stream(t, t->context, state);
+        else
+            t = t->process(t, t->context, state);
+        curr_task = NULL;
+
+        /* If there is a pending state we have to wake up the task
+         * immediatly, else we defer it into wait queue
+         */
+        if (t != NULL) {
+            state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+            if (state)
+                __task_wakeup(t, (t->thread_mask == tid_bit) ?
+                              &rqueue_local[tid] : &rqueue);
+            else
+                task_queue(t);
+        }
+
         if (max_processed <= 0) {
             active_tasks_mask |= tid_bit;
             activity[tid].long_rq++;
             break;
         }
     }
-    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_task()
 {
+    int i;
+
     memset(&timers, 0, sizeof(timers));
     memset(&rqueue, 0, sizeof(rqueue));
     HA_SPIN_INIT(&wq_lock);
     HA_SPIN_INIT(&rq_lock);
+    for (i = 0; i < MAX_THREADS; i++)
+        memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
     pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
     if (!pool_head_task)
         return 0;
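
Editor's note: a worked example of the fairness rule used when refilling the local queue in process_runnable_tasks(): a thread keeps pulling entries from the global run queue while its local queue holds no more than tasks_run_queue / nbthread entries. The numbers below are hypothetical and plain counters stand in for the ebtrees.

    #include <stdio.h>

    int main(void)
    {
        int tasks_run_queue = 40;   /* total queued tasks (hypothetical)  */
        int nbthread = 4;
        int rqueue_size_tid = 3;    /* this thread's local queue size     */
        int global_rqueue_size = 28;
        int average = tasks_run_queue / nbthread;   /* 10 */
        int moved = 0;

        /* keep moving tasks from the global queue to the local one until
         * the local queue grows past the per-thread average
         */
        while (rqueue_size_tid <= average && global_rqueue_size > 0) {
            global_rqueue_size--;   /* __task_unlink_rq() from the global tree */
            rqueue_size_tid++;      /* __task_wakeup() into rqueue_local[tid]  */
            moved++;
        }
        printf("moved %d tasks, local queue now %d (average %d)\n",
               moved, rqueue_size_tid, average);
        return 0;
    }

So with 40 queued tasks and 4 threads the loop stops once the local queue exceeds 10 entries (here after moving 8), leaving the remaining global entries for the other threads.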