From bf883e0aa7aaa8f30dedff775d252800a17e8650 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 25 Nov 2014 21:10:35 +0100 Subject: [PATCH] MAJOR: session: implement a wait-queue for sessions who need a buffer When a session_alloc_buffers() fails to allocate one or two buffers, it subscribes the session to buffer_wq, and waits for another session to release buffers. It's then removed from the queue and woken up with TASK_WAKE_RES, and can attempt its allocation again. We decide to try to wake as many waiters as we release buffers so that if we release 2 and two waiters need only once, they both have their chance. We must never come to the situation where we don't wake enough tasks up. It's common to release buffers after the completion of an I/O callback, which can happen even if the I/O could not be performed due to half a failure on memory allocation. In this situation, we don't want to move out of the wait queue the session that was just added, otherwise it will never get any buffer. Thus, we only force ourselves out of the queue when freeing the session. Note: at the moment, since session_alloc_buffers() is not used, no task is subscribed to the wait queue. --- include/proto/session.h | 2 ++ include/types/session.h | 1 + src/session.c | 79 ++++++++++++++++++++++++++++++----------- src/stream_interface.c | 2 ++ 4 files changed, 64 insertions(+), 20 deletions(-) diff --git a/include/proto/session.h b/include/proto/session.h index e9b72bff0..5e26edc76 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -30,6 +30,7 @@ extern struct pool_head *pool2_session; extern struct list sessions; +extern struct list buffer_wq; extern struct data_cb sess_conn_cb; @@ -53,6 +54,7 @@ int parse_track_counters(char **args, int *arg, /* Update the session's backend and server time stats */ void session_update_time_stats(struct session *s); +void session_offer_buffers(int count); int session_alloc_buffers(struct session *s); void session_release_buffers(struct session *s); int session_alloc_recv_buffer(struct session *s, struct buffer **buf); diff --git a/include/types/session.h b/include/types/session.h index f17aff423..1f3ba487c 100644 --- a/include/types/session.h +++ b/include/types/session.h @@ -123,6 +123,7 @@ struct session { struct list list; /* position in global sessions list */ struct list by_srv; /* position in server session list */ struct list back_refs; /* list of users tracking this session */ + struct list buffer_wait; /* position in the list of sessions waiting for a buffer */ struct { struct stksess *ts; diff --git a/src/session.c b/src/session.c index fdd99ba95..d2368599d 100644 --- a/src/session.c +++ b/src/session.c @@ -51,6 +51,9 @@ struct pool_head *pool2_session; struct list sessions; +/* list of sessions waiting for at least one buffer */ +struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); + static int conn_session_complete(struct connection *conn); static int conn_session_update(struct connection *conn); static struct task *expire_mini_session(struct task *t); @@ -409,6 +412,7 @@ int session_complete(struct session *s) /* OK, we're keeping the session, so let's properly initialize the session */ LIST_ADDQ(&sessions, &s->list); LIST_INIT(&s->back_refs); + LIST_INIT(&s->buffer_wait); s->flags |= SN_INITIALIZED; s->unique_id = NULL; @@ -618,8 +622,16 @@ static void session_free(struct session *s) if (s->rep->pipe) put_pipe(s->rep->pipe); - b_free(&s->req->buf); - b_free(&s->rep->buf); + /* We may still be present in the buffer wait queue */ + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); + } + + b_drop(&s->req->buf); + b_drop(&s->rep->buf); + if (!LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(1); pool_free2(pool2_channel, s->req); pool_free2(pool2_channel, s->rep); @@ -688,48 +700,75 @@ int session_alloc_recv_buffer(struct session *s, struct buffer **buf) if (b) return 1; - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + if (LIST_ISEMPTY(&s->buffer_wait)) + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } /* Allocates up to two buffers for session . Only succeeds if both buffers * are properly allocated. It is meant to be called inside process_session() so - * that both request and response buffers are allocated. Returns 0 incase of + * that both request and response buffers are allocated. Returns 0 in case of * failure, non-zero otherwise. */ int session_alloc_buffers(struct session *s) { - if (!s->req->buf->size && !b_alloc(&s->req->buf)) - return 0; - - if (s->rep->buf->size || b_alloc(&s->rep->buf)) - return 1; - - if (buffer_empty(s->req->buf)) { - __b_drop(&s->req->buf); - s->req->buf = &buf_wanted; + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); } - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + if ((s->req->buf->size || b_alloc(&s->req->buf)) && + (s->rep->buf->size || b_alloc(&s->rep->buf))) + return 1; + + session_release_buffers(s); + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } /* releases unused buffers after processing. Typically used at the end of the - * update() functions. + * update() functions. It will try to wake up as many tasks as the number of + * buffers that it releases. In practice, most often sessions are blocked on + * a single buffer, so it makes sense to try to wake two up when two buffers + * are released at once. */ void session_release_buffers(struct session *s) { + int release_count = 0; + + release_count = !!s->req->buf->size + !!s->rep->buf->size; + if (s->req->buf->size && buffer_empty(s->req->buf)) b_free(&s->req->buf); if (s->rep->buf->size && buffer_empty(s->rep->buf)) b_free(&s->rep->buf); - /* FIXME: normally we want to wake up pending tasks */ + /* if we're certain to have at least 1 buffer available, and there is + * someone waiting, we can wake up a waiter and offer them. + */ + if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(release_count); +} + +/* run across the list of pending sessions waiting for a buffer and wake + * one up if buffers are available. + */ +void session_offer_buffers(int count) +{ + struct session *sess, *bak; + + list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) { + if (sess->task->state & TASK_RUNNING) + continue; + + LIST_DEL(&sess->buffer_wait); + LIST_INIT(&sess->buffer_wait); + + task_wakeup(sess->task, TASK_WOKEN_RES); + if (--count <= 0) + break; + } } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 075deef5d..2c47953d8 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include