Mirror of http://git.haproxy.org/git/haproxy.git/ (synced 2025-03-31 23:58:16 +00:00)
MEDIUM: task/thread: move the task shared wait queues per thread group
Their migration was postponed for convenience only, but it is now time to make the shared wait queues per thread group rather than just per process; otherwise the WQ lock alone consumes a huge amount of CPU.
commit b0e7712fb2 (parent 82e378aa8a)
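The change is conceptually simple: instead of one process-wide timers tree protected by a single wq_lock rwlock, each thread group now owns its own tree and its own lock, so only the threads of one group ever contend with each other. A minimal standalone sketch of that sharding pattern, using plain pthreads and purely illustrative names (this is not HAProxy's actual API):

#include <pthread.h>
#include <stddef.h>

#define MAX_TGROUPS 16

/* hypothetical timer node; HAProxy really uses an ebtree keyed on expiry */
struct timer_node {
	unsigned int expire;
	struct timer_node *next;
};

/* one wait queue per thread group: the lock only covers this group's tree */
struct tgroup_wq {
	pthread_rwlock_t wq_lock;
	struct timer_node *timers;
};

static struct tgroup_wq tgroup_wq[MAX_TGROUPS];

static void init_wait_queues(void)
{
	for (int i = 0; i < MAX_TGROUPS; i++) {
		pthread_rwlock_init(&tgroup_wq[i].wq_lock, NULL);
		tgroup_wq[i].timers = NULL;
	}
}

/* a thread only contends with the other threads of its own group <tgid>,
 * instead of with every thread in the process as with a global wq_lock */
static void queue_shared_timer(int tgid, struct timer_node *node)
{
	struct tgroup_wq *wq = &tgroup_wq[tgid];

	pthread_rwlock_wrlock(&wq->wq_lock);
	node->next = wq->timers;   /* the real code keeps the tree sorted by expiry */
	wq->timers = node;
	pthread_rwlock_unlock(&wq->wq_lock);
}

Per-thread (non-shared) timers keep bypassing this path entirely: as the hunks below show, a task with task->tid >= 0 still goes to its owner thread's local th_ctx->timers queue without taking the shared lock.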
@@ -94,12 +94,6 @@ extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
 
-#ifdef USE_THREAD
-extern struct eb_root timers; /* sorted timers tree, global */
-#endif
-
-__decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait queue */
-
 void __tasklet_wakeup_on(struct tasklet *tl, int thr);
 struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
 void task_kill(struct task *t);
@@ -273,10 +267,10 @@ static inline struct task *task_unlink_wq(struct task *t)
 		locked = t->tid < 0;
 		BUG_ON(t->tid >= 0 && t->tid != tid);
 		if (locked)
-			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		__task_unlink_wq(t);
 		if (locked)
-			HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	}
 	return t;
 }
@@ -303,10 +297,10 @@ static inline void task_queue(struct task *task)
 
 #ifdef USE_THREAD
 	if (task->tid < 0) {
-		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &timers);
-		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			__task_queue(task, &tg_ctx->timers);
+		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	} else
 #endif
 	{
@@ -666,14 +660,14 @@ static inline void task_schedule(struct task *task, int when)
 #ifdef USE_THREAD
 	if (task->tid < 0) {
 		/* FIXME: is it really needed to lock the WQ during the check ? */
-		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (task_in_wq(task))
 			when = tick_first(when, task->expire);
 
 		task->expire = when;
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &timers);
-		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			__task_queue(task, &tg_ctx->timers);
+		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	} else
 #endif
 	{
@@ -69,6 +69,10 @@ struct tgroup_ctx {
 	ulong threads_harmless; /* mask of threads that are not modifying anything */
 	ulong threads_idle; /* mask of threads idling in the poller */
 	ulong stopping_threads; /* mask of threads currently stopping */
+
+	HA_RWLOCK_T wq_lock; /* RW lock related to the wait queue below */
+	struct eb_root timers; /* wait queue (sorted timers tree, global, accessed under wq_lock) */
+
 	/* pad to cache line (64B) */
 	char __pad[0]; /* unused except to check remaining room */
 	char __end[0] __attribute__((aligned(64)));
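The trailing __pad[0]/__end[0] pair above is the existing cache-line padding idiom: __end is aligned to 64 bytes, so the compiler rounds the structure up to the next cache line, and __pad marks where the real fields stop. A hypothetical, simplified illustration of how such a layout can be checked at compile time (not code from this patch, field set reduced for brevity):

#include <stddef.h>

struct grp {
	unsigned long threads_harmless;
	unsigned long threads_idle;
	unsigned long stopping_threads;

	char __pad[0];                              /* end of the used area */
	char __end[0] __attribute__((aligned(64))); /* rounds the struct up to 64B */
};

/* the used fields must still fit in the first cache line,
 * and the whole struct is padded to a multiple of 64 bytes */
_Static_assert(offsetof(struct grp, __pad) <= 64, "fields overflow one cache line");
_Static_assert(sizeof(struct grp) % 64 == 0, "struct not padded to a cache line");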
 src/task.c | 61
@@ -36,13 +36,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 
 unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
 
-__decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */
-
-#ifdef USE_THREAD
-struct eb_root timers; /* sorted timers tree, global, accessed under wq_lock */
-#endif
-
-
 
 /* Flags the task <t> for immediate destruction and puts it into its first
  * thread's shared tasklet list if not yet queued/running. This will bypass
@@ -277,9 +270,9 @@ void __task_wakeup(struct task *t)
 void __task_queue(struct task *task, struct eb_root *wq)
 {
 #ifdef USE_THREAD
-	BUG_ON((wq == &timers && task->tid >= 0) ||
+	BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
 	       (wq == &th_ctx->timers && task->tid < 0) ||
-	       (wq != &timers && wq != &th_ctx->timers));
+	       (wq != &tg_ctx->timers && wq != &th_ctx->timers));
 #endif
 	/* if this happens the process is doomed anyway, so better catch it now
 	 * so that we have the caller in the stack.
@@ -367,32 +360,32 @@ void wake_expired_tasks()
 	}
 
 #ifdef USE_THREAD
-	if (eb_is_empty(&timers))
+	if (eb_is_empty(&tg_ctx->timers))
 		goto leave;
 
-	HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-	eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+	HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+	eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 	if (!eb) {
-		eb = eb32_first(&timers);
+		eb = eb32_first(&tg_ctx->timers);
 		if (likely(!eb)) {
-			HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			goto leave;
 		}
 	}
 	key = eb->key;
 
 	if (tick_is_lt(now_ms, key)) {
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		goto leave;
 	}
 
 	/* There's really something of interest here, let's visit the queue */
 
-	if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
+	if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock)) {
 		/* if we failed to grab the lock it means another thread is
 		 * already doing the same here, so let it do the job.
 		 */
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		goto leave;
 	}
 
@@ -400,13 +393,13 @@ void wake_expired_tasks()
  lookup_next:
 		if (max_processed-- <= 0)
 			break;
-		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+		eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 		if (!eb) {
 			/* we might have reached the end of the tree, typically because
 			 * <now_ms> is in the first half and we're first scanning the last
 			 * half. Let's loop back to the beginning of the tree now.
 			 */
-			eb = eb32_first(&timers);
+			eb = eb32_first(&tg_ctx->timers);
 			if (likely(!eb))
 				break;
 		}
@@ -431,20 +424,20 @@ void wake_expired_tasks()
 
 		if (tick_is_expired(task->expire, now_ms)) {
 			/* expired task, wake it up */
-			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			__task_unlink_wq(task);
-			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			task_drop_running(task, TASK_WOKEN_TIMER);
 		}
 		else if (task->expire != eb->key) {
 			/* task is not expired but its key doesn't match so let's
 			 * update it and skip to next apparently expired task.
 			 */
-			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			__task_unlink_wq(task);
 			if (tick_isset(task->expire))
-				__task_queue(task, &timers);
-			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+				__task_queue(task, &tg_ctx->timers);
+			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			task_drop_running(task, 0);
 			goto lookup_next;
 		}
@@ -456,7 +449,7 @@ void wake_expired_tasks()
 		}
 	}
 
-	HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
+	HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 #endif
 leave:
 	return;
@@ -487,14 +480,14 @@ int next_timer_expiry()
 		ret = eb->key;
 
 #ifdef USE_THREAD
-	if (!eb_is_empty(&timers)) {
-		HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+	if (!eb_is_empty(&tg_ctx->timers)) {
+		HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+		eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 		if (!eb)
-			eb = eb32_first(&timers);
+			eb = eb32_first(&tg_ctx->timers);
 		if (eb)
 			key = eb->key;
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (eb)
 			ret = tick_first(ret, key);
 	}
@@ -914,7 +907,7 @@ void mworker_cleantasks()
 		task_destroy(t);
 	}
 	/* cleanup the timers queue */
-	tmp_wq = eb32_first(&timers);
+	tmp_wq = eb32_first(&tg_ctx->timers);
 	while (tmp_wq) {
 		t = eb32_entry(tmp_wq, struct task, wq);
 		tmp_wq = eb32_next(tmp_wq);
@@ -944,9 +937,9 @@ static void init_task()
 {
 	int i, q;
 
-#ifdef USE_THREAD
-	memset(&timers, 0, sizeof(timers));
-#endif
+	for (i = 0; i < MAX_TGROUPS; i++)
+		memset(&ha_tgroup_ctx[i].timers, 0, sizeof(ha_tgroup_ctx[i].timers));
+
 	for (i = 0; i < MAX_THREADS; i++) {
 		for (q = 0; q < TL_CLASSES; q++)
 			LIST_INIT(&ha_thread_ctx[i].tasklets[q]);