OPTIM: tasks: group all tree roots per cache line
Currently we have per-thread arrays of trees and counts, but they unfortunately share cache lines and are accessed very often. This patch moves the task-specific stuff into a structure taking a multiple of a cache line, and has one such structure per thread. Just doing this has reduced the cache miss ratio from 19.2% to 18.7% and increased the 12-thread test performance by 3%.

It starts to become visible that we really need a process-wide per-thread storage area that would cover more than just these parts of the tasks. The code was arranged so that it's easy to move the pieces elsewhere if needed.
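The gain comes from avoiding false sharing: when per-thread counters sit in adjacent array slots, several threads' entries land in the same 64-byte cache line, and every update by one thread invalidates that line in the other threads' caches. A minimal sketch of the before/after layouts, using illustrative names and a hypothetical thread count rather than the actual HAProxy definitions:

    /* Sketch only: hypothetical names, not the HAProxy declarations. */
    #include <stdio.h>

    #define MAX_THREADS 64

    /* Before: parallel per-thread arrays. Entries for neighbouring threads
     * share a 64-byte cache line, so a counter update by one thread bounces
     * the line between cores (false sharing). */
    static int rqueue_size_arr[MAX_THREADS];
    static int task_list_size_arr[MAX_THREADS];

    /* After: one cache-line-aligned structure per thread, so element i and
     * element i+1 can never share a line. */
    struct per_thread_state {
            int rqueue_size;
            int task_list_size;
    } __attribute__((aligned(64)));

    static struct per_thread_state task_state[MAX_THREADS];

    int main(void)
    {
            /* prints 64: each per-thread slot occupies exactly one cache line */
            printf("%zu\n", sizeof(task_state[0]));
            return 0;
    }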
commit 8d8747abe0 (parent b20aa9eef3)
@@ -99,11 +99,17 @@ extern struct eb_root rqueue;      /* tree constituting the run queue */
 extern int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
 
-extern struct eb_root timers_local[MAX_THREADS];  /* tree constituting the per-thread run queue */
-extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
-extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
-extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
-extern int task_list_size[MAX_THREADS]; /* Number of task sin the task_list */
+/* force to split per-thread stuff into separate cache lines */
+struct task_per_thread {
+        struct eb_root timers;  /* tree constituting the per-thread wait queue */
+        struct eb_root rqueue;  /* tree constituting the per-thread run queue */
+        struct list task_list;  /* List of tasks to be run, mixing tasks and tasklets */
+        int task_list_size;     /* Number of tasks in the task_list */
+        int rqueue_size;        /* Number of elements in the per-thread run queue */
+        __attribute__((aligned(64))) char end[0];
+};
+
+extern struct task_per_thread task_per_thread[MAX_THREADS];
 
 __decl_hathreads(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait queue */
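The zero-length end[0] member is what forces the padding: a member with 64-byte alignment raises the alignment of the whole structure to 64, and sizeof() is always rounded up to a multiple of the structure's alignment, so consecutive elements of task_per_thread[] start on separate cache lines. A standalone sketch of that rounding, with stand-in members rather than the real ones:

    #include <stdio.h>

    /* Stand-in members; only the aligned zero-length array matters here. */
    struct padded {
            void *a, *b, *c;
            int x, y;
            __attribute__((aligned(64))) char end[0];
    };

    int main(void)
    {
            /* Prints 64 on a typical LP64 build: 32 bytes of payload rounded
             * up to the 64-byte alignment imposed by the 'end' member. */
            printf("%zu\n", sizeof(struct padded));
            return 0;
    }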
@@ -139,11 +145,11 @@ static inline void task_wakeup(struct task *t, unsigned int f)
         struct eb_root *root;
 
         if (t->thread_mask == tid_bit || global.nbthread == 1)
-                root = &rqueue_local[tid];
+                root = &task_per_thread[tid].rqueue;
         else
                 root = &rqueue;
 #else
-        struct eb_root *root = &rqueue_local[tid];
+        struct eb_root *root = &task_per_thread[tid].rqueue;
 #endif
 
         state = HA_ATOMIC_OR(&t->state, f);
@@ -201,7 +207,7 @@ static inline struct task *__task_unlink_rq(struct task *t)
                 global_rqueue_size--;
         } else
 #endif
-                rqueue_size[tid]--;
+                task_per_thread[tid].rqueue_size--;
         eb32sc_delete(&t->rq);
         if (likely(t->nice))
                 HA_ATOMIC_SUB(&niced_tasks, 1);
@@ -233,8 +239,8 @@ static inline void tasklet_wakeup(struct tasklet *tl)
         }
         if (!LIST_ISEMPTY(&tl->list))
                 return;
-        LIST_ADDQ(&task_list[tid], &tl->list);
-        task_list_size[tid]++;
+        LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
+        task_per_thread[tid].task_list_size++;
         HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
         HA_ATOMIC_ADD(&tasks_run_queue, 1);
 
@@ -252,16 +258,16 @@ static inline void task_insert_into_tasklet_list(struct task *t)
         if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, (void *)0x1)))
                 return;
         HA_ATOMIC_ADD(&tasks_run_queue, 1);
-        task_list_size[tid]++;
+        task_per_thread[tid].task_list_size++;
         tl = (struct tasklet *)t;
-        LIST_ADDQ(&task_list[tid], &tl->list);
+        LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
 }
 
 static inline void task_remove_from_task_list(struct task *t)
 {
         LIST_DEL(&((struct tasklet *)t)->list);
         LIST_INIT(&((struct tasklet *)t)->list);
-        task_list_size[tid]--;
+        task_per_thread[tid].task_list_size--;
         HA_ATOMIC_SUB(&tasks_run_queue, 1);
         if (!TASK_IS_TASKLET(t)) {
                 t->rq.node.leaf_p = NULL; // was 0x1
@@ -357,7 +363,7 @@ static inline void task_free(struct task *t)
 static inline void tasklet_free(struct tasklet *tl)
 {
         if (!LIST_ISEMPTY(&tl->list))
-                task_list_size[tid]--;
+                task_per_thread[tid].task_list_size--;
         LIST_DEL(&tl->list);
 
         pool_free(pool_head_tasklet, tl);
@@ -397,7 +403,7 @@ static inline void task_queue(struct task *task)
 #endif
         {
                 if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                        __task_queue(task, &timers_local[tid]);
+                        __task_queue(task, &task_per_thread[tid].timers);
         }
 }
 
@@ -429,7 +435,7 @@ static inline void task_schedule(struct task *task, int when)
 
                 task->expire = when;
                 if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                        __task_queue(task, &timers_local[tid]);
+                        __task_queue(task, &task_per_thread[tid].timers);
         }
 }
 
src/task.c (49 changed lines)
@@ -44,9 +44,6 @@ unsigned int niced_tasks = 0;  /* number of niced tasks in the run queue */
 THREAD_LOCAL struct task *curr_task = NULL; /* task currently running or NULL */
 THREAD_LOCAL struct eb32sc_node *rq_next = NULL; /* Next task to be potentially run */
 
-struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
-int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
-
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
@@ -56,10 +53,9 @@ struct eb_root rqueue;      /* tree constituting the run queue */
 int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
 
-struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
-int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
-struct eb_root timers_local[MAX_THREADS];  /* sorted timers tree, per thread */
+
+struct task_per_thread task_per_thread[MAX_THREADS];
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * returned. The nice value assigns boosts in 32th of the run queue size. A
@@ -82,8 +78,8 @@ void __task_wakeup(struct task *t, struct eb_root *root)
         } else
 #endif
         {
-                int nb = root - &rqueue_local[0];
-                rq_size = &rqueue_size[nb];
+                int nb = ((void *)root - (void *)&task_per_thread[0].rqueue) / sizeof(task_per_thread[0]);
+                rq_size = &task_per_thread[nb].rqueue_size;
         }
         /* Make sure if the task isn't in the runqueue, nobody inserts it
          * in the meanwhile.
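Because sizeof(struct task_per_thread) is now a fixed multiple of a cache line, the owning thread's index can be recovered from a pointer to its embedded rqueue with plain pointer arithmetic: the byte offset from the first element, divided by the padded element size, is the index. A small sketch of the same idea with hypothetical names and types, not the HAProxy ones:

    #include <stdio.h>

    /* Hypothetical stand-ins for the real ebtree root and per-thread slot. */
    struct node_root { void *b[2]; };

    struct slot {
            struct node_root rqueue;
            __attribute__((aligned(64))) char end[0];   /* pads element size to 64 */
    };

    static struct slot slots[8];

    int main(void)
    {
            struct node_root *root = &slots[3].rqueue;

            /* Byte offset from the first element's member, divided by the
             * padded element size, recovers the element's index: prints 3. */
            printf("%ld\n", ((char *)root - (char *)&slots[0].rqueue) / (long)sizeof(slots[0]));
            return 0;
    }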
@@ -153,9 +149,8 @@ redo:
         } else
 #endif
         {
-                int nb = root - &rqueue_local[0];
-
-                rqueue_size[nb]++;
+                int nb = ((void *)root - (void *)&task_per_thread[0].rqueue) / sizeof(task_per_thread[0]);
+                task_per_thread[nb].rqueue_size++;
         }
 #ifdef USE_THREAD
         /* If all threads that are supposed to handle this task are sleeping,
@@ -212,13 +207,13 @@ int wake_expired_tasks()
 
         while (1) {
   lookup_next_local:
-                eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
+                eb = eb32_lookup_ge(&task_per_thread[tid].timers, now_ms - TIMER_LOOK_BACK);
                 if (!eb) {
                         /* we might have reached the end of the tree, typically because
                          * <now_ms> is in the first half and we're first scanning the last
                          * half. Let's loop back to the beginning of the tree now.
                          */
-                        eb = eb32_first(&timers_local[tid]);
+                        eb = eb32_first(&task_per_thread[tid].timers);
                         if (likely(!eb))
                                 break;
                 }
@@ -248,7 +243,7 @@ int wake_expired_tasks()
                  */
                 if (!tick_is_expired(task->expire, now_ms)) {
                         if (tick_isset(task->expire))
-                                __task_queue(task, &timers_local[tid]);
+                                __task_queue(task, &task_per_thread[tid].timers);
                         goto lookup_next_local;
                 }
                 task_wakeup(task, TASK_WOKEN_TIMER);
@@ -339,7 +334,7 @@ void process_runnable_tasks()
                  * than the average.
                  */
                 rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-                while ((task_list_size[tid] + rqueue_size[tid]) * global.nbthread <= tasks_run_queue) {
+                while ((task_per_thread[tid].task_list_size + task_per_thread[tid].rqueue_size) * global.nbthread <= tasks_run_queue) {
                         if (unlikely(!rq_next)) {
                                 /* either we just started or we reached the end
                                  * of the tree, typically because <rqueue_ticks>
@@ -359,7 +354,7 @@ void process_runnable_tasks()
 
                         /* detach the task from the queue */
                         __task_unlink_rq(t);
-                        __task_wakeup(t, &rqueue_local[tid]);
+                        __task_wakeup(t, &task_per_thread[tid].rqueue);
                 }
 #endif
 
@@ -374,8 +369,8 @@ void process_runnable_tasks()
          * get too much in the task list, but put a bit more than
          * the max that will be run, to give a bit more fairness
          */
-        rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-        while (max_processed + (max_processed / 10) > task_list_size[tid]) {
+        rq_next = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+        while (max_processed + (max_processed / 10) > task_per_thread[tid].task_list_size) {
                 /* Note: this loop is one of the fastest code path in
                  * the whole program. It should not be re-arranged
                  * without a good reason.
@@ -387,7 +382,7 @@ void process_runnable_tasks()
                          * the last half. Let's loop back to the beginning
                          * of the tree now.
                          */
-                        rq_next = eb32sc_first(&rqueue_local[tid], tid_bit);
+                        rq_next = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
                         if (!rq_next)
                                 break;
                 }
@@ -401,19 +396,19 @@ void process_runnable_tasks()
                 /* And add it to the local task list */
                 task_insert_into_tasklet_list(t);
         }
-        if (!(global_tasks_mask & tid_bit) && rqueue_size[tid] == 0) {
+        if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
                 HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
                 __ha_barrier_load();
                 if (global_tasks_mask & tid_bit)
                         HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
         }
-        while (max_processed > 0 && !LIST_ISEMPTY(&task_list[tid])) {
+        while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
                 struct task *t;
                 unsigned short state;
                 void *ctx;
                 struct task *(*process)(struct task *t, void *ctx, unsigned short state);
 
-                t = (struct task *)LIST_ELEM(task_list[tid].n, struct tasklet *, list);
+                t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
                 state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
                 __ha_barrier_store();
                 task_remove_from_task_list(t);
@@ -441,9 +436,9 @@ void process_runnable_tasks()
                 if (state)
 #ifdef USE_THREAD
                         __task_wakeup(t, ((t->thread_mask & all_threads_mask) == tid_bit) ?
-                                      &rqueue_local[tid] : &rqueue);
+                                      &task_per_thread[tid].rqueue : &rqueue);
 #else
                         __task_wakeup(t, &rqueue_local[tid]);
-                        __task_wakeup(t, &rqueue_local[tid]);
+                        __task_wakeup(t, &task_per_thread[tid].rqueue);
 #endif
                 else
                         task_queue(t);
@@ -469,11 +464,9 @@ int init_task()
 #endif
         HA_SPIN_INIT(&wq_lock);
         HA_SPIN_INIT(&rq_lock);
+        memset(&task_per_thread, 0, sizeof(task_per_thread));
         for (i = 0; i < MAX_THREADS; i++) {
-                memset(&timers_local[i], 0, sizeof(timers_local[i]));
-                memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
-                LIST_INIT(&task_list[i]);
-                task_list_size[i] = 0;
+                LIST_INIT(&task_per_thread[i].task_list);
         }
         pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
         if (!pool_head_task)