Mirror of http://git.haproxy.org/git/haproxy.git/ (synced 2025-05-02 07:48:05 +00:00)
MAJOR: task: task scheduler rework.
In order to allow task_wakeup to be called on a running task:
- from within the task handler itself,
- in the future, from another thread,
the lookups on the run queue and wait queue are reworked to prepare for the multithreading work.

If task_wakeup is called on a running task, the woken message flags are saved in the task's 'pending_state' attribute. The real wakeup is postponed until the end of the handler's run, at which point the woken messages are copied from pending_state into the task's state attribute.

It's important to note that this change causes a very minor (though measurable) performance loss, but it is necessary to make forward progress on a multi-threaded scheduler. Most users won't ever notice.
This commit is contained in:
parent ff4491726f
commit 0194897e54
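The deferred-wakeup mechanism described in the commit message can be illustrated with a small self-contained C sketch. This is not the HAProxy code itself (the real change is in the diff below): the toy_task struct, the flag values, and the boolean stand-in for the ebtree run queue are simplified assumptions made purely for illustration.

/* Minimal standalone sketch of the deferred-wakeup idea described above.
 * NOT the HAProxy implementation: the real code uses ebtree run/wait
 * queues; here the run queue is reduced to a boolean and the flag
 * values are illustrative only.
 */
#include <stdio.h>

#define TASK_SLEEPING     0x0000
#define TASK_RUNNING      0x0001  /* handler currently executing */
#define TASK_WOKEN_TIMER  0x0002  /* illustrative wakeup reason  */
#define TASK_WOKEN_MSG    0x0004  /* illustrative wakeup reason  */

struct toy_task {
    unsigned short state;          /* committed state flags              */
    unsigned short pending_state;  /* wakeups received while RUNNING     */
    int in_run_queue;              /* stand-in for the ebtree run queue  */
};

/* If the task is running, only record the wakeup reason in pending_state;
 * the real requeue happens after the handler returns. */
static void toy_task_wakeup(struct toy_task *t, unsigned short f)
{
    if (t->state & TASK_RUNNING) {
        t->pending_state |= f;
        return;
    }
    t->state |= f;
    t->in_run_queue = 1;
}

/* Called by the scheduler around the task handler. */
static void toy_run_task(struct toy_task *t, void (*handler)(struct toy_task *))
{
    t->state |= TASK_RUNNING;
    t->pending_state = 0;
    t->in_run_queue = 0;

    handler(t);                    /* may call toy_task_wakeup() on t    */

    t->state &= ~TASK_RUNNING;
    if (t->pending_state) {        /* deferred wakeup: requeue it now    */
        t->state = t->pending_state;
        t->in_run_queue = 1;
    }
}

static void demo_handler(struct toy_task *t)
{
    /* Waking ourselves while running is now legal: it is just recorded. */
    toy_task_wakeup(t, TASK_WOKEN_MSG);
}

int main(void)
{
    struct toy_task t = { .state = TASK_SLEEPING };

    toy_task_wakeup(&t, TASK_WOKEN_TIMER);   /* normal wakeup            */
    toy_run_task(&t, demo_handler);          /* handler re-wakes itself  */

    printf("state=0x%04x in_run_queue=%d\n", t.state, t.in_run_queue);
    return 0;
}

In the real scheduler, process_runnable_tasks() batches up to 16 tasks, runs their handlers, and only afterwards either requeues the ones with a pending_state via __task_wakeup() or puts them back into the wait queue via task_queue(), as the src/task.c hunks below show.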
@@ -85,8 +85,6 @@ extern unsigned int tasks_run_queue_cur;
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
-extern struct eb32_node *last_timer;   /* optimization: last queued timer */
-extern struct eb32_node *rq_next;      /* optimization: next task except if delete/insert */
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -104,6 +102,13 @@ static inline int task_in_wq(struct task *t)
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+    /* If task is running, we postpone the call
+     * and backup the state.
+     */
+    if (unlikely(t->state & TASK_RUNNING)) {
+        t->pending_state |= f;
+        return t;
+    }
     if (likely(!task_in_rq(t)))
         __task_wakeup(t);
     t->state |= f;
@@ -119,8 +124,6 @@ static inline struct task *task_wakeup(struct task *t, unsigned int f)
 static inline struct task *__task_unlink_wq(struct task *t)
 {
     eb32_delete(&t->wq);
-    if (last_timer == &t->wq)
-        last_timer = NULL;
     return t;
 }
 
@@ -153,8 +156,6 @@ static inline struct task *__task_unlink_rq(struct task *t)
 static inline struct task *task_unlink_rq(struct task *t)
 {
     if (likely(task_in_rq(t))) {
-        if (&t->rq == rq_next)
-            rq_next = eb32_next(rq_next);
         __task_unlink_rq(t);
     }
     return t;
@@ -180,7 +181,7 @@ static inline struct task *task_init(struct task *t)
 {
     t->wq.node.leaf_p = NULL;
     t->rq.node.leaf_p = NULL;
-    t->state = TASK_SLEEPING;
+    t->pending_state = t->state = TASK_SLEEPING;
     t->nice = 0;
     t->calls = 0;
     return t;
@@ -54,6 +54,7 @@
 struct task {
     struct eb32_node rq;            /* ebtree node used to hold the task in the run queue */
     unsigned short state;           /* task state : bit field of TASK_* */
+    unsigned short pending_state;   /* pending states for running talk */
     short nice;                     /* the task's current nice value from -1024 to +1024 */
     unsigned int calls;             /* number of times ->process() was called */
     struct task * (*process)(struct task *t);  /* the function which processes the task */
src/task.c (129 changed lines)
@@ -30,8 +30,7 @@ unsigned int tasks_run_queue = 0;
 unsigned int tasks_run_queue_cur = 0;  /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;         /* copy of the tasks count */
 unsigned int niced_tasks = 0;          /* number of niced tasks in the run queue */
-struct eb32_node *last_timer = NULL;   /* optimization: last queued timer */
-struct eb32_node *rq_next = NULL;      /* optimization: next task except if delete/insert */
 
 static struct eb_root timers;          /* sorted timers tree */
 static struct eb_root rqueue;          /* tree constituting the run queue */
@@ -61,11 +60,12 @@ struct task *__task_wakeup(struct task *t)
         t->rq.key += offset;
     }
 
-    /* clear state flags at the same time */
-    t->state &= ~TASK_WOKEN_ANY;
+    /* reset flag to pending ones
+     * Note: __task_wakeup must not be called
+     * if task is running
+     */
+    t->state = t->pending_state;
     eb32_insert(&rqueue, &t->rq);
-    rq_next = NULL;
     return t;
 }
 
@@ -95,25 +95,8 @@ void __task_queue(struct task *task)
         return;
 #endif
 
-    if (likely(last_timer &&
-               last_timer->node.bit < 0 &&
-               last_timer->key == task->wq.key &&
-               last_timer->node.node_p)) {
-        /* Most often, last queued timer has the same expiration date, so
-         * if it's not queued at the root, let's queue a dup directly there.
-         * Note that we can only use dups at the dup tree's root (most
-         * negative bit).
-         */
-        eb_insert_dup(&last_timer->node, &task->wq.node);
-        if (task->wq.node.bit < last_timer->node.bit)
-            last_timer = &task->wq;
-        return;
-    }
     eb32_insert(&timers, &task->wq);
 
-    /* Make sure we don't assign the last_timer to a node-less entry */
-    if (task->wq.node.node_p && (!last_timer || (task->wq.node.bit < last_timer->node.bit)))
-        last_timer = &task->wq;
     return;
 }
 
@@ -126,8 +109,8 @@ int wake_expired_tasks()
     struct task *task;
     struct eb32_node *eb;
 
-    eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
     while (1) {
+        eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
         if (unlikely(!eb)) {
             /* we might have reached the end of the tree, typically because
              * <now_ms> is in the first half and we're first scanning the last
@@ -145,7 +128,6 @@ int wake_expired_tasks()
 
         /* timer looks expired, detach it from the queue */
         task = eb32_entry(eb, struct task, wq);
-        eb = eb32_next(eb);
         __task_unlink_wq(task);
 
         /* It is possible that this task was left at an earlier place in the
@@ -165,8 +147,6 @@ int wake_expired_tasks()
             if (!tick_isset(task->expire))
                 continue;
             __task_queue(task);
-            if (!eb || eb->key > task->wq.key)
-                eb = &task->wq;
             continue;
         }
         task_wakeup(task, TASK_WOKEN_TIMER);
@@ -189,12 +169,15 @@ int wake_expired_tasks()
 void process_runnable_tasks()
 {
     struct task *t;
-    unsigned int max_processed;
+    int i;
+    int max_processed;
+    struct eb32_node *rq_next;
+    int rewind;
+    struct task *local_tasks[16];
+    int local_tasks_count;
     tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
     nb_tasks_cur = nb_tasks;
     max_processed = tasks_run_queue;
 
     if (!tasks_run_queue)
         return;
 
@@ -204,45 +187,75 @@ void process_runnable_tasks()
     if (likely(niced_tasks))
         max_processed = (max_processed + 3) / 4;
 
-    while (max_processed--) {
+    while (max_processed > 0) {
         /* Note: this loop is one of the fastest code path in
          * the whole program. It should not be re-arranged
         * without a good reason.
         */
-        if (unlikely(!rq_next)) {
-            rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
-            if (!rq_next) {
-                /* we might have reached the end of the tree, typically because
-                 * <rqueue_ticks> is in the first half and we're first scanning
-                 * the last half. Let's loop back to the beginning of the tree now.
-                 */
-                rq_next = eb32_first(&rqueue);
-                if (!rq_next)
-                    break;
-            }
-        }
-
-        /* detach the task from the queue after updating the pointer to
-         * the next entry.
-         */
-        t = eb32_entry(rq_next, struct task, rq);
-        rq_next = eb32_next(rq_next);
-        __task_unlink_rq(t);
-
-        t->state |= TASK_RUNNING;
-        /* This is an optimisation to help the processor's branch
-         * predictor take this most common call.
-         */
-        t->calls++;
-        if (likely(t->process == process_stream))
-            t = process_stream(t);
-        else
-            t = t->process(t);
-
-        if (likely(t != NULL)) {
-            t->state &= ~TASK_RUNNING;
-            if (t->expire)
-                task_queue(t);
+        rewind = 0;
+        rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
+        if (!rq_next) {
+            /* we might have reached the end of the tree, typically because
+             * <rqueue_ticks> is in the first half and we're first scanning
+             * the last half. Let's loop back to the beginning of the tree now.
+             */
+            rq_next = eb32_first(&rqueue);
+            if (!rq_next) {
+                break;
+            }
+            rewind = 1;
+        }
+
+        local_tasks_count = 0;
+        while (local_tasks_count < 16) {
+            t = eb32_entry(rq_next, struct task, rq);
+            rq_next = eb32_next(rq_next);
+            /* detach the task from the queue */
+            __task_unlink_rq(t);
+            t->state |= TASK_RUNNING;
+            t->pending_state = 0;
+            t->calls++;
+            local_tasks[local_tasks_count++] = t;
+            if (!rq_next) {
+                if (rewind || !(rq_next = eb32_first(&rqueue))) {
+                    break;
+                }
+                rewind = 1;
+            }
+        }
+
+        if (!local_tasks_count)
+            break;
+
+        for (i = 0; i < local_tasks_count ; i++) {
+            t = local_tasks[i];
+            /* This is an optimisation to help the processor's branch
+             * predictor take this most common call.
+             */
+            if (likely(t->process == process_stream))
+                t = process_stream(t);
+            else
+                t = t->process(t);
+            local_tasks[i] = t;
+        }
+
+        max_processed -= local_tasks_count;
+        for (i = 0; i < local_tasks_count ; i++) {
+            t = local_tasks[i];
+            if (likely(t != NULL)) {
+                t->state &= ~TASK_RUNNING;
+                /* If there is a pending state
+                 * we have to wake up the task
+                 * immediatly, else we defer
+                 * it into wait queue
+                 */
+                if (t->pending_state)
+                    __task_wakeup(t);
+                else
+                    task_queue(t);
+            }
         }
     }
 }