MEDIUM: task: use regular eb32 trees for the run queues

Since we don't mix tasks from different threads in the run queues
anymore, we don't need to use the eb32sc_ trees and we can switch
to the regular eb32 ones. This uses cheaper lookup and insert code,
and a 16-thread test on the queues shows a performance increase
from 570k RPS to 585k RPS.
Willy Tarreau 2022-06-16 16:28:01 +02:00
parent c958c70ec8
commit 319d136ff9
4 changed files with 29 additions and 31 deletions
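Note on the API change: the practical difference between the two tree flavours is that every eb32sc_ call carries a scope bitmask (historically used so a thread would only visit entries tagged for it, hence the tid_bit and ~0UL arguments below), which the regular eb32 calls simply drop. A minimal before/after sketch, compiled within the HAProxy tree (which provides import/eb32tree.h); the queue, key and helper names (run_queue, enqueue, walk) are made up for illustration:

    #include <import/eb32tree.h>   /* regular 32-bit ebtree */

    static struct eb_root run_queue = EB_ROOT;   /* hypothetical run queue */

    static void enqueue(struct eb32_node *node, unsigned int key)
    {
        node->key = key;
        /* scope-aware form removed by this commit:
         *   eb32sc_insert(&run_queue, node, 1UL << thr);  */
        eb32_insert(&run_queue, node);      /* no scope bitmask anymore */
    }

    static void walk(void)
    {
        /* previously: eb32sc_first(&run_queue, ~0UL) / eb32sc_next(node, ~0UL) */
        struct eb32_node *node = eb32_first(&run_queue);

        while (node) {
            /* ... visit the node ... */
            node = eb32_next(node);
        }
    }

Dropping the scope argument is what makes lookup and insert cheaper: the scope-aware variant additionally stores and tests a per-node scope mask while descending the tree.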

View File

@@ -102,7 +102,7 @@ struct notification {
 /* The base for all tasks */
 struct task {
 TASK_COMMON; /* must be at the beginning! */
-struct eb32sc_node rq; /* ebtree node used to hold the task in the run queue */
+struct eb32_node rq; /* ebtree node used to hold the task in the run queue */
 /* WARNING: the struct task is often aliased as a struct tasklet when
 * it is NOT in the run queue. The tasklet has its struct list here
 * where rq starts and this works because both are exclusive. Never
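The rq member is an intrusive node: the scheduler never allocates tree nodes separately, it links the node embedded in the task and later recovers the owning task with eb32_entry(), ebtree's container-of helper (visible throughout this commit as t = eb32_entry(rqnode, struct task, rq)). A standalone sketch assuming the bundled ebtree; my_task and pick_first are illustrative names, not HAProxy code:

    #include <import/eb32tree.h>

    struct my_task {                  /* stand-in for struct task */
        struct eb32_node rq;          /* intrusive run-queue node */
        int nice;                     /* illustrative payload */
    };

    static struct eb_root rqueue = EB_ROOT;

    /* return the queued task with the smallest key, or NULL if empty */
    static struct my_task *pick_first(void)
    {
        struct eb32_node *node = eb32_first(&rqueue);

        if (!node)
            return NULL;
        return eb32_entry(node, struct my_task, rq);   /* node -> owner */
    }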

View File

@@ -25,7 +25,6 @@
 #include <sys/time.h>
-#include <import/eb32sctree.h>
 #include <import/eb32tree.h>
 #include <haproxy/activity.h>

View File

@@ -845,7 +845,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 const struct tasklet *tl;
 const struct task *t;
 uint64_t now_ns, lat;
-struct eb32sc_node *rqnode;
+struct eb32_node *rqnode;
 uint64_t tot_calls;
 int thr, queue;
 int i, max;
@@ -875,9 +875,9 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 #ifdef USE_THREAD
 for (thr = 0; thr < global.nbthread; thr++) {
 /* task run queue */
-rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue_shared, ~0UL);
+rqnode = eb32_first(&ha_thread_ctx[thr].rqueue_shared);
 while (rqnode) {
-t = eb32sc_entry(rqnode, struct task, rq);
+t = eb32_entry(rqnode, struct task, rq);
 entry = sched_activity_entry(tmp_activity, t->process);
 if (t->call_date) {
 lat = now_ns - t->call_date;
@@ -885,16 +885,16 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 entry->lat_time += lat;
 }
 entry->calls++;
-rqnode = eb32sc_next(rqnode, ~0UL);
+rqnode = eb32_next(rqnode);
 }
 }
 #endif
 /* 2. all threads's local run queues */
 for (thr = 0; thr < global.nbthread; thr++) {
 /* task run queue */
-rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue, ~0UL);
+rqnode = eb32_first(&ha_thread_ctx[thr].rqueue);
 while (rqnode) {
-t = eb32sc_entry(rqnode, struct task, rq);
+t = eb32_entry(rqnode, struct task, rq);
 entry = sched_activity_entry(tmp_activity, t->process);
 if (t->call_date) {
 lat = now_ns - t->call_date;
@@ -902,7 +902,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 entry->lat_time += lat;
 }
 entry->calls++;
-rqnode = eb32sc_next(rqnode, ~0UL);
+rqnode = eb32_next(rqnode);
 }
 /* shared tasklet list */
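These "show tasks" loops only read the trees, so the switch to eb32 turns them into the plain first/next walk with no scope argument. A reduced, read-only sketch of that walk, again against the bundled ebtree; count_queued is an illustrative helper, not part of HAProxy:

    #include <import/eb32tree.h>

    /* count the entries currently linked into one run-queue tree;
     * the real handler also maps each node back to its task and
     * accumulates per-handler call counts and latencies */
    static unsigned int count_queued(struct eb_root *root)
    {
        struct eb32_node *node;
        unsigned int n = 0;

        for (node = eb32_first(root); node; node = eb32_next(node))
            n++;
        return n;
    }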

View File

@@ -12,7 +12,6 @@
 #include <string.h>
-#include <import/eb32sctree.h>
 #include <import/eb32tree.h>
 #include <haproxy/api.h>
@@ -254,7 +253,7 @@ void __task_wakeup(struct task *t)
 if (th_ctx->flags & TH_FL_TASK_PROFILING)
 t->call_date = now_mono_time();
-eb32sc_insert(root, &t->rq, 1UL << thr);
+eb32_insert(root, &t->rq);
 #ifdef USE_THREAD
 if (thr != tid) {
@@ -731,8 +730,8 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 void process_runnable_tasks()
 {
 struct thread_ctx * const tt = th_ctx;
-struct eb32sc_node *lrq; // next local run queue entry
-struct eb32sc_node *grq; // next global run queue entry
+struct eb32_node *lrq; // next local run queue entry
+struct eb32_node *grq; // next global run queue entry
 struct task *t;
 const unsigned int default_weights[TL_CLASSES] = {
 [TL_URGENT] = 64, // ~50% of CPU bandwidth for I/O
@@ -828,9 +827,9 @@ void process_runnable_tasks()
 if (!eb_is_empty(&th_ctx->rqueue_shared) && !grq) {
 #ifdef USE_THREAD
 HA_SPIN_LOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
-grq = eb32sc_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK, tid_bit);
+grq = eb32_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK);
 if (unlikely(!grq)) {
-grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
+grq = eb32_first(&th_ctx->rqueue_shared);
 if (!grq)
 HA_SPIN_UNLOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
 }
@@ -842,28 +841,28 @@ void process_runnable_tasks()
 */
 if (!lrq) {
-lrq = eb32sc_lookup_ge(&tt->rqueue, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK, tid_bit);
+lrq = eb32_lookup_ge(&tt->rqueue, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK);
 if (unlikely(!lrq))
-lrq = eb32sc_first(&tt->rqueue, tid_bit);
+lrq = eb32_first(&tt->rqueue);
 }
 if (!lrq && !grq)
 break;
 if (likely(!grq || (lrq && (int)(lrq->key - grq->key) <= 0))) {
-t = eb32sc_entry(lrq, struct task, rq);
-lrq = eb32sc_next(lrq, tid_bit);
-eb32sc_delete(&t->rq);
+t = eb32_entry(lrq, struct task, rq);
+lrq = eb32_next(lrq);
+eb32_delete(&t->rq);
 lpicked++;
 }
 #ifdef USE_THREAD
 else {
-t = eb32sc_entry(grq, struct task, rq);
-grq = eb32sc_next(grq, tid_bit);
-eb32sc_delete(&t->rq);
+t = eb32_entry(grq, struct task, rq);
+grq = eb32_next(grq);
+eb32_delete(&t->rq);
 if (unlikely(!grq)) {
-grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
+grq = eb32_first(&th_ctx->rqueue_shared);
 if (!grq)
 HA_SPIN_UNLOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
 }
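The pick pattern in this hunk is how the scheduler tolerates the 32-bit queue ticks wrapping: it looks up the first key at or after (current ticks - TIMER_LOOK_BACK), and when that runs off the high end of the tree it restarts from the smallest key. A reduced sketch of the same pattern against the bundled ebtree; next_runnable and look_back are illustrative names, not the HAProxy symbols:

    #include <import/eb32tree.h>

    /* fetch the next node in wrapping-tick order */
    static struct eb32_node *next_runnable(struct eb_root *root,
                                           unsigned int ticks,
                                           unsigned int look_back)
    {
        /* start slightly behind the current position so entries queued
         * with marginally older keys are not skipped */
        struct eb32_node *node = eb32_lookup_ge(root, ticks - look_back);

        if (!node)
            node = eb32_first(root);   /* ran past the last key: wrap around */
        return node;
    }

When both the local and the shared queue offer a candidate, the hunk keeps whichever key is older in wrapping arithmetic, hence the signed test (int)(lrq->key - grq->key) <= 0.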
@@ -918,14 +917,14 @@ void mworker_cleantasks()
 struct task *t;
 int i;
 struct eb32_node *tmp_wq = NULL;
-struct eb32sc_node *tmp_rq = NULL;
+struct eb32_node *tmp_rq = NULL;
 #ifdef USE_THREAD
 /* cleanup the global run queue */
-tmp_rq = eb32sc_first(&th_ctx->rqueue_shared, ~0UL);
+tmp_rq = eb32_first(&th_ctx->rqueue_shared);
 while (tmp_rq) {
-t = eb32sc_entry(tmp_rq, struct task, rq);
-tmp_rq = eb32sc_next(tmp_rq, ~0UL);
+t = eb32_entry(tmp_rq, struct task, rq);
+tmp_rq = eb32_next(tmp_rq);
 task_destroy(t);
 }
 /* cleanup the timers queue */
@@ -938,10 +937,10 @@ void mworker_cleantasks()
 #endif
 /* clean the per thread run queue */
 for (i = 0; i < global.nbthread; i++) {
-tmp_rq = eb32sc_first(&ha_thread_ctx[i].rqueue, ~0UL);
+tmp_rq = eb32_first(&ha_thread_ctx[i].rqueue);
 while (tmp_rq) {
-t = eb32sc_entry(tmp_rq, struct task, rq);
-tmp_rq = eb32sc_next(tmp_rq, ~0UL);
+t = eb32_entry(tmp_rq, struct task, rq);
+tmp_rq = eb32_next(tmp_rq);
 task_destroy(t);
 }
 /* cleanup the per thread timers queue */
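In mworker_cleantasks() the walk frees entries as it goes, so each iteration advances to the next node before the current task is torn down, exactly as the eb32sc version did. A standalone sketch of that drain pattern, assuming the bundled ebtree; my_task and drain_queue are illustrative, and the real code delegates the teardown to task_destroy():

    #include <stdlib.h>
    #include <import/eb32tree.h>

    struct my_task {                   /* stand-in for struct task */
        struct eb32_node rq;
    };

    static void drain_queue(struct eb_root *root)
    {
        struct eb32_node *node = eb32_first(root);

        while (node) {
            struct my_task *t = eb32_entry(node, struct my_task, rq);

            node = eb32_next(node);    /* advance first... */
            eb32_delete(&t->rq);       /* ...then unlink the current node... */
            free(t);                   /* ...and release it */
        }
    }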