diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 4bc216f63..1f31a76bd 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -109,6 +109,7 @@ extern struct task_per_thread task_per_thread[MAX_THREADS]; __decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */ __decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait queue */ +void task_kill(struct task *t); void __task_wakeup(struct task *t, struct eb_root *); void __task_queue(struct task *task, struct eb_root *wq); diff --git a/src/task.c b/src/task.c index 7626e6037..761fc29a3 100644 --- a/src/task.c +++ b/src/task.c @@ -57,6 +57,57 @@ static unsigned int rqueue_ticks; /* insertion count */ struct task_per_thread task_per_thread[MAX_THREADS]; + +/* Flags the task for immediate destruction and puts it into its first + * thread's shared tasklet list if not yet queued/running. This will bypass + * the priority scheduling and make the task show up as fast as possible in + * the other thread's queue. Note that this operation isn't idempotent and is + * not supposed to be run on the same task from multiple threads at once. It's + * the caller's responsibility to make sure it is the only one able to kill the + * task. + */ +void task_kill(struct task *t) +{ + unsigned short state = t->state; + unsigned int thr; + + BUG_ON(state & TASK_KILLED); + + while (1) { + while (state & (TASK_RUNNING | TASK_QUEUED)) { + /* task already in the queue and about to be executed, + * or even currently running. Just add the flag and be + * done with it, the process loop will detect it and kill + * it. The CAS will fail if we arrive too late. + */ + if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_KILLED)) + return; + } + + /* We'll have to wake it up, but we must also secure it so that + * it doesn't vanish under us. TASK_QUEUED guarantees nobody will + * add past us. + */ + if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_QUEUED | TASK_KILLED)) { + /* Bypass the tree and go directly into the shared tasklet list. + * Note: that's a task so it must be accounted for as such. Pick + * the task's first thread for the job. + */ + thr = my_ffsl(t->thread_mask) - 1; + if (MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, + (struct mt_list *)&((struct tasklet *)t)->list)) { + _HA_ATOMIC_ADD(&tasks_run_queue, 1); + _HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1); + if (sleeping_thread_mask & (1UL << thr)) { + _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr)); + wake_thread(thr); + } + return; + } + } + } +} + /* Puts the task in run queue at a position depending on t->nice. is * returned. The nice value assigns boosts in 32th of the run queue size. A * nice value of -1024 sets the task to -tasks_run_queue*32, while a nice value