MAJOR: session: implement a wait-queue for sessions who need a buffer

When a session_alloc_buffers() fails to allocate one or two buffers,
it subscribes the session to buffer_wq, and waits for another session
to release buffers. It's then removed from the queue and woken up with
TASK_WAKE_RES, and can attempt its allocation again.

We decide to try to wake as many waiters as we release buffers so
that if we release 2 and two waiters need only once, they both have
their chance. We must never come to the situation where we don't wake
enough tasks up.

It's common to release buffers after the completion of an I/O callback,
which can happen even if the I/O could not be performed due to half a
failure on memory allocation. In this situation, we don't want to move
out of the wait queue the session that was just added, otherwise it
will never get any buffer. Thus, we only force ourselves out of the
queue when freeing the session.

Note: at the moment, since session_alloc_buffers() is not used, no task
is subscribed to the wait queue.
This commit is contained in:
Willy Tarreau 2014-11-25 21:10:35 +01:00
parent 656859d478
commit bf883e0aa7
4 changed files with 64 additions and 20 deletions

View File

@ -30,6 +30,7 @@
extern struct pool_head *pool2_session; extern struct pool_head *pool2_session;
extern struct list sessions; extern struct list sessions;
extern struct list buffer_wq;
extern struct data_cb sess_conn_cb; extern struct data_cb sess_conn_cb;
@ -53,6 +54,7 @@ int parse_track_counters(char **args, int *arg,
/* Update the session's backend and server time stats */ /* Update the session's backend and server time stats */
void session_update_time_stats(struct session *s); void session_update_time_stats(struct session *s);
void session_offer_buffers(int count);
int session_alloc_buffers(struct session *s); int session_alloc_buffers(struct session *s);
void session_release_buffers(struct session *s); void session_release_buffers(struct session *s);
int session_alloc_recv_buffer(struct session *s, struct buffer **buf); int session_alloc_recv_buffer(struct session *s, struct buffer **buf);

View File

@ -123,6 +123,7 @@ struct session {
struct list list; /* position in global sessions list */ struct list list; /* position in global sessions list */
struct list by_srv; /* position in server session list */ struct list by_srv; /* position in server session list */
struct list back_refs; /* list of users tracking this session */ struct list back_refs; /* list of users tracking this session */
struct list buffer_wait; /* position in the list of sessions waiting for a buffer */
struct { struct {
struct stksess *ts; struct stksess *ts;

View File

@ -51,6 +51,9 @@
struct pool_head *pool2_session; struct pool_head *pool2_session;
struct list sessions; struct list sessions;
/* list of sessions waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
static int conn_session_complete(struct connection *conn); static int conn_session_complete(struct connection *conn);
static int conn_session_update(struct connection *conn); static int conn_session_update(struct connection *conn);
static struct task *expire_mini_session(struct task *t); static struct task *expire_mini_session(struct task *t);
@ -409,6 +412,7 @@ int session_complete(struct session *s)
/* OK, we're keeping the session, so let's properly initialize the session */ /* OK, we're keeping the session, so let's properly initialize the session */
LIST_ADDQ(&sessions, &s->list); LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs); LIST_INIT(&s->back_refs);
LIST_INIT(&s->buffer_wait);
s->flags |= SN_INITIALIZED; s->flags |= SN_INITIALIZED;
s->unique_id = NULL; s->unique_id = NULL;
@ -618,8 +622,16 @@ static void session_free(struct session *s)
if (s->rep->pipe) if (s->rep->pipe)
put_pipe(s->rep->pipe); put_pipe(s->rep->pipe);
b_free(&s->req->buf); /* We may still be present in the buffer wait queue */
b_free(&s->rep->buf); if (!LIST_ISEMPTY(&s->buffer_wait)) {
LIST_DEL(&s->buffer_wait);
LIST_INIT(&s->buffer_wait);
}
b_drop(&s->req->buf);
b_drop(&s->rep->buf);
if (!LIST_ISEMPTY(&buffer_wq))
session_offer_buffers(1);
pool_free2(pool2_channel, s->req); pool_free2(pool2_channel, s->req);
pool_free2(pool2_channel, s->rep); pool_free2(pool2_channel, s->rep);
@ -688,48 +700,75 @@ int session_alloc_recv_buffer(struct session *s, struct buffer **buf)
if (b) if (b)
return 1; return 1;
/* FIXME: normally we're supposed to subscribe to a list of waiters if (LIST_ISEMPTY(&s->buffer_wait))
* for buffers. We release what we failed to allocate. LIST_ADDQ(&buffer_wq, &s->buffer_wait);
*/
return 0; return 0;
} }
/* Allocates up to two buffers for session <s>. Only succeeds if both buffers /* Allocates up to two buffers for session <s>. Only succeeds if both buffers
* are properly allocated. It is meant to be called inside process_session() so * are properly allocated. It is meant to be called inside process_session() so
* that both request and response buffers are allocated. Returns 0 incase of * that both request and response buffers are allocated. Returns 0 in case of
* failure, non-zero otherwise. * failure, non-zero otherwise.
*/ */
int session_alloc_buffers(struct session *s) int session_alloc_buffers(struct session *s)
{ {
if (!s->req->buf->size && !b_alloc(&s->req->buf)) if (!LIST_ISEMPTY(&s->buffer_wait)) {
return 0; LIST_DEL(&s->buffer_wait);
LIST_INIT(&s->buffer_wait);
if (s->rep->buf->size || b_alloc(&s->rep->buf))
return 1;
if (buffer_empty(s->req->buf)) {
__b_drop(&s->req->buf);
s->req->buf = &buf_wanted;
} }
/* FIXME: normally we're supposed to subscribe to a list of waiters if ((s->req->buf->size || b_alloc(&s->req->buf)) &&
* for buffers. We release what we failed to allocate. (s->rep->buf->size || b_alloc(&s->rep->buf)))
*/ return 1;
session_release_buffers(s);
LIST_ADDQ(&buffer_wq, &s->buffer_wait);
return 0; return 0;
} }
/* releases unused buffers after processing. Typically used at the end of the /* releases unused buffers after processing. Typically used at the end of the
* update() functions. * update() functions. It will try to wake up as many tasks as the number of
* buffers that it releases. In practice, most often sessions are blocked on
* a single buffer, so it makes sense to try to wake two up when two buffers
* are released at once.
*/ */
void session_release_buffers(struct session *s) void session_release_buffers(struct session *s)
{ {
int release_count = 0;
release_count = !!s->req->buf->size + !!s->rep->buf->size;
if (s->req->buf->size && buffer_empty(s->req->buf)) if (s->req->buf->size && buffer_empty(s->req->buf))
b_free(&s->req->buf); b_free(&s->req->buf);
if (s->rep->buf->size && buffer_empty(s->rep->buf)) if (s->rep->buf->size && buffer_empty(s->rep->buf))
b_free(&s->rep->buf); b_free(&s->rep->buf);
/* FIXME: normally we want to wake up pending tasks */ /* if we're certain to have at least 1 buffer available, and there is
* someone waiting, we can wake up a waiter and offer them.
*/
if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq))
session_offer_buffers(release_count);
}
/* run across the list of pending sessions waiting for a buffer and wake
* one up if buffers are available.
*/
void session_offer_buffers(int count)
{
struct session *sess, *bak;
list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
if (sess->task->state & TASK_RUNNING)
continue;
LIST_DEL(&sess->buffer_wait);
LIST_INIT(&sess->buffer_wait);
task_wakeup(sess->task, TASK_WOKEN_RES);
if (--count <= 0)
break;
}
} }
/* perform minimal intializations, report 0 in case of error, 1 if OK. */ /* perform minimal intializations, report 0 in case of error, 1 if OK. */

View File

@ -19,6 +19,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <common/buffer.h>
#include <common/compat.h> #include <common/compat.h>
#include <common/config.h> #include <common/config.h>
#include <common/debug.h> #include <common/debug.h>
@ -30,6 +31,7 @@
#include <proto/connection.h> #include <proto/connection.h>
#include <proto/fd.h> #include <proto/fd.h>
#include <proto/pipe.h> #include <proto/pipe.h>
#include <proto/session.h>
#include <proto/stream_interface.h> #include <proto/stream_interface.h>
#include <proto/task.h> #include <proto/task.h>