MEDIUM: task: move the shared runqueue to one per thread

Since we only use the shared runqueue to queue tasks assigned to known
threads, let's move that runqueue into each of these threads. The goal
is to arrange an N*(N-1) mesh instead of a central contention point.
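
As a rough standalone sketch of that layout (toy types and names such as
toy_thread_ctx and toy_wakeup are invented here, not HAProxy's API; locking
is omitted and covered by the diff below), every thread owns a run queue
that the other threads feed, so wakeups form N*(N-1) producer/consumer
pairs instead of funnelling through one global tree:

    #define NB_THREADS 4                  /* assumed fixed here; HAProxy sizes this at runtime */

    struct toy_task {
        struct toy_task *next;
    };

    /* one context per thread; its shared queue is fed by the other threads */
    struct toy_thread_ctx {
        struct toy_task *rqueue_shared;   /* run queue fed by other threads */
    } toy_ctx[NB_THREADS];

    /* Wake task <t> up on thread <thr>: the waker enqueues directly into the
     * target thread's own shared queue (LIFO list for brevity; the real queue
     * is an ordered ebtree keyed by the thread's rqueue_ticks).
     */
    static void toy_wakeup(struct toy_task *t, int thr)
    {
        t->next = toy_ctx[thr].rqueue_shared;
        toy_ctx[thr].rqueue_shared = t;
    }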

The global_rqueue_ticks had to be dropped (for good) since we'll now
use the per-thread rqueue_ticks counter for both trees.
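
For illustration, a minimal sketch of that single-counter idea in plain C11
(toy names, not the real rqueue_ticks handling): every insertion on a given
thread, whether it targets the local tree or the shared one, draws its key
from the same monotonic per-thread counter, so the look-back lookups seen
in the diff below can start from one consistent value.

    #include <stdatomic.h>
    #include <stdint.h>

    /* toy per-thread scheduler state: one counter keys both trees */
    struct toy_sched {
        _Atomic uint32_t rq_ticks;   /* monotonic insertion counter */
    };

    /* Key for the next insertion on this thread, used for both the local and
     * the shared tree. Atomic because remote threads bump it too when they
     * feed the shared tree.
     */
    static uint32_t toy_next_rq_key(struct toy_sched *s)
    {
        return atomic_fetch_add_explicit(&s->rq_ticks, 1,
                                         memory_order_relaxed) + 1;
    }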

A few points to note:
  - the rq_lock still remains the global one for now, so there should not
    be any gain in doing this; but should this trigger any regression, it
    is important to detect whether it's related to the lock or to the tree
    (see the sketch after this list).

  - there's no more reason for using the scope-based version of the ebtree
    now; we could switch back to the regular eb32 tree.

  - it's worth checking whether we still need TASK_GLOBAL (probably only
    to delete a task from one's own shared queue).
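
Regarding the first point, a hedged sketch of the situation this commit
leaves us in (toy names, and a pthread mutex standing in for the rq_lock
spinlock): the trees are now per thread, but both producers and consumers
still serialize on one process-wide lock, which is why no gain is expected
until the lock itself is split.

    #include <pthread.h>

    #define NB_THREADS 4

    /* still a single process-wide lock, as in this commit */
    static pthread_mutex_t toy_rq_lock = PTHREAD_MUTEX_INITIALIZER;

    /* per-thread shared run queues (placeholder for the per-thread ebtrees) */
    static struct toy_rq { int entries; } toy_rqueue_shared[NB_THREADS];

    static void toy_remote_wakeup(int thr)
    {
        pthread_mutex_lock(&toy_rq_lock);    /* every producer contends here... */
        toy_rqueue_shared[thr].entries++;    /* ...even though the tree is per thread */
        pthread_mutex_unlock(&toy_rq_lock);
    }

    static void toy_pick_next(int thr)
    {
        pthread_mutex_lock(&toy_rq_lock);    /* ...and so does every consumer */
        if (toy_rqueue_shared[thr].entries)
            toy_rqueue_shared[thr].entries--;
        pthread_mutex_unlock(&toy_rq_lock);
    }
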
Willy Tarreau 2022-06-16 15:30:50 +02:00
parent a4fb79b4a2
commit 6f78038d72
4 changed files with 22 additions and 20 deletions

@@ -99,7 +99,6 @@ extern struct pool_head *pool_head_notification;
#ifdef USE_THREAD
extern struct eb_root timers; /* sorted timers tree, global */
extern struct eb_root rqueue; /* tree constituting the run queue */
#endif
__decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */

@@ -102,6 +102,9 @@ struct thread_ctx {
uint64_t prev_cpu_time; /* previous per thread CPU time */
uint64_t prev_mono_time; /* previous system wide monotonic time */
struct eb_root rqueue_shared; /* run queue fed by other threads */
ALWAYS_ALIGN(128);
};

@@ -873,17 +873,20 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
/* 1. global run queue */
#ifdef USE_THREAD
rqnode = eb32sc_first(&rqueue, ~0UL);
while (rqnode) {
t = eb32sc_entry(rqnode, struct task, rq);
entry = sched_activity_entry(tmp_activity, t->process);
if (t->call_date) {
lat = now_ns - t->call_date;
if ((int64_t)lat > 0)
entry->lat_time += lat;
for (thr = 0; thr < global.nbthread; thr++) {
/* task run queue */
rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue_shared, ~0UL);
while (rqnode) {
t = eb32sc_entry(rqnode, struct task, rq);
entry = sched_activity_entry(tmp_activity, t->process);
if (t->call_date) {
lat = now_ns - t->call_date;
if ((int64_t)lat > 0)
entry->lat_time += lat;
}
entry->calls++;
rqnode = eb32sc_next(rqnode, ~0UL);
}
entry->calls++;
rqnode = eb32sc_next(rqnode, ~0UL);
}
#endif
/* 2. all threads's local run queues */

@@ -43,9 +43,7 @@ __decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */
#ifdef USE_THREAD
struct eb_root timers; /* sorted timers tree, global, accessed under wq_lock */
struct eb_root rqueue; /* tree constituting the global run queue, accessed under rq_lock */
unsigned int grq_total; /* total number of entries in the global run queue, atomic */
static unsigned int global_rqueue_ticks; /* insertion count in the grq, use rq_lock */
#endif
@@ -234,7 +232,7 @@ void __task_wakeup(struct task *t)
#ifdef USE_THREAD
if (thr != tid) {
root = &rqueue;
root = &ha_thread_ctx[thr].rqueue_shared;
_HA_ATOMIC_INC(&grq_total);
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
@@ -243,7 +241,7 @@ void __task_wakeup(struct task *t)
global_tasks_mask = all_threads_mask;
else
global_tasks_mask |= 1UL << thr;
t->rq.key = ++global_rqueue_ticks;
t->rq.key = _HA_ATOMIC_ADD_FETCH(&ha_thread_ctx[thr].rqueue_ticks, 1);
__ha_barrier_store();
} else
#endif
@@ -838,9 +836,9 @@ void process_runnable_tasks()
if ((global_tasks_mask & tid_bit) && !grq) {
#ifdef USE_THREAD
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
grq = eb32sc_lookup_ge(&rqueue, global_rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
grq = eb32sc_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK, tid_bit);
if (unlikely(!grq)) {
grq = eb32sc_first(&rqueue, tid_bit);
grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
if (!grq) {
global_tasks_mask &= ~tid_bit;
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
@@ -875,7 +873,7 @@ void process_runnable_tasks()
eb32sc_delete(&t->rq);
if (unlikely(!grq)) {
grq = eb32sc_first(&rqueue, tid_bit);
grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
if (!grq) {
global_tasks_mask &= ~tid_bit;
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
@@ -942,7 +940,7 @@ void mworker_cleantasks()
#ifdef USE_THREAD
/* cleanup the global run queue */
tmp_rq = eb32sc_first(&rqueue, ~0UL);
tmp_rq = eb32sc_first(&th_ctx->rqueue_shared, ~0UL);
while (tmp_rq) {
t = eb32sc_entry(tmp_rq, struct task, rq);
tmp_rq = eb32sc_next(tmp_rq, ~0UL);
@@ -981,7 +979,6 @@ static void init_task()
#ifdef USE_THREAD
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
#endif
for (i = 0; i < MAX_THREADS; i++) {
for (q = 0; q < TL_CLASSES; q++)