MEDIUM: tasks: improve fairness between the local and global queues

Tasks allowed to run on multiple threads, as well as those scheduled by
one thread to run on another one, pass through the global queue. The
local queues only see tasks scheduled by one thread to run on itself.
The tasks extracted from the global queue are transferred to the local
queue when they're picked by one thread. This causes a priority issue
because the global tasks experience a priority contest twice while the
local ones experience it only once. Thus if a task returns still
running, it's immediately reinserted into the local run queue and runs
much faster than the ones coming from the global queue.

Until 1.9, the tasks going through the global queue were mostly:
  - health checks initialization
  - queue management
  - listener dequeue/requeue

These are moderately sensitive to unfairness, so it was not that
big an issue.

Since 2.0-dev2, with the multi-queue accept, tasks are scheduled to
remote threads on most accept() calls, and it becomes fairly visible
under load that the accept slows down, even for the CLI.

This patch remedies this by consulting both the local and the global
run queues in parallel and by always picking the task whose deadline
is the earliest. This maintains excellent fairness between the two
queues and removes the cascade effect experienced by the global tasks.
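
For illustration only, here is a minimal standalone C sketch (not HAProxy
code) of that pick rule: two already sorted queues of 32-bit queue
positions are merged by always taking the oldest entry, using the same
wrap-safe signed comparison as the patch, (int)(lrq->key - grq->key) <= 0.
The queue contents and variable names below are invented for the example.

    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>

    int main(void)
    {
        /* Positions already assigned to queued entries. 0xfffffffeu was
         * queued just before the position counter wrapped, so it is the
         * oldest entry of all despite being numerically the largest.
         */
        uint32_t local_q[]  = { 0xfffffffeu, 3, 6, 9 };
        uint32_t global_q[] = { 1, 4, 5, 11 };
        size_t li = 0, gi = 0;
        size_t ln = sizeof(local_q) / sizeof(local_q[0]);
        size_t gn = sizeof(global_q) / sizeof(global_q[0]);

        while (li < ln || gi < gn) {
            int take_local;

            if (gi >= gn)
                take_local = 1;        /* global queue exhausted */
            else if (li >= ln)
                take_local = 0;        /* local queue exhausted */
            else                       /* wrap-safe "local is older or equal" */
                take_local = (int32_t)(local_q[li] - global_q[gi]) <= 0;

            if (take_local)
                printf("run local  task, position %#x\n", (unsigned)local_q[li++]);
            else
                printf("run global task, position %#x\n", (unsigned)global_q[gi++]);
        }
        return 0;
    }

Run on its own, the sketch interleaves the two queues strictly by queue
position, which is the property the patch relies on to stop global tasks
from being systematically delayed behind locally re-queued ones.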

Now the CLI always continues to respond quickly even in the presence
of expensive tasks running for a long time.

This patch may be backported to 1.9 if some scheduling issues are
reported, but at this time it doesn't seem necessary.
Willy Tarreau 2019-04-12 18:03:41 +02:00
parent 24f382f555
commit cde7902ac9

@@ -310,103 +310,96 @@ int wake_expired_tasks()
  * other variables (eg: nice value) to set the final position in the tree. The
  * counter may wrap without a problem, of course. We then limit the number of
  * tasks processed to 200 in any case, so that general latency remains low and
- * so that task positions have a chance to be considered.
+ * so that task positions have a chance to be considered. The function scans
+ * both the global and local run queues and picks the most urgent task between
+ * the two. We need to grab the global runqueue lock to touch it so it's taken
+ * on the very first access to the global run queue and is released as soon as
+ * it reaches the end.
  *
  * The function adjusts <next> if a new event is closer.
  */
 void process_runnable_tasks()
 {
-    struct eb32sc_node *rq_next;
+    struct eb32sc_node *lrq = NULL; // next local run queue entry
+    struct eb32sc_node *grq = NULL; // next global run queue entry
     struct task *t;
     int max_processed;
 
     if (!(active_tasks_mask & tid_bit)) {
         activity[tid].empty_rq++;
         return;
     }
 
     tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
     nb_tasks_cur = nb_tasks;
     max_processed = global.tune.runqueue_depth;
 
-    if (likely(global_tasks_mask & tid_bit)) {
-        HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-        if (!(active_tasks_mask & tid_bit)) {
-            HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-            activity[tid].empty_rq++;
-            return;
-        }
-#ifdef USE_THREAD
-        /* Get some elements from the global run queue and put it in the
-         * local run queue. To try to keep a bit of fairness, just get as
-         * much elements from the global list as to have a bigger local queue
-         * than the average.
-         */
-        rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-        while ((task_per_thread[tid].task_list_size + task_per_thread[tid].rqueue_size) * global.nbthread <= tasks_run_queue + global.nbthread - 1) {
-            if (unlikely(!rq_next)) {
-                /* either we just started or we reached the end
-                 * of the tree, typically because <rqueue_ticks>
-                 * is in the first half and we're first scanning
-                 * the last half. Let's loop back to the beginning
-                 * of the tree now.
-                 */
-                rq_next = eb32sc_first(&rqueue, tid_bit);
-                if (!rq_next) {
-                    _HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
-                    break;
-                }
-            }
-
-            t = eb32sc_entry(rq_next, struct task, rq);
-            rq_next = eb32sc_next(rq_next, tid_bit);
-
-            /* detach the task from the queue */
-            __task_unlink_rq(t);
-            __task_wakeup(t, &task_per_thread[tid].rqueue);
-        }
-#endif
-        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-    } else {
-        if (!(active_tasks_mask & tid_bit)) {
-            activity[tid].empty_rq++;
-            return;
-        }
-    }
-
-    /* Get some tasks from the run queue, make sure we don't
-     * get too much in the task list, but put a bit more than
-     * the max that will be run, to give a bit more fairness
-     */
-    rq_next = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-    while (max_processed + (max_processed / 10) > task_per_thread[tid].task_list_size) {
-        /* Note: this loop is one of the fastest code path in
-         * the whole program. It should not be re-arranged
-         * without a good reason.
-         */
-        if (unlikely(!rq_next)) {
-            /* either we just started or we reached the end
-             * of the tree, typically because <rqueue_ticks>
-             * is in the first half and we're first scanning
-             * the last half. Let's loop back to the beginning
-             * of the tree now.
-             */
-            rq_next = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
-            if (!rq_next)
-                break;
-        }
-
-        t = eb32sc_entry(rq_next, struct task, rq);
-        rq_next = eb32sc_next(rq_next, tid_bit);
+    /* Note: the grq lock is always held when grq is not null */
+
+    while (task_per_thread[tid].task_list_size < max_processed) {
+        if ((global_tasks_mask & tid_bit) && !grq) {
+            HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+            grq = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+            if (unlikely(!grq)) {
+                grq = eb32sc_first(&rqueue, tid_bit);
+                if (!grq) {
+                    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+                    _HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
+                }
+            }
+        }
+
+        /* If a global task is available for this thread, it's in grq
+         * now and the global RQ is locked.
+         */
+
+        if (!lrq) {
+            lrq = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+            if (unlikely(!lrq))
+                lrq = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
+        }
+
+        if (!lrq && !grq)
+            break;
+
+        if (likely(!grq || (lrq && (int)(lrq->key - grq->key) <= 0))) {
+            t = eb32sc_entry(lrq, struct task, rq);
+            lrq = eb32sc_next(lrq, tid_bit);
+            __task_unlink_rq(t);
+        }
+        else {
+            t = eb32sc_entry(grq, struct task, rq);
+            grq = eb32sc_next(grq, tid_bit);
+            __task_unlink_rq(t);
+            if (unlikely(!grq)) {
+                grq = eb32sc_first(&rqueue, tid_bit);
+                if (!grq) {
+                    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+                    _HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
+                }
+            }
+        }
 
         /* Make sure nobody re-adds the task in the runqueue */
         _HA_ATOMIC_OR(&t->state, TASK_RUNNING);
 
-        /* detach the task from the queue */
-        __task_unlink_rq(t);
-
         /* And add it to the local task list */
         task_insert_into_tasklet_list(t);
     }
 
+    /* release the rqueue lock */
+    if (grq) {
+        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+        grq = NULL;
+    }
+
     if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
         _HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
         __ha_barrier_atomic_load();
         if (global_tasks_mask & tid_bit)
             _HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
     }
 
     while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
         struct task *t;
         unsigned short state;
@@ -463,11 +456,11 @@ void process_runnable_tasks()
         }
 
         max_processed--;
-        if (max_processed <= 0) {
-            _HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
-            activity[tid].long_rq++;
-            break;
-        }
     }
+
+    if (!LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
+        _HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
+        activity[tid].long_rq++;
+    }
 }
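
As a side note on the "counter may wrap without a problem" remark in the
function comment above: both queues are scanned with eb32sc_lookup_ge()
starting from rqueue_ticks - TIMER_LOOK_BACK, falling back to
eb32sc_first() when that lookup finds nothing. A rough standalone
analogue over a plain sorted array (again not HAProxy code; names and
values are invented for the illustration) looks like this:

    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>

    /* Index of the first key >= x in a sorted array, or n if none: a
     * stand-in for eb32sc_lookup_ge() on the run-queue tree.
     */
    static size_t lookup_ge(const uint32_t *keys, size_t n, uint32_t x)
    {
        size_t i;

        for (i = 0; i < n; i++)
            if (keys[i] >= x)
                return i;
        return n;
    }

    int main(void)
    {
        /* queued positions in plain unsigned order, as stored in the tree */
        uint32_t keys[] = { 10, 11, 0xfffffff0u, 0xfffffff5u };
        size_t n = sizeof(keys) / sizeof(keys[0]);
        uint32_t now = 12;          /* current position counter, just wrapped */
        uint32_t look_back = 0x20;  /* stand-in for TIMER_LOOK_BACK */
        size_t start;

        /* Start scanning a bit before "now". Here now - look_back wraps
         * below zero, so the scan starts at the entries queued right
         * before the wrap; if nothing were >= that point, we would
         * restart from the smallest key, like falling back to
         * eb32sc_first() does in the code above.
         */
        start = lookup_ge(keys, n, now - look_back);
        if (start == n)
            start = 0;

        printf("scan starts at key %#x\n", (unsigned)keys[start]);
        return 0;
    }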