BUG/MAJOR: task: add a new TASK_SHARED_WQ flag to fix foreing requeuing
Since 1.9 with commit b20aa9eef3
("MAJOR: tasks: create per-thread wait
queues") a task bound to a single thread will not use locks when being
queued or dequeued because the wait queue is assumed to be the owner
thread's.
But there exists a rare situation where this is not true: the health
check tasks may be running on one thread waiting for a response, and
may in parallel be requeued by another thread calling health_adjust()
after a detecting a response error in traffic when "observe l7" is set,
and "fastinter" is lower than "inter", requiring to shorten the running
check's timeout. In this case, the task being requeued was present in
another thread's wait queue, thus opening a race during task_unlink_wq(),
and gets requeued into the calling thread's wait queue instead of the
running one's, opening a second race here.
This patch aims at protecting against the risk of calling task_unlink_wq()
from one thread while the task is queued on another thread, hence unlocked,
by introducing a new TASK_SHARED_WQ flag.
This new flag indicates that a task's position in the wait queue may be
adjusted by other threads than then one currently executing it. This means
that such WQ manipulations must be performed under a lock. There are two
types of such tasks:
- the global ones, using the global wait queue (technically speaking,
those whose thread_mask has at least 2 bits set).
- some local ones, which for now will be placed into the global wait
queue as well in order to benefit from its lock.
The flag is automatically set on initialization if the task's thread mask
indicates more than one thread. The caller must also set it if it intends
to let other threads update the task's expiration delay (e.g. delegated
I/Os), or if it intends to change the task's affinity over time as this
could lead to the same situation.
Right now only the situation described above seems to be affected by this
issue, and it is very difficult to trigger, and even then, will often have
no visible effect beyond stopping the checks for example once the race is
met. On my laptop it is feasible with the following config, chained to
httpterm:
global
maxconn 400 # provoke FD errors, calling health_adjust()
defaults
mode http
timeout client 10s
timeout server 10s
timeout connect 10s
listen px
bind :8001
option httpchk /?t=50
server sback 127.0.0.1:8000 backup
server-template s 0-999 127.0.0.1:8000 check port 8001 inter 100 fastinter 10 observe layer7
This patch will automatically address the case for the checks because
check tasks are created with multiple threads bound and will get the
TASK_SHARED_WQ flag set.
If in the future more tasks need to rely on this (multi-threaded muxes
for example) and the use of the global wait queue becomes a bottleneck
again, then it should not be too difficult to place locks on the local
wait queues and queue the task on its bound thread.
This patch needs to be backported to 2.1, 2.0 and 1.9. It depends on
previous patch "MINOR: task: only check TASK_WOKEN_ANY to decide to
requeue a task".
Many thanks to William Dauchy for providing detailed traces allowing to
spot the problem.
This commit is contained in:
parent
8fe4253bf6
commit
dd0e89a084
|
@ -152,7 +152,13 @@ static inline void task_wakeup(struct task *t, unsigned int f)
|
|||
}
|
||||
}
|
||||
|
||||
/* change the thread affinity of a task to <thread_mask> */
|
||||
/* change the thread affinity of a task to <thread_mask>.
|
||||
* This may only be done from within the running task itself or during its
|
||||
* initialization. It will unqueue and requeue the task from the wait queue
|
||||
* if it was in it. This is safe against a concurrent task_queue() call because
|
||||
* task_queue() itself will unlink again if needed after taking into account
|
||||
* the new thread_mask.
|
||||
*/
|
||||
static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
|
||||
{
|
||||
if (unlikely(task_in_wq(t))) {
|
||||
|
@ -177,15 +183,15 @@ static inline struct task *__task_unlink_wq(struct task *t)
|
|||
}
|
||||
|
||||
/* remove a task from its wait queue. It may either be the local wait queue if
|
||||
* the task is bound to a single thread (in which case there's no locking
|
||||
* involved) or the global queue, with locking.
|
||||
* the task is bound to a single thread or the global queue. If the task uses a
|
||||
* shared wait queue, the global wait queue lock is used.
|
||||
*/
|
||||
static inline struct task *task_unlink_wq(struct task *t)
|
||||
{
|
||||
unsigned long locked;
|
||||
|
||||
if (likely(task_in_wq(t))) {
|
||||
locked = atleast2(t->thread_mask);
|
||||
locked = t->state & TASK_SHARED_WQ;
|
||||
if (locked)
|
||||
HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
|
||||
__task_unlink_wq(t);
|
||||
|
@ -285,7 +291,8 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
|
|||
/*
|
||||
* Initialize a new task. The bare minimum is performed (queue pointers and
|
||||
* state). The task is returned. This function should not be used outside of
|
||||
* task_new().
|
||||
* task_new(). If the thread mask contains more than one thread, TASK_SHARED_WQ
|
||||
* is set.
|
||||
*/
|
||||
static inline struct task *task_init(struct task *t, unsigned long thread_mask)
|
||||
{
|
||||
|
@ -293,6 +300,8 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
|
|||
t->rq.node.leaf_p = NULL;
|
||||
t->state = TASK_SLEEPING;
|
||||
t->thread_mask = thread_mask;
|
||||
if (atleast2(thread_mask))
|
||||
t->state |= TASK_SHARED_WQ;
|
||||
t->nice = 0;
|
||||
t->calls = 0;
|
||||
t->call_date = 0;
|
||||
|
@ -407,9 +416,9 @@ void __task_queue(struct task *task, struct eb_root *wq);
|
|||
|
||||
/* Place <task> into the wait queue, where it may already be. If the expiration
|
||||
* timer is infinite, do nothing and rely on wake_expired_task to clean up.
|
||||
* If the task is bound to a single thread, it's assumed to be bound to the
|
||||
* current thread's queue and is queued without locking. Otherwise it's queued
|
||||
* into the global wait queue, protected by locks.
|
||||
* If the task uses a shared wait queue, it's queued into the global wait queue,
|
||||
* protected by the global wq_lock, otherwise by it necessarily belongs to the
|
||||
* current thread'sand is queued without locking.
|
||||
*/
|
||||
static inline void task_queue(struct task *task)
|
||||
{
|
||||
|
@ -426,7 +435,7 @@ static inline void task_queue(struct task *task)
|
|||
return;
|
||||
|
||||
#ifdef USE_THREAD
|
||||
if (atleast2(task->thread_mask)) {
|
||||
if (task->state & TASK_SHARED_WQ) {
|
||||
HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
|
||||
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
|
||||
__task_queue(task, &timers);
|
||||
|
@ -434,6 +443,7 @@ static inline void task_queue(struct task *task)
|
|||
} else
|
||||
#endif
|
||||
{
|
||||
BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ
|
||||
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
|
||||
__task_queue(task, &sched->timers);
|
||||
}
|
||||
|
@ -450,7 +460,7 @@ static inline void task_schedule(struct task *task, int when)
|
|||
return;
|
||||
|
||||
#ifdef USE_THREAD
|
||||
if (atleast2(task->thread_mask)) {
|
||||
if (task->state & TASK_SHARED_WQ) {
|
||||
/* FIXME: is it really needed to lock the WQ during the check ? */
|
||||
HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
|
||||
if (task_in_wq(task))
|
||||
|
@ -463,6 +473,7 @@ static inline void task_schedule(struct task *task, int when)
|
|||
} else
|
||||
#endif
|
||||
{
|
||||
BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ
|
||||
if (task_in_wq(task))
|
||||
when = tick_first(when, task->expire);
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@
|
|||
#define TASK_RUNNING 0x0001 /* the task is currently running */
|
||||
#define TASK_GLOBAL 0x0002 /* The task is currently in the global runqueue */
|
||||
#define TASK_QUEUED 0x0004 /* The task has been (re-)added to the run queue */
|
||||
#define TASK_SHARED_WQ 0x0008 /* The task's expiration may be updated by other
|
||||
* threads, must be set before first queue/wakeup */
|
||||
|
||||
#define TASK_WOKEN_INIT 0x0100 /* woken up for initialisation purposes */
|
||||
#define TASK_WOKEN_TIMER 0x0200 /* woken up because of expired timer */
|
||||
|
|
|
@ -428,7 +428,8 @@ void process_runnable_tasks()
|
|||
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
||||
|
||||
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
|
||||
state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
|
||||
state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
|
||||
state = _HA_ATOMIC_XCHG(&t->state, state);
|
||||
__ha_barrier_atomic_store();
|
||||
__tasklet_remove_from_tasklet_list((struct tasklet *)t);
|
||||
|
||||
|
|
Loading…
Reference in New Issue