mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-14 15:34:35 +00:00
MEDIUM: tasks: improve fairness between the local and global queues
Tasks allowed to run on multiple threads, as well as those scheduled by one thread to run on another one pass through the global queue. The local queues only see tasks scheduled by one thread to run on itself. The tasks extracted from the global queue are transferred to the local queue when they're picked by one thread. This causes a priority issue because the global tasks experience a priority contest twice while the local ones experience it only once. Thus if a tasks returns still running, it's immediately reinserted into the local run queue and runs much faster than the ones coming from the global queue. Till 1.9 the tasks going through the global queue were mostly : - health checks initialization - queue management - listener dequeue/requeue These ones are moderately sensitive to unfairness so it was not that big an issue. Since 2.0-dev2 with the multi-queue accept, tasks are scheduled to remote threads on most accept() and it becomes fairly visible under load that the accept slows down, even for the CLI. This patch remedies this by consulting both the local and the global run queues in parallel and by always picking the task whose deadline is the earliest. This guarantees to maintain an excellent fairness between the two queues and removes the cascade effect experienced by the global tasks. Now the CLI always continues to respond quickly even in presence of expensive tasks running for a long time. This patch may possibly be backported to 1.9 if some scheduling issues are reported but at this time it doesn't seem necessary.
This commit is contained in:
parent
24f382f555
commit
cde7902ac9
133
src/task.c
133
src/task.c
@ -310,103 +310,96 @@ int wake_expired_tasks()
|
||||
* other variables (eg: nice value) to set the final position in the tree. The
|
||||
* counter may wrap without a problem, of course. We then limit the number of
|
||||
* tasks processed to 200 in any case, so that general latency remains low and
|
||||
* so that task positions have a chance to be considered.
|
||||
* so that task positions have a chance to be considered. The function scans
|
||||
* both the global and local run queues and picks the most urgent task between
|
||||
* the two. We need to grab the global runqueue lock to touch it so it's taken
|
||||
* on the very first access to the global run queue and is released as soon as
|
||||
* it reaches the end.
|
||||
*
|
||||
* The function adjusts <next> if a new event is closer.
|
||||
*/
|
||||
void process_runnable_tasks()
|
||||
{
|
||||
struct eb32sc_node *rq_next;
|
||||
struct eb32sc_node *lrq = NULL; // next local run queue entry
|
||||
struct eb32sc_node *grq = NULL; // next global run queue entry
|
||||
struct task *t;
|
||||
int max_processed;
|
||||
|
||||
if (!(active_tasks_mask & tid_bit)) {
|
||||
activity[tid].empty_rq++;
|
||||
return;
|
||||
}
|
||||
|
||||
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
|
||||
nb_tasks_cur = nb_tasks;
|
||||
max_processed = global.tune.runqueue_depth;
|
||||
|
||||
if (likely(global_tasks_mask & tid_bit)) {
|
||||
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
if (!(active_tasks_mask & tid_bit)) {
|
||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
activity[tid].empty_rq++;
|
||||
return;
|
||||
}
|
||||
/* Note: the grq lock is always held when grq is not null */
|
||||
|
||||
#ifdef USE_THREAD
|
||||
/* Get some elements from the global run queue and put it in the
|
||||
* local run queue. To try to keep a bit of fairness, just get as
|
||||
* much elements from the global list as to have a bigger local queue
|
||||
* than the average.
|
||||
*/
|
||||
rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
|
||||
while ((task_per_thread[tid].task_list_size + task_per_thread[tid].rqueue_size) * global.nbthread <= tasks_run_queue + global.nbthread - 1) {
|
||||
if (unlikely(!rq_next)) {
|
||||
/* either we just started or we reached the end
|
||||
* of the tree, typically because <rqueue_ticks>
|
||||
* is in the first half and we're first scanning
|
||||
* the last half. Let's loop back to the beginning
|
||||
* of the tree now.
|
||||
*/
|
||||
rq_next = eb32sc_first(&rqueue, tid_bit);
|
||||
if (!rq_next) {
|
||||
while (task_per_thread[tid].task_list_size < max_processed) {
|
||||
if ((global_tasks_mask & tid_bit) && !grq) {
|
||||
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
grq = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
|
||||
if (unlikely(!grq)) {
|
||||
grq = eb32sc_first(&rqueue, tid_bit);
|
||||
if (!grq) {
|
||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
_HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
t = eb32sc_entry(rq_next, struct task, rq);
|
||||
rq_next = eb32sc_next(rq_next, tid_bit);
|
||||
|
||||
/* detach the task from the queue */
|
||||
__task_unlink_rq(t);
|
||||
__task_wakeup(t, &task_per_thread[tid].rqueue);
|
||||
}
|
||||
#endif
|
||||
|
||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
} else {
|
||||
if (!(active_tasks_mask & tid_bit)) {
|
||||
activity[tid].empty_rq++;
|
||||
return;
|
||||
}
|
||||
}
|
||||
/* Get some tasks from the run queue, make sure we don't
|
||||
* get too much in the task list, but put a bit more than
|
||||
* the max that will be run, to give a bit more fairness
|
||||
*/
|
||||
rq_next = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
|
||||
while (max_processed + (max_processed / 10) > task_per_thread[tid].task_list_size) {
|
||||
/* Note: this loop is one of the fastest code path in
|
||||
* the whole program. It should not be re-arranged
|
||||
* without a good reason.
|
||||
/* If a global task is available for this thread, it's in grq
|
||||
* now and the global RQ is locked.
|
||||
*/
|
||||
if (unlikely(!rq_next)) {
|
||||
/* either we just started or we reached the end
|
||||
* of the tree, typically because <rqueue_ticks>
|
||||
* is in the first half and we're first scanning
|
||||
* the last half. Let's loop back to the beginning
|
||||
* of the tree now.
|
||||
*/
|
||||
rq_next = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
|
||||
if (!rq_next)
|
||||
break;
|
||||
|
||||
if (!lrq) {
|
||||
lrq = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
|
||||
if (unlikely(!lrq))
|
||||
lrq = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
|
||||
}
|
||||
t = eb32sc_entry(rq_next, struct task, rq);
|
||||
rq_next = eb32sc_next(rq_next, tid_bit);
|
||||
|
||||
if (!lrq && !grq)
|
||||
break;
|
||||
|
||||
if (likely(!grq || (lrq && (int)(lrq->key - grq->key) <= 0))) {
|
||||
t = eb32sc_entry(lrq, struct task, rq);
|
||||
lrq = eb32sc_next(lrq, tid_bit);
|
||||
__task_unlink_rq(t);
|
||||
}
|
||||
else {
|
||||
t = eb32sc_entry(grq, struct task, rq);
|
||||
grq = eb32sc_next(grq, tid_bit);
|
||||
__task_unlink_rq(t);
|
||||
if (unlikely(!grq)) {
|
||||
grq = eb32sc_first(&rqueue, tid_bit);
|
||||
if (!grq) {
|
||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
_HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Make sure nobody re-adds the task in the runqueue */
|
||||
_HA_ATOMIC_OR(&t->state, TASK_RUNNING);
|
||||
|
||||
/* detach the task from the queue */
|
||||
__task_unlink_rq(t);
|
||||
/* And add it to the local task list */
|
||||
task_insert_into_tasklet_list(t);
|
||||
}
|
||||
|
||||
/* release the rqueue lock */
|
||||
if (grq) {
|
||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
grq = NULL;
|
||||
}
|
||||
|
||||
if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
|
||||
_HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
|
||||
__ha_barrier_atomic_load();
|
||||
if (global_tasks_mask & tid_bit)
|
||||
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||
}
|
||||
|
||||
while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
|
||||
struct task *t;
|
||||
unsigned short state;
|
||||
@ -463,11 +456,11 @@ void process_runnable_tasks()
|
||||
}
|
||||
|
||||
max_processed--;
|
||||
if (max_processed <= 0) {
|
||||
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||
activity[tid].long_rq++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
|
||||
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||
activity[tid].long_rq++;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user