MAJOR: tasks: create per-thread wait queues

The timers in the main wait queue remain a major contention point, even
though the vast majority of tasks are pinned to a single thread. This
patch creates a per-thread wait queue: a task bound to a single thread
(the current one) is queued into its local wait queue without any
locking, while other tasks go to the shared queue under the lock. This
significantly reduces contention on the wait queue. A test with 12
threads showed 11 ms spent in the WQ lock, compared to 4.7 seconds in
the same test without this change. On the same 12-thread test, the
cache miss ratio decreased from 19.7% to 19.2% and performance
increased by 1.5%.

Another indirect benefit is that the average queue size is divided by
the number of threads, which roughly removes log2(nbthreads) levels
from the tree and further speeds up lookups (with 12 threads, about
log2(12) ~ 3.6 fewer levels).
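
The pattern applied throughout the patch is sketched below (an illustrative
fragment, not part of the diff). atleast2() is the existing thread-mask helper
which is non-zero when more than one bit is set, i.e. when the task is not
bound to a single thread:

        if (atleast2(task->thread_mask)) {
                /* task may run on several threads: use the shared tree under the WQ lock */
                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
                __task_queue(task, &timers);
                HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
        } else {
                /* task bound to the current thread only: its tree is private, no lock needed */
                __task_queue(task, &timers_local[tid]);
        }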
Author: Willy Tarreau
Date:   2018-10-15 14:52:21 +02:00
Commit: b20aa9eef3 (parent 87d54a9a6d)
2 changed files with 110 additions and 28 deletions

[Changed file 1 of 2: task header (extern declarations and inline scheduling helpers)]

@@ -94,10 +94,12 @@ extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 #ifdef USE_THREAD
+extern struct eb_root timers; /* sorted timers tree, global */
 extern struct eb_root rqueue; /* tree constituting the run queue */
 extern int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
+extern struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
@@ -167,12 +169,19 @@ static inline struct task *__task_unlink_wq(struct task *t)
         return t;
 }
 
+/* remove a task from its wait queue. It may either be the local wait queue if
+ * the task is bound to a single thread (in which case there's no locking
+ * involved) or the global queue, with locking.
+ */
 static inline struct task *task_unlink_wq(struct task *t)
 {
-        HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-        if (likely(task_in_wq(t)))
+        if (likely(task_in_wq(t))) {
+                if (atleast2(t->thread_mask))
+                        HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
                 __task_unlink_wq(t);
-        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+                if (atleast2(t->thread_mask))
+                        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+        }
         return t;
 }
@@ -356,10 +365,14 @@ static inline void tasklet_free(struct tasklet *tl)
         pool_flush(pool_head_tasklet);
 }
 
+void __task_queue(struct task *task, struct eb_root *wq);
+
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
+ * If the task is bound to a single thread, it's assumed to be bound to the
+ * current thread's queue and is queued without locking. Otherwise it's queued
+ * into the global wait queue, protected by locks.
  */
-void __task_queue(struct task *task);
 static inline void task_queue(struct task *task)
 {
         /* If we already have a place in the wait queue no later than the
@@ -374,10 +387,18 @@ static inline void task_queue(struct task *task)
         if (!tick_isset(task->expire))
                 return;
 
-        HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-        if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                __task_queue(task);
-        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+        if (atleast2(task->thread_mask)) {
+                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                        __task_queue(task, &timers);
+                HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+        } else
+#endif
+        {
+                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                        __task_queue(task, &timers_local[tid]);
+        }
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -390,14 +411,26 @@ static inline void task_schedule(struct task *task, int when)
         if (task_in_rq(task))
                 return;
 
-        HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-        if (task_in_wq(task))
-                when = tick_first(when, task->expire);
-
-        task->expire = when;
-        if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                __task_queue(task);
-        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+        if (atleast2(task->thread_mask)) {
+                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+                if (task_in_wq(task))
+                        when = tick_first(when, task->expire);
+
+                task->expire = when;
+                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                        __task_queue(task, &timers);
+                HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+        } else
+#endif
+        {
+                if (task_in_wq(task))
+                        when = tick_first(when, task->expire);
+
+                task->expire = when;
+                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                        __task_queue(task, &timers_local[tid]);
+        }
 }
 
 /* This function register a new signal. "lua" is the current lua
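
As a usage note (hypothetical example, not taken from the patch), a task whose
thread_mask has a single bit set now arms its timer entirely without the WQ
lock through these inline helpers, using the usual tick helpers:

        /* <t> assumed bound to the current thread (one bit set in t->thread_mask) */
        t->expire = tick_add(now_ms, 5000);   /* expire in about 5 seconds */
        task_queue(t);                        /* inserted into timers_local[tid]; no lock taken */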

[Changed file 2 of 2: task scheduler source (wait queue and run queue implementation)]

@@ -50,14 +50,16 @@ int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
-static struct eb_root timers; /* sorted timers tree */
 #ifdef USE_THREAD
+struct eb_root timers; /* sorted timers tree, global */
 struct eb_root rqueue; /* tree constituting the run queue */
 int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
 struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks; /* insertion count */
+struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * returned. The nice value assigns boosts in 32th of the run queue size. A
@@ -170,7 +172,7 @@ redo:
 /*
  * __task_queue()
  *
- * Inserts a task into the wait queue at the position given by its expiration
+ * Inserts a task into wait queue <wq> at the position given by its expiration
  * date. It does not matter if the task was already in the wait queue or not,
  * as it will be unlinked. The task must not have an infinite expiration timer.
  * Last, tasks must not be queued further than the end of the tree, which is
@@ -178,9 +180,11 @@ redo:
  *
  * This function should not be used directly, it is meant to be called by the
  * inline version of task_queue() which performs a few cheap preliminary tests
- * before deciding to call __task_queue().
+ * before deciding to call __task_queue(). Moreover this function doesn't care
+ * at all about locking so the caller must be careful when deciding whether to
+ * lock or not around this call.
  */
-void __task_queue(struct task *task)
+void __task_queue(struct task *task, struct eb_root *wq)
 {
         if (likely(task_in_wq(task)))
                 __task_unlink_wq(task);
@@ -193,9 +197,7 @@ void __task_queue(struct task *task)
                 return;
 #endif
 
-        eb32_insert(&timers, &task->wq);
-        return;
+        eb32_insert(wq, &task->wq);
 }
 
 /*
@@ -209,15 +211,14 @@ int wake_expired_tasks()
         int ret = TICK_ETERNITY;
 
         while (1) {
-                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-  lookup_next:
-                eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+  lookup_next_local:
+                eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
                 if (!eb) {
                         /* we might have reached the end of the tree, typically because
                          * <now_ms> is in the first half and we're first scanning the last
                          * half. Let's loop back to the beginning of the tree now.
                          */
-                        eb = eb32_first(&timers);
+                        eb = eb32_first(&timers_local[tid]);
                         if (likely(!eb))
                                 break;
                 }
@@ -247,7 +248,53 @@ int wake_expired_tasks()
                  */
                 if (!tick_is_expired(task->expire, now_ms)) {
                         if (tick_isset(task->expire))
-                                __task_queue(task);
+                                __task_queue(task, &timers_local[tid]);
+                        goto lookup_next_local;
+                }
+                task_wakeup(task, TASK_WOKEN_TIMER);
+        }
+
+#ifdef USE_THREAD
+        while (1) {
+                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+  lookup_next:
+                eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+                if (!eb) {
+                        /* we might have reached the end of the tree, typically because
+                         * <now_ms> is in the first half and we're first scanning the last
+                         * half. Let's loop back to the beginning of the tree now.
+                         */
+                        eb = eb32_first(&timers);
+                        if (likely(!eb))
+                                break;
+                }
+
+                if (tick_is_lt(now_ms, eb->key)) {
+                        /* timer not expired yet, revisit it later */
+                        ret = tick_first(ret, eb->key);
+                        break;
+                }
+
+                /* timer looks expired, detach it from the queue */
+                task = eb32_entry(eb, struct task, wq);
+                __task_unlink_wq(task);
+
+                /* It is possible that this task was left at an earlier place in the
+                 * tree because a recent call to task_queue() has not moved it. This
+                 * happens when the new expiration date is later than the old one.
+                 * Since it is very unlikely that we reach a timeout anyway, it's a
+                 * lot cheaper to proceed like this because we almost never update
+                 * the tree. We may also find disabled expiration dates there. Since
+                 * we have detached the task from the tree, we simply call task_queue
+                 * to take care of this. Note that we might occasionally requeue it at
+                 * the same place, before <eb>, so we have to check if this happens,
+                 * and adjust <eb>, otherwise we may skip it which is not what we want.
+                 * We may also not requeue the task (and not point eb at it) if its
+                 * expiration time is not set.
+                 */
+                if (!tick_is_expired(task->expire, now_ms)) {
+                        if (tick_isset(task->expire))
+                                __task_queue(task, &timers);
                         goto lookup_next;
                 }
                 task_wakeup(task, TASK_WOKEN_TIMER);
@@ -255,6 +302,7 @@ int wake_expired_tasks()
         }
 
         HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#endif
         return ret;
 }
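
For context (not part of the diff): the unexpired-timer check in each loop
updates <ret> via tick_first() before breaking out, so wake_expired_tasks()
still returns the date of the nearest pending timer across both the local and
the global trees, or TICK_ETERNITY when nothing is armed. A hedged sketch of
caller-side use, assuming the usual tick helpers (tick_isset(), tick_remain());
<max_sleep> is a hypothetical variable bounding the poller's timeout:

        int next = wake_expired_tasks();            /* nearest pending timer, or TICK_ETERNITY */
        if (tick_isset(next))
                max_sleep = tick_remain(now_ms, next);  /* don't sleep past the next timer */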
@@ -415,13 +463,14 @@ int init_task()
 {
         int i;
 
-        memset(&timers, 0, sizeof(timers));
 #ifdef USE_THREAD
+        memset(&timers, 0, sizeof(timers));
         memset(&rqueue, 0, sizeof(rqueue));
 #endif
         HA_SPIN_INIT(&wq_lock);
         HA_SPIN_INIT(&rq_lock);
         for (i = 0; i < MAX_THREADS; i++) {
+                memset(&timers_local[i], 0, sizeof(timers_local[i]));
                 memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
                 LIST_INIT(&task_list[i]);
                 task_list_size[i] = 0;