From ff1e9f39b91ab945e0846bc8493c9e00445b337d Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Fri, 20 Sep 2019 17:18:35 +0200 Subject: [PATCH] MEDIUM: tasklets: Make the tasklet list a struct mt_list. Change the tasklet code so that the tasklet list is now a mt_list. That means that tasklets now have an associated tid, for the thread they are expected to run on, and any thread can now call tasklet_wakeup() for that tasklet. One can change the associated tid with tasklet_set_tid(). --- include/proto/task.h | 32 ++++++++++++++++++-------------- include/types/task.h | 5 +++-- src/checks.c | 4 ++++ src/debug.c | 2 +- src/task.c | 17 ++++++++++------- 5 files changed, 36 insertions(+), 24 deletions(-) diff --git a/include/proto/task.h b/include/proto/task.h index f213f47d0..da7d892b3 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -226,10 +226,8 @@ static inline struct task *task_unlink_rq(struct task *t) static inline void tasklet_wakeup(struct tasklet *tl) { - if (!LIST_ISEMPTY(&tl->list)) - return; - LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list); - _HA_ATOMIC_ADD(&tasks_run_queue, 1); + if (MT_LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list) == 1) + _HA_ATOMIC_ADD(&tasks_run_queue, 1); } @@ -238,8 +236,8 @@ static inline void tasklet_wakeup(struct tasklet *tl) */ static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl) { - _HA_ATOMIC_ADD(&tasks_run_queue, 1); - LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list); + if (MT_LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list) == 1) + _HA_ATOMIC_ADD(&tasks_run_queue, 1); } /* Remove the tasklet from the tasklet list. The tasklet MUST already be there. 
@@ -248,13 +246,13 @@ static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl) */ static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t) { - LIST_DEL_INIT(&t->list); - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + if (MT_LIST_DEL(&t->list) == 1) + _HA_ATOMIC_SUB(&tasks_run_queue, 1); } static inline void tasklet_remove_from_tasklet_list(struct tasklet *t) { - if (likely(!LIST_ISEMPTY(&t->list))) + if (likely(!MT_LIST_ISEMPTY(&t->list))) __tasklet_remove_from_tasklet_list(t); } @@ -284,7 +282,8 @@ static inline void tasklet_init(struct tasklet *t) t->calls = 0; t->state = 0; t->process = NULL; - LIST_INIT(&t->list); + t->tid = tid; + MT_LIST_INIT(&t->list); } static inline struct tasklet *tasklet_new(void) @@ -355,9 +354,9 @@ static inline void task_destroy(struct task *t) static inline void tasklet_free(struct tasklet *tl) { - if (!LIST_ISEMPTY(&tl->list)) { - LIST_DEL(&tl->list); - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + if (!MT_LIST_ISEMPTY(&tl->list)) { + if (MT_LIST_DEL(&tl->list) == 1) + _HA_ATOMIC_SUB(&tasks_run_queue, 1); } if ((struct task *)tl == curr_task) { @@ -369,6 +368,11 @@ static inline void tasklet_free(struct tasklet *tl) pool_flush(pool_head_tasklet); } +static inline void tasklet_set_tid(struct tasklet *tl, int tid) +{ + tl->tid = tid; +} + void __task_queue(struct task *task, struct eb_root *wq); /* Place into the wait queue, where it may already be. 
If the expiration @@ -538,7 +542,7 @@ static inline int thread_has_tasks(void) { return (!!(global_tasks_mask & tid_bit) | (task_per_thread[tid].rqueue_size > 0) | - !LIST_ISEMPTY(&task_per_thread[tid].task_list)); + !MT_LIST_ISEMPTY(&task_per_thread[tid].task_list)); } /* adds list item to work list and wake up the associated task */ diff --git a/include/types/task.h b/include/types/task.h index 481d56331..aac3a6c11 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -61,7 +61,7 @@ struct notification { struct task_per_thread { struct eb_root timers; /* tree constituting the per-thread wait queue */ struct eb_root rqueue; /* tree constituting the per-thread run queue */ - struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */ + struct mt_list task_list; /* List of tasks to be run, mixing tasks and tasklets */ int task_list_size; /* Number of tasks in the task_list */ int rqueue_size; /* Number of elements in the per-thread run queue */ __attribute__((aligned(64))) char end[0]; @@ -94,7 +94,8 @@ struct task { /* lightweight tasks, without priority, mainly used for I/Os */ struct tasklet { TASK_COMMON; /* must be at the beginning! 
*/ - struct list list; + struct mt_list list; + int tid; /* TID of the tasklet owner */ }; #define TASK_IS_TASKLET(t) ((t)->nice == -32768) diff --git a/src/checks.c b/src/checks.c index b879100fa..6bde41291 100644 --- a/src/checks.c +++ b/src/checks.c @@ -1638,6 +1638,8 @@ static int connect_conn_chk(struct task *t) conn = cs->conn; /* Maybe there were an older connection we were waiting on */ check->wait_list.events = 0; + tasklet_set_tid(check->wait_list.tasklet, tid); + if (!sockaddr_alloc(&conn->dst)) return SF_ERR_RESOURCE; @@ -2891,6 +2893,8 @@ static int tcpcheck_main(struct check *check) cs_destroy(check->cs); } + tasklet_set_tid(check->wait_list.tasklet, tid); + check->cs = cs; conn = cs->conn; /* Maybe there were an older connection we were waiting on */ diff --git a/src/debug.c b/src/debug.c index 409bd2cbd..a2b99c9cd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -57,7 +57,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid) !!(global_tasks_mask & thr_bit), !eb_is_empty(&task_per_thread[thr].timers), !eb_is_empty(&task_per_thread[thr].rqueue), - !LIST_ISEMPTY(&task_per_thread[thr].task_list), + !MT_LIST_ISEMPTY(&task_per_thread[thr].task_list), task_per_thread[thr].task_list_size, task_per_thread[thr].rqueue_size, stuck, diff --git a/src/task.c b/src/task.c index dfec419f6..9aa76463c 100644 --- a/src/task.c +++ b/src/task.c @@ -368,9 +368,11 @@ void process_runnable_tasks() } #endif + /* Make sure the entry doesn't appear to be in a list */ + MT_LIST_INIT(&((struct tasklet *)t)->list); /* And add it to the local task list */ tasklet_insert_into_tasklet_list((struct tasklet *)t); - task_per_thread[tid].task_list_size++; + HA_ATOMIC_ADD(&task_per_thread[tid].task_list_size, 1); activity[tid].tasksw++; } @@ -380,18 +382,19 @@ void process_runnable_tasks() grq = NULL; } - while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) { + while (max_processed > 0 && !MT_LIST_ISEMPTY(&task_per_thread[tid].task_list)) { struct 
task *t; unsigned short state; void *ctx; struct task *(*process)(struct task *t, void *ctx, unsigned short state); - t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list); + t = (struct task *)MT_LIST_POP(&task_per_thread[tid].task_list, struct tasklet *, list); + if (!t) + break; state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING); __ha_barrier_atomic_store(); - __tasklet_remove_from_tasklet_list((struct tasklet *)t); if (!TASK_IS_TASKLET(t)) - task_per_thread[tid].task_list_size--; + _HA_ATOMIC_SUB(&task_per_thread[tid].task_list_size, 1); ti->flags &= ~TI_FL_STUCK; // this thread is still running activity[tid].ctxsw++; @@ -443,7 +446,7 @@ void process_runnable_tasks() max_processed--; } - if (!LIST_ISEMPTY(&task_per_thread[tid].task_list)) + if (!MT_LIST_ISEMPTY(&task_per_thread[tid].task_list)) activity[tid].long_rq++; } @@ -547,7 +550,7 @@ static void init_task() #endif memset(&task_per_thread, 0, sizeof(task_per_thread)); for (i = 0; i < MAX_THREADS; i++) { - LIST_INIT(&task_per_thread[i].task_list); + MT_LIST_INIT(&task_per_thread[i].task_list); } }