diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h
index 5081d985b..b68ae1470 100644
--- a/include/haproxy/task-t.h
+++ b/include/haproxy/task-t.h
@@ -59,14 +59,6 @@
 /* unused: 0x20000..0x80000000 */
 
-enum {
-	TL_URGENT = 0,   /* urgent tasklets (I/O callbacks) */
-	TL_NORMAL = 1,   /* normal tasks */
-	TL_BULK   = 2,   /* bulk task/tasklets, streaming I/Os */
-	TL_HEAVY  = 3,   /* heavy computational tasklets (e.g. TLS handshakes) */
-	TL_CLASSES       /* must be last */
-};
-
 struct notification {
 	struct list purge_me; /* Part of the list of signals to be purged in the
	                         case of the LUA execution stack crash. */
@@ -76,30 +68,6 @@ struct notification {
 	__decl_thread(HA_SPINLOCK_T lock);
 };
 
-/* force to split per-thread stuff into separate cache lines */
-struct task_per_thread {
-	// first and second cache lines on 64 bits: thread-local operations only.
-	struct eb_root timers;              /* tree constituting the per-thread wait queue */
-	struct eb_root rqueue;              /* tree constituting the per-thread run queue */
-	struct task *current;               /* current task (not tasklet) */
-	unsigned int rqueue_ticks;          /* Insertion counter for the run queue */
-	int current_queue;                  /* points to current tasklet list being run, -1 if none */
-	unsigned int nb_tasks;              /* number of tasks allocated on this thread */
-	uint8_t tl_class_mask;              /* bit mask of non-empty tasklets classes */
-
-	// 11 bytes hole here
-	ALWAYS_ALIGN(2*sizeof(void*));
-	struct list tasklets[TL_CLASSES];   /* tasklets (and/or tasks) to run, by class */
-
-	// third cache line here on 64 bits: accessed mostly using atomic ops
-	ALWAYS_ALIGN(64);
-	struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
-	unsigned int rq_total;              /* total size of the run queue, prio_tree + tasklets */
-	int tasks_in_list;                  /* Number of tasks in the per-thread tasklets list */
-	ALWAYS_ALIGN(128);
-};
-
-
 #ifdef DEBUG_TASK
 #define TASK_DEBUG_STORAGE \
 	struct { \
diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index c1261c799..b3fea6fed 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -96,15 +96,12 @@ extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
-extern THREAD_LOCAL struct task_per_thread *sched; /* current's thread scheduler context */
 
 #ifdef USE_THREAD
 extern struct eb_root timers;   /* sorted timers tree, global */
 extern struct eb_root rqueue;   /* tree constituting the run queue */
 #endif
 
-extern struct task_per_thread task_per_thread[MAX_THREADS];
-
 __decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
 __decl_thread(extern HA_RWLOCK_T wq_lock);   /* RW lock related to the wait queue */
 
@@ -155,7 +152,7 @@ static inline int total_run_queues()
 	ret = _HA_ATOMIC_LOAD(&grq_total);
 #endif
 	for (thr = 0; thr < global.nbthread; thr++)
-		ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
+		ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].rq_total);
 	return ret;
 }
 
@@ -168,7 +165,7 @@ static inline int total_allocated_tasks()
 	int thr, ret;
 
 	for (thr = ret = 0; thr < global.nbthread; thr++)
-		ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].nb_tasks);
+		ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].nb_tasks);
 	return ret;
 }
 
@@ -191,9 +188,9 @@ static inline int task_in_wq(struct task *t)
 static inline int thread_has_tasks(void)
 {
 	return (!!(global_tasks_mask & tid_bit) |
-		!eb_is_empty(&sched->rqueue) |
-		!!sched->tl_class_mask |
-		!MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
+		!eb_is_empty(&th_ctx->rqueue) |
+		!!th_ctx->tl_class_mask |
+		!MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list));
 }
 
 /* puts the task <t> in run queue with reason flags <f>, and returns <t> */
@@ -286,7 +283,7 @@ static inline void task_queue(struct task *task)
 	{
 		BUG_ON(task->thread_mask != tid_bit); // should have TASK_SHARED_WQ
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &sched->timers);
+			__task_queue(task, &th_ctx->timers);
 	}
 }
 
@@ -336,7 +333,7 @@ static inline struct task *task_unlink_rq(struct task *t)
 			_HA_ATOMIC_DEC(&grq_total);
 		}
 		else
-			_HA_ATOMIC_DEC(&sched->rq_total);
+			_HA_ATOMIC_DEC(&th_ctx->rq_total);
 		if (t->nice)
 			_HA_ATOMIC_DEC(&niced_tasks);
 	}
@@ -405,7 +402,7 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
 {
 	if (MT_LIST_DELETE((struct mt_list *)&t->list)) {
 		_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
-		_HA_ATOMIC_DEC(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total);
+		_HA_ATOMIC_DEC(&ha_thread_ctx[t->tid >= 0 ? t->tid : tid].rq_total);
 	}
 }
 
@@ -474,7 +471,7 @@ static inline struct task *task_new(unsigned long thread_mask)
 {
 	struct task *t = pool_alloc(pool_head_task);
 	if (t) {
-		sched->nb_tasks++;
+		th_ctx->nb_tasks++;
 		task_init(t, thread_mask);
 	}
 	return t;
@@ -513,8 +510,8 @@ static inline struct task *task_new_anywhere()
  */
 static inline void __task_free(struct task *t)
 {
-	if (t == sched->current) {
-		sched->current = NULL;
+	if (t == th_ctx->current) {
+		th_ctx->current = NULL;
 		__ha_barrier_store();
 	}
 	BUG_ON(task_in_wq(t) || task_in_rq(t));
@@ -526,7 +523,7 @@ static inline void __task_free(struct task *t)
 #endif
 
 	pool_free(pool_head_task, t);
-	sched->nb_tasks--;
+	th_ctx->nb_tasks--;
 	if (unlikely(stopping))
 		pool_flush(pool_head_task);
 }
@@ -550,7 +547,7 @@ static inline void task_destroy(struct task *t)
 	/* There's no need to protect t->state with a lock, as the task
 	 * has to run on the current thread.
 	 */
-	if (t == sched->current || !(t->state & (TASK_QUEUED | TASK_RUNNING)))
+	if (t == th_ctx->current || !(t->state & (TASK_QUEUED | TASK_RUNNING)))
 		__task_free(t);
 	else
 		t->process = NULL;
@@ -560,7 +557,7 @@ static inline void task_destroy(struct task *t)
 static inline void tasklet_free(struct tasklet *tl)
 {
 	if (MT_LIST_DELETE((struct mt_list *)&tl->list))
-		_HA_ATOMIC_DEC(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total);
+		_HA_ATOMIC_DEC(&ha_thread_ctx[tl->tid >= 0 ? tl->tid : tid].rq_total);
 
 #ifdef DEBUG_TASK
 	if ((unsigned int)tl->debug.caller_idx > 1)
@@ -611,7 +608,7 @@ static inline void task_schedule(struct task *task, int when)
 
 		task->expire = when;
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &sched->timers);
+			__task_queue(task, &th_ctx->timers);
 	}
 }
 
diff --git a/include/haproxy/thread.h b/include/haproxy/thread.h
index 7598327d4..37808d9d7 100644
--- a/include/haproxy/thread.h
+++ b/include/haproxy/thread.h
@@ -90,6 +90,7 @@ enum { tid = 0 };
 static inline void ha_set_tid(unsigned int tid)
 {
 	ti = &ha_thread_info[tid];
+	th_ctx = &ha_thread_ctx[tid];
 }
 
 static inline void thread_idle_now()
@@ -206,6 +207,7 @@ static inline void ha_set_tid(unsigned int data)
 	tid     = data;
 	tid_bit = (1UL << tid);
 	ti      = &ha_thread_info[tid];
+	th_ctx  = &ha_thread_ctx[tid];
 }
 
 /* Marks the thread as idle, which means that not only it's not doing anything
diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 1d32d66c0..3fde2695e 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -22,8 +22,19 @@
 #ifndef _HAPROXY_TINFO_T_H
 #define _HAPROXY_TINFO_T_H
 
+#include <import/ebtree-t.h>
+
 #include <haproxy/api-t.h>
 
+/* tasklet classes */
+enum {
+	TL_URGENT = 0,   /* urgent tasklets (I/O callbacks) */
+	TL_NORMAL = 1,   /* normal tasks */
+	TL_BULK   = 2,   /* bulk task/tasklets, streaming I/Os */
+	TL_HEAVY  = 3,   /* heavy computational tasklets (e.g. TLS handshakes) */
+	TL_CLASSES       /* must be last */
+};
+
 /* thread info flags, for ha_thread_info[].flags */
 #define TI_FL_STUCK           0x00000001
 
@@ -47,4 +58,32 @@
 	char __end[0] __attribute__((aligned(64)));
 };
 
+/* This structure describes all the per-thread context we need. This is
+ * essentially the scheduler-specific stuff and a few important per-thread
+ * lists that need to be thread-local. We take care of splitting this into
+ * separate cache lines.
+ */
+struct thread_ctx {
+	// first and second cache lines on 64 bits: thread-local operations only.
+	struct eb_root timers;              /* tree constituting the per-thread wait queue */
+	struct eb_root rqueue;              /* tree constituting the per-thread run queue */
+	struct task *current;               /* current task (not tasklet) */
+	unsigned int rqueue_ticks;          /* Insertion counter for the run queue */
+	int current_queue;                  /* points to current tasklet list being run, -1 if none */
+	unsigned int nb_tasks;              /* number of tasks allocated on this thread */
+	uint8_t tl_class_mask;              /* bit mask of non-empty tasklets classes */
+
+	// 11 bytes hole here
+	ALWAYS_ALIGN(2*sizeof(void*));
+	struct list tasklets[TL_CLASSES];   /* tasklets (and/or tasks) to run, by class */
+
+	// third cache line here on 64 bits: accessed mostly using atomic ops
+	ALWAYS_ALIGN(64);
+	struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
+	unsigned int rq_total;              /* total size of the run queue, prio_tree + tasklets */
+	int tasks_in_list;                  /* Number of tasks in the per-thread tasklets list */
+	ALWAYS_ALIGN(128);
+};
+
+
 #endif /* _HAPROXY_TINFO_T_H */
diff --git a/include/haproxy/tinfo.h b/include/haproxy/tinfo.h
index be6cddf17..6e73824cb 100644
--- a/include/haproxy/tinfo.h
+++ b/include/haproxy/tinfo.h
@@ -25,8 +25,11 @@
 #include <haproxy/api.h>
 #include <haproxy/tinfo-t.h>
 
-/* the struct is in thread.c */
+/* the structs are in thread.c */
 extern struct thread_info ha_thread_info[MAX_THREADS];
 extern THREAD_LOCAL struct thread_info *ti; /* thread_info for the current thread */
 
+extern struct thread_ctx ha_thread_ctx[MAX_THREADS];
+extern THREAD_LOCAL struct thread_ctx *th_ctx; /* ha_thread_ctx for the current thread */
+
 #endif /* _HAPROXY_TINFO_H */
diff --git a/src/activity.c b/src/activity.c
index a4527f2db..b1b91aced 100644
--- a/src/activity.c
+++ b/src/activity.c
@@ -876,7 +876,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 	/* 2. all threads's local run queues */
 	for (thr = 0; thr < global.nbthread; thr++) {
 		/* task run queue */
-		rqnode = eb32sc_first(&task_per_thread[thr].rqueue, ~0UL);
+		rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue, ~0UL);
 		while (rqnode) {
 			t = eb32sc_entry(rqnode, struct task, rq);
 			entry = sched_activity_entry(tmp_activity, t->process);
@@ -890,7 +890,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 		}
 
 		/* shared tasklet list */
-		list_for_each_entry(tl, mt_list_to_list(&task_per_thread[thr].shared_tasklet_list), list) {
+		list_for_each_entry(tl, mt_list_to_list(&ha_thread_ctx[thr].shared_tasklet_list), list) {
 			t = (const struct task *)tl;
 			entry = sched_activity_entry(tmp_activity, t->process);
 			if (!TASK_IS_TASKLET(t) && t->call_date) {
@@ -903,7 +903,7 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
 
 		/* classful tasklets */
 		for (queue = 0; queue < TL_CLASSES; queue++) {
-			list_for_each_entry(tl, &task_per_thread[thr].tasklets[queue], list) {
+			list_for_each_entry(tl, &ha_thread_ctx[thr].tasklets[queue], list) {
 				t = (const struct task *)tl;
 				entry = sched_activity_entry(tmp_activity, t->process);
 				if (!TASK_IS_TASKLET(t) && t->call_date) {
diff --git a/src/debug.c b/src/debug.c
index ae3d9aebd..51440946f 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -161,14 +161,14 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 	              ha_get_pthread_id(thr),
 	              thread_has_tasks(),
 	              !!(global_tasks_mask & thr_bit),
-	              !eb_is_empty(&task_per_thread[thr].timers),
-	              !eb_is_empty(&task_per_thread[thr].rqueue),
-	              !(LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_URGENT]) &&
-	                LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_NORMAL]) &&
-	                LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
-	                MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
-	              task_per_thread[thr].tasks_in_list,
-	              task_per_thread[thr].rq_total,
+	              !eb_is_empty(&ha_thread_ctx[thr].timers),
+	              !eb_is_empty(&ha_thread_ctx[thr].rqueue),
+	              !(LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_URGENT]) &&
+	                LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_NORMAL]) &&
+	                LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_BULK]) &&
+	                MT_LIST_ISEMPTY(&ha_thread_ctx[thr].shared_tasklet_list)),
+	              ha_thread_ctx[thr].tasks_in_list,
+	              ha_thread_ctx[thr].rq_total,
 	              stuck,
 	              !!(task_profiling_mask & thr_bit));
 
@@ -186,7 +186,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 		return;
 
 	chunk_appendf(buf, " curr_task=");
-	ha_task_dump(buf, sched->current, " ");
+	ha_task_dump(buf, th_ctx->current, " ");
 
 	if (stuck) {
 		/* We only emit the backtrace for stuck threads in order not to
diff --git a/src/haproxy.c b/src/haproxy.c
index 885cc264c..c8b5ee781 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2699,7 +2699,6 @@ static void *run_thread_poll_loop(void *data)
 
 	ha_set_tid((unsigned long)data);
 	set_thread_cpu_affinity();
-	sched = &task_per_thread[tid];
 	clock_set_local_source();
 
 	/* Now, initialize one thread init at a time. This is better since
diff --git a/src/task.c b/src/task.c
index 45111f138..65aa6465e 100644
--- a/src/task.c
+++ b/src/task.c
@@ -38,8 +38,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
 unsigned int niced_tasks  = 0;      /* number of niced tasks in the run queue */
 
-THREAD_LOCAL struct task_per_thread *sched = &task_per_thread[0]; /* scheduler context for the current thread */
-
 __decl_aligned_spinlock(rq_lock); /* spin lock related to run queue */
 __decl_aligned_rwlock(wq_lock);   /* RW lock related to the wait queue */
 
@@ -51,8 +49,6 @@
 static unsigned int global_rqueue_ticks;  /* insertion count in the grq, use rq_lock */
 #endif
 
-struct task_per_thread task_per_thread[MAX_THREADS];
-
 
 /* Flags the task for immediate destruction and puts it into its first
  * thread's shared tasklet list if not yet queued/running. This will bypass
@@ -92,10 +88,10 @@ void task_kill(struct task *t)
 			thr = my_ffsl(t->thread_mask) - 1;
 
 			/* Beware: tasks that have never run don't have their ->list empty yet! */
-			MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
+			MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
 			               (struct mt_list *)&((struct tasklet *)t)->list);
-			_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
-			_HA_ATOMIC_INC(&task_per_thread[thr].tasks_in_list);
+			_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
+			_HA_ATOMIC_INC(&ha_thread_ctx[thr].tasks_in_list);
 			if (sleeping_thread_mask & (1UL << thr)) {
 				_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
 				wake_thread(thr);
@@ -131,9 +127,9 @@ void tasklet_kill(struct tasklet *t)
 		 */
 		if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_IN_LIST | TASK_KILLED)) {
 			thr = t->tid > 0 ? t->tid: tid;
-			MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
+			MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
 			               (struct mt_list *)&t->list);
-			_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
+			_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
 			if (sleeping_thread_mask & (1UL << thr)) {
 				_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
 				wake_thread(thr);
@@ -153,31 +149,31 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr)
 	if (likely(thr < 0)) {
 		/* this tasklet runs on the caller thread */
 		if (tl->state & TASK_HEAVY) {
-			LIST_APPEND(&sched->tasklets[TL_HEAVY], &tl->list);
-			sched->tl_class_mask |= 1 << TL_HEAVY;
+			LIST_APPEND(&th_ctx->tasklets[TL_HEAVY], &tl->list);
+			th_ctx->tl_class_mask |= 1 << TL_HEAVY;
 		}
 		else if (tl->state & TASK_SELF_WAKING) {
-			LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
-			sched->tl_class_mask |= 1 << TL_BULK;
+			LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
+			th_ctx->tl_class_mask |= 1 << TL_BULK;
 		}
-		else if ((struct task *)tl == sched->current) {
+		else if ((struct task *)tl == th_ctx->current) {
 			_HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
-			LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
-			sched->tl_class_mask |= 1 << TL_BULK;
+			LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
+			th_ctx->tl_class_mask |= 1 << TL_BULK;
 		}
-		else if (sched->current_queue < 0) {
-			LIST_APPEND(&sched->tasklets[TL_URGENT], &tl->list);
-			sched->tl_class_mask |= 1 << TL_URGENT;
+		else if (th_ctx->current_queue < 0) {
+			LIST_APPEND(&th_ctx->tasklets[TL_URGENT], &tl->list);
+			th_ctx->tl_class_mask |= 1 << TL_URGENT;
 		}
 		else {
-			LIST_APPEND(&sched->tasklets[sched->current_queue], &tl->list);
-			sched->tl_class_mask |= 1 << sched->current_queue;
+			LIST_APPEND(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
+			th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
 		}
-		_HA_ATOMIC_INC(&sched->rq_total);
+		_HA_ATOMIC_INC(&th_ctx->rq_total);
 	} else {
 		/* this tasklet runs on a specific thread. */
-		MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
-		_HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
+		MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
+		_HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
 		if (sleeping_thread_mask & (1UL << thr)) {
 			_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
 			wake_thread(thr);
@@ -195,7 +191,7 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr)
  */
 void __task_wakeup(struct task *t)
 {
-	struct eb_root *root = &sched->rqueue;
+	struct eb_root *root = &th_ctx->rqueue;
 
 #ifdef USE_THREAD
 	if (t->thread_mask != tid_bit && global.nbthread != 1) {
@@ -210,8 +206,8 @@ void __task_wakeup(struct task *t)
 	} else
 #endif
 	{
-		_HA_ATOMIC_INC(&sched->rq_total);
-		t->rq.key = ++sched->rqueue_ticks;
+		_HA_ATOMIC_INC(&th_ctx->rq_total);
+		t->rq.key = ++th_ctx->rqueue_ticks;
 	}
 
 	if (likely(t->nice)) {
@@ -267,8 +263,8 @@ void __task_queue(struct task *task, struct eb_root *wq)
 {
 #ifdef USE_THREAD
 	BUG_ON((wq == &timers && !(task->state & TASK_SHARED_WQ)) ||
-	       (wq == &sched->timers && (task->state & TASK_SHARED_WQ)) ||
-	       (wq != &timers && wq != &sched->timers));
+	       (wq == &th_ctx->timers && (task->state & TASK_SHARED_WQ)) ||
+	       (wq != &timers && wq != &th_ctx->timers));
 #endif
 	/* if this happens the process is doomed anyway, so better catch it now
 	 * so that we have the caller in the stack.
@@ -295,7 +291,7 @@ void __task_queue(struct task *task, struct eb_root *wq)
  */
 void wake_expired_tasks()
 {
-	struct task_per_thread * const tt = sched; // thread's tasks
+	struct thread_ctx * const tt = th_ctx; // thread's tasks
 	int max_processed = global.tune.runqueue_depth;
 	struct task *task;
 	struct eb32_node *eb;
@@ -436,7 +432,7 @@ leave:
  */
 int next_timer_expiry()
 {
-	struct task_per_thread * const tt = sched; // thread's tasks
+	struct thread_ctx * const tt = th_ctx; // thread's tasks
 	struct eb32_node *eb;
 	int ret = TICK_ETERNITY;
 	__decl_thread(int key = TICK_ETERNITY);
@@ -470,7 +466,7 @@ int next_timer_expiry()
 	return ret;
 }
 
-/* Walks over tasklet lists sched->tasklets[0..TL_CLASSES-1] and run at most
+/* Walks over tasklet lists th_ctx->tasklets[0..TL_CLASSES-1] and run at most
  * budget[TL_*] of them. Returns the number of entries effectively processed
 * (tasks and tasklets merged). The count of tasks in the list for the current
 * thread is adjusted.
@@ -478,7 +474,7 @@ int next_timer_expiry()
 unsigned int run_tasks_from_lists(unsigned int budgets[])
 {
 	struct task *(*process)(struct task *t, void *ctx, unsigned int state);
-	struct list *tl_queues = sched->tasklets;
+	struct list *tl_queues = th_ctx->tasklets;
 	struct task *t;
 	uint8_t budget_mask = (1 << TL_CLASSES) - 1;
 	struct sched_activity *profile_entry = NULL;
@@ -488,29 +484,29 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 	void *ctx;
 
 	for (queue = 0; queue < TL_CLASSES;) {
-		sched->current_queue = queue;
+		th_ctx->current_queue = queue;
 
 		/* global.tune.sched.low-latency is set */
 		if (global.tune.options & GTUNE_SCHED_LOW_LATENCY) {
-			if (unlikely(sched->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
+			if (unlikely(th_ctx->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
 				/* a lower queue index has tasks again and still has a
 				 * budget to run them. Let's switch to it now.
 				 */
-				queue = (sched->tl_class_mask & 1) ? 0 :
-				        (sched->tl_class_mask & 2) ? 1 : 2;
+				queue = (th_ctx->tl_class_mask & 1) ? 0 :
+				        (th_ctx->tl_class_mask & 2) ? 1 : 2;
 				continue;
 			}
 
 			if (unlikely(queue > TL_URGENT &&
 				     budget_mask & (1 << TL_URGENT) &&
-				     !MT_LIST_ISEMPTY(&sched->shared_tasklet_list))) {
+				     !MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list))) {
 				/* an urgent tasklet arrived from another thread */
 				break;
 			}
 
 			if (unlikely(queue > TL_NORMAL &&
 				     budget_mask & (1 << TL_NORMAL) &&
-				     (!eb_is_empty(&sched->rqueue) ||
+				     (!eb_is_empty(&th_ctx->rqueue) ||
 				      (global_tasks_mask & tid_bit)))) {
 				/* a task was woken up by a bulk tasklet or another thread */
 				break;
@@ -518,7 +514,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		}
 
 		if (LIST_ISEMPTY(&tl_queues[queue])) {
-			sched->tl_class_mask &= ~(1 << queue);
+			th_ctx->tl_class_mask &= ~(1 << queue);
 			queue++;
 			continue;
 		}
@@ -538,9 +534,9 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		ctx = t->context;
 		process = t->process;
 		t->calls++;
-		sched->current = t;
+		th_ctx->current = t;
 
-		_HA_ATOMIC_DEC(&sched->rq_total);
+		_HA_ATOMIC_DEC(&th_ctx->rq_total);
 
 		if (state & TASK_F_TASKLET) {
 			uint64_t before = 0;
@@ -567,7 +563,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 				}
 				else {
 					done++;
-					sched->current = NULL;
+					th_ctx->current = NULL;
 					pool_free(pool_head_tasklet, t);
 					__ha_barrier_store();
 					continue;
@@ -579,7 +575,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 			}
 
 			done++;
-			sched->current = NULL;
+			th_ctx->current = NULL;
 			__ha_barrier_store();
 			continue;
 		}
@@ -591,7 +587,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 
 		/* OK then this is a regular task */
 
-		_HA_ATOMIC_DEC(&task_per_thread[tid].tasks_in_list);
+		_HA_ATOMIC_DEC(&ha_thread_ctx[tid].tasks_in_list);
 		if (unlikely(t->call_date)) {
 			uint64_t now_ns = now_mono_time();
 			uint64_t lat = now_ns - t->call_date;
@@ -616,7 +612,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		else {
 			task_unlink_wq(t);
 			__task_free(t);
-			sched->current = NULL;
+			th_ctx->current = NULL;
 			__ha_barrier_store();
 			/* We don't want max_processed to be decremented if
 			 * we're just freeing a destroyed task, we should only
@@ -624,7 +620,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 			 */
 			continue;
 		}
-		sched->current = NULL;
+		th_ctx->current = NULL;
 		__ha_barrier_store();
 		/* If there is a pending state we have to wake up the task
 		 * immediately, else we defer it into wait queue
@@ -650,7 +646,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		}
 		done++;
 	}
-	sched->current_queue = -1;
+	th_ctx->current_queue = -1;
 
 	return done;
 }
@@ -670,7 +666,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
  */
 void process_runnable_tasks()
 {
-	struct task_per_thread * const tt = sched;
+	struct thread_ctx * const tt = th_ctx;
 	struct eb32sc_node *lrq; // next local run queue entry
 	struct eb32sc_node *grq; // next global run queue entry
 	struct task *t;
@@ -701,11 +697,11 @@ void process_runnable_tasks()
 	if (likely(niced_tasks))
 		max_processed = (max_processed + 3) / 4;
 
-	if (max_processed < sched->rq_total && sched->rq_total <= 2*max_processed) {
+	if (max_processed < th_ctx->rq_total && th_ctx->rq_total <= 2*max_processed) {
 		/* If the run queue exceeds the budget by up to 50%, let's cut it
 		 * into two identical halves to improve latency.
 		 */
-		max_processed = sched->rq_total / 2;
+		max_processed = th_ctx->rq_total / 2;
 	}
 
 not_done_yet:
@@ -718,7 +714,7 @@ void process_runnable_tasks()
 
 	/* normal tasklets list gets a default weight of ~37% */
 	if ((tt->tl_class_mask & (1 << TL_NORMAL)) ||
-	    !eb_is_empty(&sched->rqueue) || (global_tasks_mask & tid_bit))
+	    !eb_is_empty(&th_ctx->rqueue) || (global_tasks_mask & tid_bit))
 		max[TL_NORMAL] = default_weights[TL_NORMAL];
 
 	/* bulk tasklets list gets a default weight of ~13% */
@@ -889,14 +885,14 @@ void mworker_cleantasks()
 #endif
 	/* clean the per thread run queue */
 	for (i = 0; i < global.nbthread; i++) {
-		tmp_rq = eb32sc_first(&task_per_thread[i].rqueue, MAX_THREADS_MASK);
+		tmp_rq = eb32sc_first(&ha_thread_ctx[i].rqueue, MAX_THREADS_MASK);
 		while (tmp_rq) {
 			t = eb32sc_entry(tmp_rq, struct task, rq);
 			tmp_rq = eb32sc_next(tmp_rq, MAX_THREADS_MASK);
 			task_destroy(t);
 		}
 		/* cleanup the per thread timers queue */
-		tmp_wq = eb32_first(&task_per_thread[i].timers);
+		tmp_wq = eb32_first(&ha_thread_ctx[i].timers);
 		while (tmp_wq) {
 			t = eb32_entry(tmp_wq, struct task, wq);
 			tmp_wq = eb32_next(tmp_wq);
@@ -914,11 +910,10 @@ static void init_task()
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
 #endif
-	memset(&task_per_thread, 0, sizeof(task_per_thread));
 	for (i = 0; i < MAX_THREADS; i++) {
 		for (q = 0; q < TL_CLASSES; q++)
-			LIST_INIT(&task_per_thread[i].tasklets[q]);
-		MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
+			LIST_INIT(&ha_thread_ctx[i].tasklets[q]);
+		MT_LIST_INIT(&ha_thread_ctx[i].shared_tasklet_list);
 	}
 }
 
diff --git a/src/thread.c b/src/thread.c
index ad5327aed..ab9ef90b6 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -53,6 +53,9 @@
 struct thread_info ha_thread_info[MAX_THREADS] = { };
 THREAD_LOCAL struct thread_info *ti = &ha_thread_info[0];
 
+struct thread_ctx ha_thread_ctx[MAX_THREADS] = { };
+THREAD_LOCAL struct thread_ctx *th_ctx = &ha_thread_ctx[0];
+
 #ifdef USE_THREAD
 
 volatile unsigned long threads_want_rdv_mask __read_mostly = 0;
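Note: the patch above is purely mechanical -- the scheduler fields move from struct task_per_thread and the thread-local `sched` pointer to struct thread_ctx and `th_ctx`, both indexed by thread id. For readers less familiar with this per-thread-context pattern, the small standalone C sketch below illustrates the same idea; it is not HAProxy code and every name in it is illustrative only: a statically allocated context array, a thread-local pointer installed once at thread startup (the role ha_set_tid() plays for th_ctx), plain accesses for owner-only fields and atomics for fields other threads may read.

/* Standalone sketch, NOT HAProxy code: all names below are illustrative.
 * It mirrors the pattern the patch generalizes: a static per-thread context
 * array plus a thread-local pointer installed at thread startup, with plain
 * accesses for owner-only fields and atomics for cross-thread ones.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_THREADS 4

struct demo_thread_ctx {
	unsigned int nb_tasks;              /* owner-only: plain reads/writes */
	_Atomic unsigned int rq_total;      /* may be touched by other threads */
} __attribute__((aligned(64)));             /* one context per cache line */

static struct demo_thread_ctx demo_ctx[MAX_THREADS];
static __thread struct demo_thread_ctx *my_ctx; /* plays the role of th_ctx */

static void *worker(void *arg)
{
	unsigned int id = (unsigned int)(uintptr_t)arg;

	my_ctx = &demo_ctx[id];             /* like ha_set_tid() installing th_ctx */
	my_ctx->nb_tasks++;                 /* local field: no atomics needed */
	atomic_fetch_add(&my_ctx->rq_total, 1); /* shared counter: atomic update */
	return NULL;
}

int main(void)
{
	pthread_t th[MAX_THREADS];
	unsigned int total = 0;

	for (unsigned int i = 0; i < MAX_THREADS; i++)
		pthread_create(&th[i], NULL, worker, (void *)(uintptr_t)i);
	for (unsigned int i = 0; i < MAX_THREADS; i++)
		pthread_join(th[i], NULL);

	/* cross-thread aggregation, the way total_run_queues() walks ha_thread_ctx[] */
	for (unsigned int i = 0; i < MAX_THREADS; i++)
		total += atomic_load(&demo_ctx[i].rq_total);
	printf("rq_total across %d threads: %u\n", MAX_THREADS, total);
	return 0;
}

Keeping the pointer thread-local avoids re-indexing the array on every access in the hot path, while the 64-byte alignment keeps one thread's hot fields from false-sharing a cache line with its neighbour's.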