diff --git a/include/proto/session.h b/include/proto/session.h index e9b72bff0..5e26edc76 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -30,6 +30,7 @@ extern struct pool_head *pool2_session; extern struct list sessions; +extern struct list buffer_wq; extern struct data_cb sess_conn_cb; @@ -53,6 +54,7 @@ int parse_track_counters(char **args, int *arg, /* Update the session's backend and server time stats */ void session_update_time_stats(struct session *s); +void session_offer_buffers(int count); int session_alloc_buffers(struct session *s); void session_release_buffers(struct session *s); int session_alloc_recv_buffer(struct session *s, struct buffer **buf); diff --git a/include/types/session.h b/include/types/session.h index f17aff423..1f3ba487c 100644 --- a/include/types/session.h +++ b/include/types/session.h @@ -123,6 +123,7 @@ struct session { struct list list; /* position in global sessions list */ struct list by_srv; /* position in server session list */ struct list back_refs; /* list of users tracking this session */ + struct list buffer_wait; /* position in the list of sessions waiting for a buffer */ struct { struct stksess *ts; diff --git a/src/session.c b/src/session.c index fdd99ba95..d2368599d 100644 --- a/src/session.c +++ b/src/session.c @@ -51,6 +51,9 @@ struct pool_head *pool2_session; struct list sessions; +/* list of sessions waiting for at least one buffer */ +struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); + static int conn_session_complete(struct connection *conn); static int conn_session_update(struct connection *conn); static struct task *expire_mini_session(struct task *t); @@ -409,6 +412,7 @@ int session_complete(struct session *s) /* OK, we're keeping the session, so let's properly initialize the session */ LIST_ADDQ(&sessions, &s->list); LIST_INIT(&s->back_refs); + LIST_INIT(&s->buffer_wait); s->flags |= SN_INITIALIZED; s->unique_id = NULL; @@ -618,8 +622,16 @@ static void session_free(struct session *s) if (s->rep->pipe) put_pipe(s->rep->pipe); - b_free(&s->req->buf); - b_free(&s->rep->buf); + /* We may still be present in the buffer wait queue */ + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); + } + + b_drop(&s->req->buf); + b_drop(&s->rep->buf); + if (!LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(1); pool_free2(pool2_channel, s->req); pool_free2(pool2_channel, s->rep); @@ -688,48 +700,75 @@ int session_alloc_recv_buffer(struct session *s, struct buffer **buf) if (b) return 1; - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + if (LIST_ISEMPTY(&s->buffer_wait)) + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } /* Allocates up to two buffers for session . Only succeeds if both buffers * are properly allocated. It is meant to be called inside process_session() so - * that both request and response buffers are allocated. Returns 0 incase of + * that both request and response buffers are allocated. Returns 0 in case of * failure, non-zero otherwise. */ int session_alloc_buffers(struct session *s) { - if (!s->req->buf->size && !b_alloc(&s->req->buf)) - return 0; - - if (s->rep->buf->size || b_alloc(&s->rep->buf)) - return 1; - - if (buffer_empty(s->req->buf)) { - __b_drop(&s->req->buf); - s->req->buf = &buf_wanted; + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); } - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + if ((s->req->buf->size || b_alloc(&s->req->buf)) && + (s->rep->buf->size || b_alloc(&s->rep->buf))) + return 1; + + session_release_buffers(s); + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } /* releases unused buffers after processing. Typically used at the end of the - * update() functions. + * update() functions. It will try to wake up as many tasks as the number of + * buffers that it releases. In practice, most often sessions are blocked on + * a single buffer, so it makes sense to try to wake two up when two buffers + * are released at once. */ void session_release_buffers(struct session *s) { + int release_count = 0; + + release_count = !!s->req->buf->size + !!s->rep->buf->size; + if (s->req->buf->size && buffer_empty(s->req->buf)) b_free(&s->req->buf); if (s->rep->buf->size && buffer_empty(s->rep->buf)) b_free(&s->rep->buf); - /* FIXME: normally we want to wake up pending tasks */ + /* if we're certain to have at least 1 buffer available, and there is + * someone waiting, we can wake up a waiter and offer them. + */ + if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(release_count); +} + +/* run across the list of pending sessions waiting for a buffer and wake + * one up if buffers are available. + */ +void session_offer_buffers(int count) +{ + struct session *sess, *bak; + + list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) { + if (sess->task->state & TASK_RUNNING) + continue; + + LIST_DEL(&sess->buffer_wait); + LIST_INIT(&sess->buffer_wait); + + task_wakeup(sess->task, TASK_WOKEN_RES); + if (--count <= 0) + break; + } } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 075deef5d..2c47953d8 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include