mirror of http://git.haproxy.org/git/haproxy.git/
[MAJOR] replace the wait-queue linked list with an rbtree.
This patch from Sin Yu makes use of an rbtree for the wait queue, which solves the slowdown encountered when timeouts are heterogeneous in the configuration. The next step will be to turn maintain_proxies() into a per-proxy task so that we won't have to scan them all after each poll() loop.
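For readers unfamiliar with the pattern: tasks are keyed in the tree by their expire date, so the next timeout to fire is always the leftmost node and can be found in O(log(n)) instead of walking the whole queue. Below is a minimal sketch of that lookup, assuming the Linux-style rbtree API the patch uses (rb_first(), rb_entry()); next_expiring() is a hypothetical helper, not part of the patch:

	#include <common/rbtree.h>	/* Linux-style rbtree API added by this patch */

	/* Sketch only: return the task with the earliest expire date, or NULL
	 * if the wait queue is empty. Because tasks are ordered by ->expire,
	 * the leftmost node is always the next deadline.
	 */
	static struct task *next_expiring(struct rb_root *wq)
	{
		struct rb_node *node = rb_first(wq);	/* leftmost = earliest expire */

		if (!node)
			return NULL;
		return rb_entry(node, struct task, rb_node);
	}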
parent d59d22e20a
commit 964c936b04
Makefile (2 changes)
@@ -182,7 +182,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
        src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
-       src/session.o src/hdr_idx.o
+       src/session.o src/hdr_idx.o src/rbtree.o
 
 haproxy: $(OBJS)
 	$(LD) $(LDFLAGS) -o $@ $^ $(LIBS)
@@ -87,7 +87,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
        src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
-       src/session.o src/hdr_idx.o
+       src/session.o src/hdr_idx.o src/rbtree.o
 
 all: haproxy
@@ -61,8 +61,8 @@ static inline struct task *task_sleep(struct task **q, struct task *t)
  */
 static inline struct task *task_delete(struct task *t)
 {
-	t->prev->next = t->next;
-	t->next->prev = t->prev;
+	rb_erase(&t->rb_node, t->wq);
+	t->wq = NULL;
 	return t;
 }
 
@@ -25,6 +25,7 @@
 #include <sys/time.h>
 
 #include <common/config.h>
+#include <common/rbtree.h>
 
 /* values for task->state */
 #define TASK_IDLE	0
@@ -32,9 +33,9 @@
 
 /* The base for all tasks */
 struct task {
-	struct task *next, *prev;	/* chaining ... */
+	struct rb_node rb_node;
+	struct rb_root *wq;
 	struct task *rqnext;		/* chaining in run queue ... */
-	struct task *wq;		/* the wait queue this task is in */
 	int state;			/* task state : IDLE or RUNNING */
 	struct timeval expire;		/* next expiration time for this task, use only for fast sorting */
 	int (*process)(struct task *t);	/* the function which processes the task */
@@ -44,7 +45,7 @@ struct task {
 #define sizeof_task	sizeof(struct task)
 extern void **pool_task;
 
-extern struct task wait_queue[2];
+extern struct rb_root wait_queue[2];
 extern struct task *rq;
 
@@ -113,8 +113,8 @@ int appsession_task_init(void)
 	if (!initialized) {
 		if ((t = pool_alloc(task)) == NULL)
 			return -1;
-		t->next = t->prev = t->rqnext = NULL;
-		t->wq = LIST_HEAD(wait_queue[0]);
+		t->wq = NULL;
+		t->rqnext = NULL;
 		t->state = TASK_IDLE;
 		t->context = NULL;
 		tv_delayfrom(&t->expire, &now, TBLCHKINT);
@@ -2293,8 +2293,8 @@ int readcfgfile(const char *file)
 				return -1;
 			}
 
-			t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-			t->wq = LIST_HEAD(wait_queue[1]); /* already assigned to the eternity queue */
+			t->rqnext = NULL;
+			t->wq = NULL;
 			t->state = TASK_IDLE;
 			t->process = process_srv_queue;
 			t->context = newsrv;
@@ -2340,8 +2340,8 @@ int readcfgfile(const char *file)
 				return -1;
 			}
 
-			t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-			t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */
+			t->wq = NULL;
+			t->rqnext = NULL;
 			t->state = TASK_IDLE;
 			t->process = process_chk;
 			t->context = newsrv;
@@ -150,8 +150,8 @@ int event_accept(int fd) {
 		if (p->options & PR_O_TCP_CLI_KA)
 			setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
-		t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-		t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */
+		t->wq = NULL;
+		t->rqnext = NULL;
 		t->state = TASK_IDLE;
 		t->process = process_session;
 		t->context = s;
@@ -255,12 +255,13 @@ void sig_dump_state(int sig)
 
 void dump(int sig)
 {
-	struct task *t, *tnext;
+	struct task *t;
 	struct session *s;
+	struct rb_node *node;
 
-	tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next;
-	while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */
-		tnext = t->next;
+	for(node = rb_first(&wait_queue[0]);
+	    node != NULL; node = rb_next(node)) {
+		t = rb_entry(node, struct task, rb_node);
 		s = t->context;
 		qfprintf(stderr,"[dump] wq: task %p, still %ld ms, "
 			 "cli=%d, srv=%d, cr=%d, cw=%d, sr=%d, sw=%d, "
src/task.c (192 changes)
@@ -22,112 +22,86 @@ extern int maintain_proxies(void);
 
 void **pool_task= NULL;
 struct task *rq = NULL;		/* global run queue */
-struct task wait_queue[2] = {	/* global wait queue */
-    {
-	prev:LIST_HEAD(wait_queue[0]),  /* expirable tasks */
-	next:LIST_HEAD(wait_queue[0]),
-    },
-    {
-	prev:LIST_HEAD(wait_queue[1]),  /* non-expirable tasks */
-	next:LIST_HEAD(wait_queue[1]),
-    },
+struct rb_root wait_queue[2] = {
+	RB_ROOT,
+	RB_ROOT,
 };
 
 
 /* inserts <task> into its assigned wait queue, where it may already be. In this case, it
  * may be only moved or left where it was, depending on its timing requirements.
  * <task> is returned.
  */
+static inline void __rb_insert_task_queue(struct task *newtask)
+{
+	struct rb_node **p = &newtask->wq->rb_node;
+	struct rb_node *parent = NULL;
+	struct task * task;
+
+	while (*p)
+	{
+		parent = *p;
+		task = rb_entry(parent, struct task, rb_node);
+		if (tv_cmp2(&task->expire, &newtask->expire) >= 0)
+			p = &(*p)->rb_left;
+		else
+			p = &(*p)->rb_right;
+	}
+	rb_link_node(&newtask->rb_node, parent, p);
+}
+
+static inline void rb_insert_task_queue(struct task *newtask)
+{
+	__rb_insert_task_queue(newtask);
+	rb_insert_color(&newtask->rb_node, newtask->wq);
+}
+
 struct task *task_queue(struct task *task)
 {
-	struct task *list = task->wq;
-	struct task *start_from;
+	struct rb_node *node;
+	struct task *next, *prev;
 
-	/* This is a very dirty hack to queue non-expirable tasks in another queue
-	 * in order to avoid pulluting the tail of the standard queue. This will go
-	 * away with the new O(log(n)) scheduler anyway.
-	 */
 	if (tv_iseternity(&task->expire)) {
-		/* if the task was queued in the standard wait queue, we must dequeue it */
-		if (task->prev) {
-			if (task->wq == LIST_HEAD(wait_queue[1]))
+		if (task->wq) {
+			if (task->wq == &wait_queue[1])
 				return task;
-			else {
+			else
 				task_delete(task);
-				task->prev = NULL;
-			}
 		}
+		task->wq = &wait_queue[1];
+		rb_insert_task_queue(task);
+		return task;
+	} else {
+		if (task->wq != &wait_queue[0]) {
+			if (task->wq)
+				task_delete(task);
+			task->wq = &wait_queue[0];
+			rb_insert_task_queue(task);
+			return task;
+		}
+
+		// check whether task should be re insert
+		node = rb_prev(&task->rb_node);
+		if (node) {
+			prev = rb_entry(node, struct task, rb_node);
+			if (tv_cmp2(&prev->expire, &task->expire) >= 0) {
+				task_delete(task);
+				task->wq = &wait_queue[0];
+				rb_insert_task_queue(task);
+				return task;
+			}
+		}
-		list = task->wq = LIST_HEAD(wait_queue[1]);
-	} else {
-		/* if the task was queued in the eternity queue, we must dequeue it */
-		if (task->prev && (task->wq == LIST_HEAD(wait_queue[1]))) {
-			task_delete(task);
-			task->prev = NULL;
-			list = task->wq = LIST_HEAD(wait_queue[0]);
-		}
-	}
 
-	/* next, test if the task was already in a list */
-	if (task->prev == NULL) {
-		//	start_from = list;
-		start_from = list->prev;
-		/* insert the unlinked <task> into the list, searching back from the last entry */
-		while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-			start_from = start_from->prev;
+		node = rb_next(&task->rb_node);
+		if (node) {
+			next = rb_entry(node, struct task, rb_node);
+			if (tv_cmp2(&task->expire, &next->expire) > 0) {
+				task_delete(task);
+				task->wq = &wait_queue[0];
+				rb_insert_task_queue(task);
+				return task;
+			}
 		}
-
-		//	while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-		//	    start_from = start_from->next;
-		//	    stats_tsk_nsrch++;
-		//	}
-	}
-	else if (task->prev == list ||
-		 tv_cmp2(&task->expire, &task->prev->expire) >= 0) { /* walk right */
-		start_from = task->next;
-		if (start_from == list || tv_cmp2(&task->expire, &start_from->expire) <= 0) {
-			return task; /* it's already in the right place */
-		}
-
-		/* if the task is not at the right place, there's little chance that
-		 * it has only shifted a bit, and it will nearly always be queued
-		 * at the end of the list because of constant timeouts
-		 * (observed in real case).
-		 */
-#ifndef WE_REALLY_THINK_THAT_THIS_TASK_MAY_HAVE_SHIFTED
-		start_from = list->prev; /* assume we'll queue to the end of the list */
-		while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-			start_from = start_from->prev;
-		}
-#else /* WE_REALLY_... */
-		/* insert the unlinked <task> into the list, searching after position <start_from> */
-		while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-			start_from = start_from->next;
-		}
-#endif /* WE_REALLY_... */
-
-		/* we need to unlink it now */
-		task_delete(task);
+		return task;
 	}
-	else { /* walk left. */
-#ifdef LEFT_TO_TOP	/* not very good */
-		start_from = list;
-		while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-			start_from = start_from->next;
-		}
-#else
-		start_from = task->prev->prev; /* valid because of the previous test above */
-		while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-			start_from = start_from->prev;
-		}
-#endif
-		/* we need to unlink it now */
-		task_delete(task);
-	}
-	task->prev = start_from;
-	task->next = start_from->next;
-	task->next->prev = task;
-	start_from->next = task;
-	return task;
 }
 
 /*
@@ -136,37 +110,26 @@ struct task *task_queue(struct task *task)
  *  - call all runnable tasks
  *  - call maintain_proxies() to enable/disable the listeners
  *  - return the delay till next event in ms, -1 = wait indefinitely
- * Note: this part should be rewritten with the O(ln(n)) scheduler.
  *
  */
 
 int process_runnable_tasks()
 {
 	int next_time;
 	int time2;
-	struct task *t, *tnext;
+	struct task *t;
+	struct rb_node *node;
 
-	next_time = TIME_ETERNITY; /* set the timer to wait eternally first */
-
-	/* look for expired tasks and add them to the run queue.
-	 */
-	tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next;
-	while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */
-		tnext = t->next;
+	next_time = TIME_ETERNITY;
+	for (node = rb_first(&wait_queue[0]);
+	     node != NULL; node = rb_next(node)) {
+		t = rb_entry(node, struct task, rb_node);
 		if (t->state & TASK_RUNNING)
 			continue;
 
 		if (tv_iseternity(&t->expire))
 			continue;
 
 		/* wakeup expired entries. It doesn't matter if they are
 		 * already running because of a previous event
 		 */
 		if (tv_cmp_ms(&t->expire, &now) <= 0) {
 			task_wakeup(&rq, t);
-		}
-		else {
-			/* first non-runnable task. Use its expiration date as an upper bound */
+		} else {
 			int temp_time = tv_remain(&now, &t->expire);
 			if (temp_time)
 				next_time = temp_time;
@@ -177,7 +140,7 @@ int process_runnable_tasks()
 	/* process each task in the run queue now. Each task may be deleted
 	 * since we only use the run queue's head. Note that any task can be
 	 * woken up by any other task and it will be processed immediately
-	 * after as it will be queued on the run queue's head.
+	 * after as it will be queued on the run queue's head !
 	 */
 	while ((t = rq) != NULL) {
 		int temp_time;
@@ -186,13 +149,14 @@ int process_runnable_tasks()
 		temp_time = t->process(t);
 		next_time = MINTIME(temp_time, next_time);
 	}
 
-	/* maintain all proxies in a consistent state. This should quickly become a task */
+	/* maintain all proxies in a consistent state. This should quickly
+	 * become a task because it becomes expensive when there are huge
+	 * numbers of proxies. */
 	time2 = maintain_proxies();
 	return MINTIME(time2, next_time);
 }
 
 
 /*
  * Local variables:
  *  c-indent-level: 8
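A note on the re-queueing logic in the new task_queue() above: a task whose timer merely ticks forward often keeps its position in the tree, so the function first compares the new expire date against the task's in-order neighbours via rb_prev()/rb_next() and only deletes and re-inserts when the ordering is actually broken. A minimal sketch of that check, under the same rbtree-API assumptions as above (needs_reinsert() is a hypothetical name, not part of the patch):

	/* Sketch only: decide whether a task whose ->expire just changed still
	 * sits correctly between its in-order neighbours. If both neighbours
	 * still bracket the new date, the tree is intact and the task may stay
	 * put; otherwise it must be task_delete()d and re-inserted.
	 */
	static int needs_reinsert(struct task *t)
	{
		struct rb_node *node;
		struct task *neigh;

		node = rb_prev(&t->rb_node);
		if (node) {
			neigh = rb_entry(node, struct task, rb_node);
			if (tv_cmp2(&neigh->expire, &t->expire) >= 0)
				return 1;	/* predecessor no longer expires first */
		}
		node = rb_next(&t->rb_node);
		if (node) {
			neigh = rb_entry(node, struct task, rb_node);
			if (tv_cmp2(&t->expire, &neigh->expire) > 0)
				return 1;	/* successor now expires earlier */
		}
		return 0;			/* ordering preserved */
	}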