diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 336dad4203..f83aba7842 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -107,6 +107,7 @@ __decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait que void __tasklet_wakeup_on(struct tasklet *tl, int thr); void task_kill(struct task *t); +void tasklet_kill(struct tasklet *t); void __task_wakeup(struct task *t); void __task_queue(struct task *task, struct eb_root *wq); diff --git a/src/task.c b/src/task.c index c3145b04d9..7dc0cd41cf 100644 --- a/src/task.c +++ b/src/task.c @@ -103,6 +103,44 @@ void task_kill(struct task *t) } } +/* Equivalent of task_kill for tasklets. Mark the tasklet for destruction. + * It will be deleted on the next scheduler invocation. This function is + * thread-safe : a thread can kill a tasklet of another thread. + */ +void tasklet_kill(struct tasklet *t) +{ + unsigned int state = t->state; + unsigned int thr; + + BUG_ON(state & TASK_KILLED); + + while (1) { + while (state & (TASK_IN_LIST)) { + /* Tasklet already in the list ready to be executed. Add + * the killed flag and wait for the process loop to + * detect it. + */ + if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_KILLED)) + return; + } + + /* Mark the tasklet as killed and wake the thread to process it + * as soon as possible. + */ + if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_IN_LIST | TASK_KILLED)) { + thr = t->tid > 0 ? t->tid: tid; + MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list, + (struct mt_list *)&t->list); + _HA_ATOMIC_INC(&task_per_thread[thr].rq_total); + if (sleeping_thread_mask & (1UL << thr)) { + _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr)); + wake_thread(thr); + } + return; + } + } +} + /* Do not call this one, please use tasklet_wakeup_on() instead, as this one is * the slow path of tasklet_wakeup_on() which performs some preliminary checks * and sets TASK_IN_LIST before calling this one. A negative designates @@ -485,7 +523,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) budgets[queue]--; t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list); - state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_F_TASKLET|TASK_KILLED|TASK_F_USR1); + state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_F_TASKLET|TASK_KILLED|TASK_F_USR1|TASK_KILLED); ti->flags &= ~TI_FL_STUCK; // this thread is still running activity[tid].ctxsw++; @@ -516,7 +554,16 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) state = _HA_ATOMIC_XCHG(&t->state, state); __ha_barrier_atomic_store(); - process(t, ctx, state); + if (likely(!(state & TASK_KILLED))) { + process(t, ctx, state); + } + else { + done++; + sched->current = NULL; + pool_free(pool_head_tasklet, t); + __ha_barrier_store(); + continue; + } if (unlikely(task_profiling_mask & tid_bit)) { HA_ATOMIC_INC(&profile_entry->calls);