REORG: thread/sched: move the task_per_thread stuff to thread_ctx

The scheduler contains a lot of stuff that is thread-local and not
exclusively tied to the scheduler. Other parts (namely thread_info)
contain similar thread-local context that ought to be merged with
it but that is even less related to the scheduler. However, moving
more data into this structure isn't possible since task.h is high
level and cannot be included everywhere (e.g. activity) without
causing include loops.

In the end, it appears that task_per_thread represents most of the
per-thread context, is defined with generic types, and should simply
move to tinfo.h so that everyone can use it.

The struct was renamed to thread_ctx and the variable "sched" was
renamed to "th_ctx". "sched" used to be initialized manually from
run_thread_poll_loop(); now it is initialized by ha_set_tid(), just
like ti, tid and tid_bit.
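
To illustrate that initialization path, here is a minimal standalone
sketch of the pattern (the struct contents, the MAX_THREADS value and
the main() driver are simplified placeholders, not HAProxy code); it
mirrors the ha_set_tid() hunks further down, which now set th_ctx
together with ti, tid and tid_bit:

#include <stdio.h>

#define MAX_THREADS 64

/* simplified stand-in for HAProxy's struct thread_ctx */
struct thread_ctx {
	unsigned int nb_tasks;
};

/* zero-filled at program start (static storage, ends up in .bss) */
static struct thread_ctx ha_thread_ctx[MAX_THREADS];

/* thread-local context, set once per thread */
static __thread unsigned int tid;
static __thread struct thread_ctx *th_ctx = &ha_thread_ctx[0];

/* models the new ha_set_tid(): all thread-local pointers set in one place */
static void ha_set_tid(unsigned int data)
{
	tid = data;
	th_ctx = &ha_thread_ctx[tid];
}

int main(void)
{
	ha_set_tid(3);
	th_ctx->nb_tasks++;  /* "th_ctx->" replaces the old "sched->" */
	printf("tasks on thread 3: %u\n", ha_thread_ctx[3].nb_tasks);
	return 0;
}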

The memset() in init_task() was removed in favor of a bss initialization
of the array, so that other subsystems can put their stuff in this array.
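
As a quick illustration of why the memset() became unnecessary (a
standalone sketch with a placeholder struct, not the real thread_ctx):
objects with static storage duration are zero-initialized by the C
runtime, so only the non-zero members still need explicit setup:

#include <assert.h>
#include <stddef.h>

#define MAX_THREADS 64

/* placeholder for the real per-thread context */
struct ctx {
	unsigned int rq_total;
	void *current;
};

/* file-scope array: guaranteed zero-initialized (placed in .bss),
 * so no memset() is required at startup.
 */
static struct ctx ha_ctx[MAX_THREADS];

int main(void)
{
	for (int i = 0; i < MAX_THREADS; i++) {
		assert(ha_ctx[i].rq_total == 0);
		assert(ha_ctx[i].current == NULL);
	}
	return 0;
}

This is also why only the LIST_INIT()/MT_LIST_INIT() calls remain in
the init_task() hunk below: the list heads are the only members that
need a non-zero initial value.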

Since the tasklet array has TL_CLASSES elements, the TL_* definitions
were moved there as well, but that's not a problem.
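
Since TL_CLASSES doubles as the number of per-thread tasklet queues,
it has to remain the last enum entry; below is a standalone sketch of
that sizing idiom (the list type and the ctx struct are placeholders):

#include <stdio.h>

/* same idiom as the TL_* classes: the trailing entry is the count */
enum {
	TL_URGENT = 0,  /* urgent tasklets (I/O callbacks) */
	TL_NORMAL = 1,  /* normal tasks */
	TL_BULK   = 2,  /* bulk task/tasklets, streaming I/Os */
	TL_HEAVY  = 3,  /* heavy computational tasklets */
	TL_CLASSES      /* must be last: used as an array dimension */
};

struct list { struct list *n, *p; };  /* placeholder list head */

struct ctx {
	struct list tasklets[TL_CLASSES];  /* one queue per class */
};

int main(void)
{
	printf("tasklet classes: %d\n", (int)TL_CLASSES);  /* prints 4 */
	printf("queues per thread: %zu\n",
	       sizeof(((struct ctx *)0)->tasklets) / sizeof(struct list));
	return 0;
}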

The vast majority of the changes in this patch are caused by the
renaming of the structure and of the "sched" variable.
Author: Willy Tarreau
Date:   2021-10-01 11:30:33 +02:00
Parent: 6414e4423c
Commit: 1a9c922b53

10 changed files with 127 additions and 121 deletions


@ -59,14 +59,6 @@
/* unused: 0x20000..0x80000000 */
enum {
TL_URGENT = 0, /* urgent tasklets (I/O callbacks) */
TL_NORMAL = 1, /* normal tasks */
TL_BULK = 2, /* bulk task/tasklets, streaming I/Os */
TL_HEAVY = 3, /* heavy computational tasklets (e.g. TLS handshakes) */
TL_CLASSES /* must be last */
};
struct notification {
struct list purge_me; /* Part of the list of signals to be purged in the
case of the LUA execution stack crash. */
@ -76,30 +68,6 @@ struct notification {
__decl_thread(HA_SPINLOCK_T lock);
};
/* force to split per-thread stuff into separate cache lines */
struct task_per_thread {
// first and second cache lines on 64 bits: thread-local operations only.
struct eb_root timers; /* tree constituting the per-thread wait queue */
struct eb_root rqueue; /* tree constituting the per-thread run queue */
struct task *current; /* current task (not tasklet) */
unsigned int rqueue_ticks; /* Insertion counter for the run queue */
int current_queue; /* points to current tasklet list being run, -1 if none */
unsigned int nb_tasks; /* number of tasks allocated on this thread */
uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */
// 11 bytes hole here
ALWAYS_ALIGN(2*sizeof(void*));
struct list tasklets[TL_CLASSES]; /* tasklets (and/or tasks) to run, by class */
// third cache line here on 64 bits: accessed mostly using atomic ops
ALWAYS_ALIGN(64);
struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
unsigned int rq_total; /* total size of the run queue, prio_tree + tasklets */
int tasks_in_list; /* Number of tasks in the per-thread tasklets list */
ALWAYS_ALIGN(128);
};
#ifdef DEBUG_TASK
#define TASK_DEBUG_STORAGE \
struct { \


@ -96,15 +96,12 @@ extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool_head_task;
extern struct pool_head *pool_head_tasklet;
extern struct pool_head *pool_head_notification;
extern THREAD_LOCAL struct task_per_thread *sched; /* current's thread scheduler context */
#ifdef USE_THREAD
extern struct eb_root timers; /* sorted timers tree, global */
extern struct eb_root rqueue; /* tree constituting the run queue */
#endif
extern struct task_per_thread task_per_thread[MAX_THREADS];
__decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
__decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait queue */
@ -155,7 +152,7 @@ static inline int total_run_queues()
ret = _HA_ATOMIC_LOAD(&grq_total);
#endif
for (thr = 0; thr < global.nbthread; thr++)
ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].rq_total);
return ret;
}
@ -168,7 +165,7 @@ static inline int total_allocated_tasks()
int thr, ret;
for (thr = ret = 0; thr < global.nbthread; thr++)
ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].nb_tasks);
ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].nb_tasks);
return ret;
}
@ -191,9 +188,9 @@ static inline int task_in_wq(struct task *t)
static inline int thread_has_tasks(void)
{
return (!!(global_tasks_mask & tid_bit) |
!eb_is_empty(&sched->rqueue) |
!!sched->tl_class_mask |
!MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
!eb_is_empty(&th_ctx->rqueue) |
!!th_ctx->tl_class_mask |
!MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list));
}
/* puts the task <t> in run queue with reason flags <f>, and returns <t> */
@ -286,7 +283,7 @@ static inline void task_queue(struct task *task)
{
BUG_ON(task->thread_mask != tid_bit); // should have TASK_SHARED_WQ
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
__task_queue(task, &sched->timers);
__task_queue(task, &th_ctx->timers);
}
}
@ -336,7 +333,7 @@ static inline struct task *task_unlink_rq(struct task *t)
_HA_ATOMIC_DEC(&grq_total);
}
else
_HA_ATOMIC_DEC(&sched->rq_total);
_HA_ATOMIC_DEC(&th_ctx->rq_total);
if (t->nice)
_HA_ATOMIC_DEC(&niced_tasks);
}
@ -405,7 +402,7 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
{
if (MT_LIST_DELETE((struct mt_list *)&t->list)) {
_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
_HA_ATOMIC_DEC(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total);
_HA_ATOMIC_DEC(&ha_thread_ctx[t->tid >= 0 ? t->tid : tid].rq_total);
}
}
@ -474,7 +471,7 @@ static inline struct task *task_new(unsigned long thread_mask)
{
struct task *t = pool_alloc(pool_head_task);
if (t) {
sched->nb_tasks++;
th_ctx->nb_tasks++;
task_init(t, thread_mask);
}
return t;
@ -513,8 +510,8 @@ static inline struct task *task_new_anywhere()
*/
static inline void __task_free(struct task *t)
{
if (t == sched->current) {
sched->current = NULL;
if (t == th_ctx->current) {
th_ctx->current = NULL;
__ha_barrier_store();
}
BUG_ON(task_in_wq(t) || task_in_rq(t));
@ -526,7 +523,7 @@ static inline void __task_free(struct task *t)
#endif
pool_free(pool_head_task, t);
sched->nb_tasks--;
th_ctx->nb_tasks--;
if (unlikely(stopping))
pool_flush(pool_head_task);
}
@ -550,7 +547,7 @@ static inline void task_destroy(struct task *t)
/* There's no need to protect t->state with a lock, as the task
* has to run on the current thread.
*/
if (t == sched->current || !(t->state & (TASK_QUEUED | TASK_RUNNING)))
if (t == th_ctx->current || !(t->state & (TASK_QUEUED | TASK_RUNNING)))
__task_free(t);
else
t->process = NULL;
@ -560,7 +557,7 @@ static inline void task_destroy(struct task *t)
static inline void tasklet_free(struct tasklet *tl)
{
if (MT_LIST_DELETE((struct mt_list *)&tl->list))
_HA_ATOMIC_DEC(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total);
_HA_ATOMIC_DEC(&ha_thread_ctx[tl->tid >= 0 ? tl->tid : tid].rq_total);
#ifdef DEBUG_TASK
if ((unsigned int)tl->debug.caller_idx > 1)
@ -611,7 +608,7 @@ static inline void task_schedule(struct task *task, int when)
task->expire = when;
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
__task_queue(task, &sched->timers);
__task_queue(task, &th_ctx->timers);
}
}


@ -90,6 +90,7 @@ enum { tid = 0 };
static inline void ha_set_tid(unsigned int tid)
{
ti = &ha_thread_info[tid];
th_ctx = &ha_thread_ctx[tid];
}
static inline void thread_idle_now()
@ -206,6 +207,7 @@ static inline void ha_set_tid(unsigned int data)
tid = data;
tid_bit = (1UL << tid);
ti = &ha_thread_info[tid];
th_ctx = &ha_thread_ctx[tid];
}
/* Marks the thread as idle, which means that not only it's not doing anything


@ -22,8 +22,19 @@
#ifndef _HAPROXY_TINFO_T_H
#define _HAPROXY_TINFO_T_H
#include <import/ebtree-t.h>
#include <haproxy/api-t.h>
/* tasklet classes */
enum {
TL_URGENT = 0, /* urgent tasklets (I/O callbacks) */
TL_NORMAL = 1, /* normal tasks */
TL_BULK = 2, /* bulk task/tasklets, streaming I/Os */
TL_HEAVY = 3, /* heavy computational tasklets (e.g. TLS handshakes) */
TL_CLASSES /* must be last */
};
/* thread info flags, for ha_thread_info[].flags */
#define TI_FL_STUCK 0x00000001
@ -47,4 +58,32 @@ struct thread_info {
char __end[0] __attribute__((aligned(64)));
};
/* This structure describes all the per-thread context we need. This is
* essentially the scheduler-specific stuff and a few important per-thread
* lists that need to be thread-local. We take care of splitting this into
* separate cache lines.
*/
struct thread_ctx {
// first and second cache lines on 64 bits: thread-local operations only.
struct eb_root timers; /* tree constituting the per-thread wait queue */
struct eb_root rqueue; /* tree constituting the per-thread run queue */
struct task *current; /* current task (not tasklet) */
unsigned int rqueue_ticks; /* Insertion counter for the run queue */
int current_queue; /* points to current tasklet list being run, -1 if none */
unsigned int nb_tasks; /* number of tasks allocated on this thread */
uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */
// 11 bytes hole here
ALWAYS_ALIGN(2*sizeof(void*));
struct list tasklets[TL_CLASSES]; /* tasklets (and/or tasks) to run, by class */
// third cache line here on 64 bits: accessed mostly using atomic ops
ALWAYS_ALIGN(64);
struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
unsigned int rq_total; /* total size of the run queue, prio_tree + tasklets */
int tasks_in_list; /* Number of tasks in the per-thread tasklets list */
ALWAYS_ALIGN(128);
};
#endif /* _HAPROXY_TINFO_T_H */


@ -25,8 +25,11 @@
#include <haproxy/api.h>
#include <haproxy/tinfo-t.h>
/* the struct is in thread.c */
/* the structs are in thread.c */
extern struct thread_info ha_thread_info[MAX_THREADS];
extern THREAD_LOCAL struct thread_info *ti; /* thread_info for the current thread */
extern struct thread_ctx ha_thread_ctx[MAX_THREADS];
extern THREAD_LOCAL struct thread_ctx *th_ctx; /* ha_thread_ctx for the current thread */
#endif /* _HAPROXY_TINFO_H */


@ -876,7 +876,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
/* 2. all threads's local run queues */
for (thr = 0; thr < global.nbthread; thr++) {
/* task run queue */
rqnode = eb32sc_first(&task_per_thread[thr].rqueue, ~0UL);
rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue, ~0UL);
while (rqnode) {
t = eb32sc_entry(rqnode, struct task, rq);
entry = sched_activity_entry(tmp_activity, t->process);
@ -890,7 +890,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
}
/* shared tasklet list */
list_for_each_entry(tl, mt_list_to_list(&task_per_thread[thr].shared_tasklet_list), list) {
list_for_each_entry(tl, mt_list_to_list(&ha_thread_ctx[thr].shared_tasklet_list), list) {
t = (const struct task *)tl;
entry = sched_activity_entry(tmp_activity, t->process);
if (!TASK_IS_TASKLET(t) && t->call_date) {
@ -903,7 +903,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
/* classful tasklets */
for (queue = 0; queue < TL_CLASSES; queue++) {
list_for_each_entry(tl, &task_per_thread[thr].tasklets[queue], list) {
list_for_each_entry(tl, &ha_thread_ctx[thr].tasklets[queue], list) {
t = (const struct task *)tl;
entry = sched_activity_entry(tmp_activity, t->process);
if (!TASK_IS_TASKLET(t) && t->call_date) {


@ -161,14 +161,14 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
ha_get_pthread_id(thr),
thread_has_tasks(),
!!(global_tasks_mask & thr_bit),
!eb_is_empty(&task_per_thread[thr].timers),
!eb_is_empty(&task_per_thread[thr].rqueue),
!(LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_URGENT]) &&
LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_NORMAL]) &&
LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
task_per_thread[thr].tasks_in_list,
task_per_thread[thr].rq_total,
!eb_is_empty(&ha_thread_ctx[thr].timers),
!eb_is_empty(&ha_thread_ctx[thr].rqueue),
!(LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_URGENT]) &&
LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_NORMAL]) &&
LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_BULK]) &&
MT_LIST_ISEMPTY(&ha_thread_ctx[thr].shared_tasklet_list)),
ha_thread_ctx[thr].tasks_in_list,
ha_thread_ctx[thr].rq_total,
stuck,
!!(task_profiling_mask & thr_bit));
@ -186,7 +186,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
return;
chunk_appendf(buf, " curr_task=");
ha_task_dump(buf, sched->current, " ");
ha_task_dump(buf, th_ctx->current, " ");
if (stuck) {
/* We only emit the backtrace for stuck threads in order not to


@ -2699,7 +2699,6 @@ static void *run_thread_poll_loop(void *data)
ha_set_tid((unsigned long)data);
set_thread_cpu_affinity();
sched = &task_per_thread[tid];
clock_set_local_source();
/* Now, initialize one thread init at a time. This is better since


@ -38,8 +38,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
THREAD_LOCAL struct task_per_thread *sched = &task_per_thread[0]; /* scheduler context for the current thread */
__decl_aligned_spinlock(rq_lock); /* spin lock related to run queue */
__decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */
@ -51,8 +49,6 @@ static unsigned int global_rqueue_ticks; /* insertion count in the grq, use rq_
#endif
struct task_per_thread task_per_thread[MAX_THREADS];
/* Flags the task <t> for immediate destruction and puts it into its first
* thread's shared tasklet list if not yet queued/running. This will bypass
@ -92,10 +88,10 @@ void task_kill(struct task *t)
thr = my_ffsl(t->thread_mask) - 1;
/* Beware: tasks that have never run don't have their ->list empty yet! */
MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
(struct mt_list *)&((struct tasklet *)t)->list);
_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
_HA_ATOMIC_INC(&task_per_thread[thr].tasks_in_list);
_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
_HA_ATOMIC_INC(&ha_thread_ctx[thr].tasks_in_list);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@ -131,9 +127,9 @@ void tasklet_kill(struct tasklet *t)
*/
if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_IN_LIST | TASK_KILLED)) {
thr = t->tid > 0 ? t->tid: tid;
MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
(struct mt_list *)&t->list);
_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@ -153,31 +149,31 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr)
if (likely(thr < 0)) {
/* this tasklet runs on the caller thread */
if (tl->state & TASK_HEAVY) {
LIST_APPEND(&sched->tasklets[TL_HEAVY], &tl->list);
sched->tl_class_mask |= 1 << TL_HEAVY;
LIST_APPEND(&th_ctx->tasklets[TL_HEAVY], &tl->list);
th_ctx->tl_class_mask |= 1 << TL_HEAVY;
}
else if (tl->state & TASK_SELF_WAKING) {
LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
sched->tl_class_mask |= 1 << TL_BULK;
LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
th_ctx->tl_class_mask |= 1 << TL_BULK;
}
else if ((struct task *)tl == sched->current) {
else if ((struct task *)tl == th_ctx->current) {
_HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
sched->tl_class_mask |= 1 << TL_BULK;
LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
th_ctx->tl_class_mask |= 1 << TL_BULK;
}
else if (sched->current_queue < 0) {
LIST_APPEND(&sched->tasklets[TL_URGENT], &tl->list);
sched->tl_class_mask |= 1 << TL_URGENT;
else if (th_ctx->current_queue < 0) {
LIST_APPEND(&th_ctx->tasklets[TL_URGENT], &tl->list);
th_ctx->tl_class_mask |= 1 << TL_URGENT;
}
else {
LIST_APPEND(&sched->tasklets[sched->current_queue], &tl->list);
sched->tl_class_mask |= 1 << sched->current_queue;
LIST_APPEND(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
}
_HA_ATOMIC_INC(&sched->rq_total);
_HA_ATOMIC_INC(&th_ctx->rq_total);
} else {
/* this tasklet runs on a specific thread. */
MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@ -195,7 +191,7 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr)
*/
void __task_wakeup(struct task *t)
{
struct eb_root *root = &sched->rqueue;
struct eb_root *root = &th_ctx->rqueue;
#ifdef USE_THREAD
if (t->thread_mask != tid_bit && global.nbthread != 1) {
@ -210,8 +206,8 @@ void __task_wakeup(struct task *t)
} else
#endif
{
_HA_ATOMIC_INC(&sched->rq_total);
t->rq.key = ++sched->rqueue_ticks;
_HA_ATOMIC_INC(&th_ctx->rq_total);
t->rq.key = ++th_ctx->rqueue_ticks;
}
if (likely(t->nice)) {
@ -267,8 +263,8 @@ void __task_queue(struct task *task, struct eb_root *wq)
{
#ifdef USE_THREAD
BUG_ON((wq == &timers && !(task->state & TASK_SHARED_WQ)) ||
(wq == &sched->timers && (task->state & TASK_SHARED_WQ)) ||
(wq != &timers && wq != &sched->timers));
(wq == &th_ctx->timers && (task->state & TASK_SHARED_WQ)) ||
(wq != &timers && wq != &th_ctx->timers));
#endif
/* if this happens the process is doomed anyway, so better catch it now
* so that we have the caller in the stack.
@ -295,7 +291,7 @@ void __task_queue(struct task *task, struct eb_root *wq)
*/
void wake_expired_tasks()
{
struct task_per_thread * const tt = sched; // thread's tasks
struct thread_ctx * const tt = th_ctx; // thread's tasks
int max_processed = global.tune.runqueue_depth;
struct task *task;
struct eb32_node *eb;
@ -436,7 +432,7 @@ leave:
*/
int next_timer_expiry()
{
struct task_per_thread * const tt = sched; // thread's tasks
struct thread_ctx * const tt = th_ctx; // thread's tasks
struct eb32_node *eb;
int ret = TICK_ETERNITY;
__decl_thread(int key = TICK_ETERNITY);
@ -470,7 +466,7 @@ int next_timer_expiry()
return ret;
}
/* Walks over tasklet lists sched->tasklets[0..TL_CLASSES-1] and run at most
/* Walks over tasklet lists th_ctx->tasklets[0..TL_CLASSES-1] and run at most
* budget[TL_*] of them. Returns the number of entries effectively processed
* (tasks and tasklets merged). The count of tasks in the list for the current
* thread is adjusted.
@ -478,7 +474,7 @@ int next_timer_expiry()
unsigned int run_tasks_from_lists(unsigned int budgets[])
{
struct task *(*process)(struct task *t, void *ctx, unsigned int state);
struct list *tl_queues = sched->tasklets;
struct list *tl_queues = th_ctx->tasklets;
struct task *t;
uint8_t budget_mask = (1 << TL_CLASSES) - 1;
struct sched_activity *profile_entry = NULL;
@ -488,29 +484,29 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
void *ctx;
for (queue = 0; queue < TL_CLASSES;) {
sched->current_queue = queue;
th_ctx->current_queue = queue;
/* global.tune.sched.low-latency is set */
if (global.tune.options & GTUNE_SCHED_LOW_LATENCY) {
if (unlikely(sched->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
if (unlikely(th_ctx->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
/* a lower queue index has tasks again and still has a
* budget to run them. Let's switch to it now.
*/
queue = (sched->tl_class_mask & 1) ? 0 :
(sched->tl_class_mask & 2) ? 1 : 2;
queue = (th_ctx->tl_class_mask & 1) ? 0 :
(th_ctx->tl_class_mask & 2) ? 1 : 2;
continue;
}
if (unlikely(queue > TL_URGENT &&
budget_mask & (1 << TL_URGENT) &&
!MT_LIST_ISEMPTY(&sched->shared_tasklet_list))) {
!MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list))) {
/* an urgent tasklet arrived from another thread */
break;
}
if (unlikely(queue > TL_NORMAL &&
budget_mask & (1 << TL_NORMAL) &&
(!eb_is_empty(&sched->rqueue) ||
(!eb_is_empty(&th_ctx->rqueue) ||
(global_tasks_mask & tid_bit)))) {
/* a task was woken up by a bulk tasklet or another thread */
break;
@ -518,7 +514,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
}
if (LIST_ISEMPTY(&tl_queues[queue])) {
sched->tl_class_mask &= ~(1 << queue);
th_ctx->tl_class_mask &= ~(1 << queue);
queue++;
continue;
}
@ -538,9 +534,9 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
ctx = t->context;
process = t->process;
t->calls++;
sched->current = t;
th_ctx->current = t;
_HA_ATOMIC_DEC(&sched->rq_total);
_HA_ATOMIC_DEC(&th_ctx->rq_total);
if (state & TASK_F_TASKLET) {
uint64_t before = 0;
@ -567,7 +563,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
}
else {
done++;
sched->current = NULL;
th_ctx->current = NULL;
pool_free(pool_head_tasklet, t);
__ha_barrier_store();
continue;
@ -579,7 +575,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
}
done++;
sched->current = NULL;
th_ctx->current = NULL;
__ha_barrier_store();
continue;
}
@ -591,7 +587,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
/* OK then this is a regular task */
_HA_ATOMIC_DEC(&task_per_thread[tid].tasks_in_list);
_HA_ATOMIC_DEC(&ha_thread_ctx[tid].tasks_in_list);
if (unlikely(t->call_date)) {
uint64_t now_ns = now_mono_time();
uint64_t lat = now_ns - t->call_date;
@ -616,7 +612,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
else {
task_unlink_wq(t);
__task_free(t);
sched->current = NULL;
th_ctx->current = NULL;
__ha_barrier_store();
/* We don't want max_processed to be decremented if
* we're just freeing a destroyed task, we should only
@ -624,7 +620,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
*/
continue;
}
sched->current = NULL;
th_ctx->current = NULL;
__ha_barrier_store();
/* If there is a pending state we have to wake up the task
* immediately, else we defer it into wait queue
@ -650,7 +646,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
}
done++;
}
sched->current_queue = -1;
th_ctx->current_queue = -1;
return done;
}
@ -670,7 +666,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
*/
void process_runnable_tasks()
{
struct task_per_thread * const tt = sched;
struct thread_ctx * const tt = th_ctx;
struct eb32sc_node *lrq; // next local run queue entry
struct eb32sc_node *grq; // next global run queue entry
struct task *t;
@ -701,11 +697,11 @@ void process_runnable_tasks()
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
if (max_processed < sched->rq_total && sched->rq_total <= 2*max_processed) {
if (max_processed < th_ctx->rq_total && th_ctx->rq_total <= 2*max_processed) {
/* If the run queue exceeds the budget by up to 50%, let's cut it
* into two identical halves to improve latency.
*/
max_processed = sched->rq_total / 2;
max_processed = th_ctx->rq_total / 2;
}
not_done_yet:
@ -718,7 +714,7 @@ void process_runnable_tasks()
/* normal tasklets list gets a default weight of ~37% */
if ((tt->tl_class_mask & (1 << TL_NORMAL)) ||
!eb_is_empty(&sched->rqueue) || (global_tasks_mask & tid_bit))
!eb_is_empty(&th_ctx->rqueue) || (global_tasks_mask & tid_bit))
max[TL_NORMAL] = default_weights[TL_NORMAL];
/* bulk tasklets list gets a default weight of ~13% */
@ -889,14 +885,14 @@ void mworker_cleantasks()
#endif
/* clean the per thread run queue */
for (i = 0; i < global.nbthread; i++) {
tmp_rq = eb32sc_first(&task_per_thread[i].rqueue, MAX_THREADS_MASK);
tmp_rq = eb32sc_first(&ha_thread_ctx[i].rqueue, MAX_THREADS_MASK);
while (tmp_rq) {
t = eb32sc_entry(tmp_rq, struct task, rq);
tmp_rq = eb32sc_next(tmp_rq, MAX_THREADS_MASK);
task_destroy(t);
}
/* cleanup the per thread timers queue */
tmp_wq = eb32_first(&task_per_thread[i].timers);
tmp_wq = eb32_first(&ha_thread_ctx[i].timers);
while (tmp_wq) {
t = eb32_entry(tmp_wq, struct task, wq);
tmp_wq = eb32_next(tmp_wq);
@ -914,11 +910,10 @@ static void init_task()
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
#endif
memset(&task_per_thread, 0, sizeof(task_per_thread));
for (i = 0; i < MAX_THREADS; i++) {
for (q = 0; q < TL_CLASSES; q++)
LIST_INIT(&task_per_thread[i].tasklets[q]);
MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
LIST_INIT(&ha_thread_ctx[i].tasklets[q]);
MT_LIST_INIT(&ha_thread_ctx[i].shared_tasklet_list);
}
}


@ -53,6 +53,9 @@
struct thread_info ha_thread_info[MAX_THREADS] = { };
THREAD_LOCAL struct thread_info *ti = &ha_thread_info[0];
struct thread_ctx ha_thread_ctx[MAX_THREADS] = { };
THREAD_LOCAL struct thread_ctx *th_ctx = &ha_thread_ctx[0];
#ifdef USE_THREAD
volatile unsigned long threads_want_rdv_mask __read_mostly = 0;