MAJOR: session: introduce embryonic sessions

When an incoming connection request is accepted, a connection
structure is needed to store its state. However we don't want to
fully initialize a session until the data layer is about to be
ready.

As long as the connection is physically stored into the session,
it's not easy to split both allocations.

As such, we only initialize the minimum requirements of a session,
which results in what we call an embryonic session. Then once the
data layer is ready, we can complete the function's initialization.

Doing so avoids buffers allocation and ensures that a session only
sees ready connections.

The frontend's client timeout is used as the handshake timeout. It
is likely that another timeout will be used in the future.
This commit is contained in:
Willy Tarreau 2012-08-31 16:01:23 +02:00 committed by Willy Tarreau
parent 15678efc45
commit 2542b53b19
5 changed files with 221 additions and 69 deletions

View File

@ -47,6 +47,7 @@ int parse_track_counters(char **args, int *arg,
int section_type, struct proxy *curpx, int section_type, struct proxy *curpx,
struct track_ctr_prm *prm, struct track_ctr_prm *prm,
struct proxy *defpx, char **err); struct proxy *defpx, char **err);
int conn_session_initialize(struct connection *conn, int flag);
/* Remove the refcount from the session to the tracked counters, and clear the /* Remove the refcount from the session to the tracked counters, and clear the
* pointer to ensure this is only performed once. The caller is responsible for * pointer to ensure this is only performed once. The caller is responsible for

View File

@ -75,6 +75,8 @@ enum {
/* below we have all handshake flags grouped into one */ /* below we have all handshake flags grouped into one */
CO_FL_HANDSHAKE = CO_FL_SI_SEND_PROXY, CO_FL_HANDSHAKE = CO_FL_SI_SEND_PROXY,
CO_FL_INIT_SESS = 0x00000800, /* initialize a session before using data */
/* when any of these flags is set, polling is defined by socket-layer /* when any of these flags is set, polling is defined by socket-layer
* operations, as opposed to data-layer. * operations, as opposed to data-layer.
*/ */

View File

@ -50,7 +50,7 @@
#define SN_FORCE_PRST 0x00000010 /* force persistence here, even if server is down */ #define SN_FORCE_PRST 0x00000010 /* force persistence here, even if server is down */
#define SN_MONITOR 0x00000020 /* this session comes from a monitoring system */ #define SN_MONITOR 0x00000020 /* this session comes from a monitoring system */
#define SN_CURR_SESS 0x00000040 /* a connection is currently being counted on the server */ #define SN_CURR_SESS 0x00000040 /* a connection is currently being counted on the server */
/* unused: 0x00000080 */ #define SN_INITIALIZED 0x00000080 /* the session was fully initialized */
#define SN_REDISP 0x00000100 /* set if this session was redispatched from one server to another */ #define SN_REDISP 0x00000100 /* set if this session was redispatched from one server to another */
#define SN_CONN_TAR 0x00000200 /* set if this session is turning around before reconnecting */ #define SN_CONN_TAR 0x00000200 /* set if this session is turning around before reconnecting */
#define SN_REDIRECTABLE 0x00000400 /* set if this session is redirectable (GET or HEAD) */ #define SN_REDIRECTABLE 0x00000400 /* set if this session is redirectable (GET or HEAD) */

View File

@ -15,6 +15,7 @@
#include <proto/connection.h> #include <proto/connection.h>
#include <proto/proto_tcp.h> #include <proto/proto_tcp.h>
#include <proto/session.h>
#include <proto/stream_interface.h> #include <proto/stream_interface.h>
/* I/O callback for fd-based connections. It calls the read/write handlers /* I/O callback for fd-based connections. It calls the read/write handlers
@ -25,7 +26,7 @@ int conn_fd_handler(int fd)
struct connection *conn = fdtab[fd].owner; struct connection *conn = fdtab[fd].owner;
if (unlikely(!conn)) if (unlikely(!conn))
goto leave; return 0;
process_handshake: process_handshake:
/* The handshake callbacks are called in sequence. If either of them is /* The handshake callbacks are called in sequence. If either of them is
@ -47,6 +48,14 @@ int conn_fd_handler(int fd)
if (!(conn->flags & CO_FL_POLL_SOCK)) if (!(conn->flags & CO_FL_POLL_SOCK))
__conn_sock_stop_both(conn); __conn_sock_stop_both(conn);
/* Maybe we need to finish initializing an incoming session. The
* function may fail and cause the connection to be destroyed, thus
* we must not use it anymore and should immediately leave instead.
*/
if ((conn->flags & CO_FL_INIT_SESS) &&
conn_session_initialize(conn, CO_FL_INIT_SESS) < 0)
return 0;
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
conn->app_cb->recv(conn); conn->app_cb->recv(conn);
@ -80,6 +89,13 @@ int conn_fd_handler(int fd)
} }
leave: leave:
/* we may need to release the connection which is an embryonic session */
if ((conn->flags & (CO_FL_ERROR|CO_FL_INIT_SESS)) == (CO_FL_ERROR|CO_FL_INIT_SESS)) {
conn->flags |= CO_FL_ERROR;
conn_session_complete(conn, CO_FL_INIT_SESS);
return 0;
}
if (conn->flags & CO_FL_NOTIFY_SI) if (conn->flags & CO_FL_NOTIFY_SI)
conn_notify_si(conn); conn_notify_si(conn);

View File

@ -48,16 +48,19 @@
struct pool_head *pool2_session; struct pool_head *pool2_session;
struct list sessions; struct list sessions;
/* This function is called from the protocol layer accept() in order to instanciate static struct task *expire_mini_session(struct task *t);
* a new session on behalf of a given listener and frontend. It returns a positive int session_complete(struct session *s);
* value upon success, 0 if the connection can be ignored, or a negative value upon
* critical failure. The accepted file descriptor is closed if we return <= 0. /* This function is called from the protocol layer accept() in order to
* instanciate a new embryonic session on behalf of a given listener and
* frontend. It returns a positive value upon success, 0 if the connection
* can be ignored, or a negative value upon critical failure. The accepted
* file descriptor is closed if we return <= 0.
*/ */
int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
{ {
struct proxy *p = l->frontend; struct proxy *p = l->frontend;
struct session *s; struct session *s;
struct http_txn *txn;
struct task *t; struct task *t;
int ret; int ret;
@ -67,7 +70,12 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
if (unlikely((s = pool_alloc2(pool2_session)) == NULL)) if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
goto out_close; goto out_close;
/* minimum session initialization required for monitor mode below */ /* minimum session initialization required for an embryonic session is
* fairly low. We need very little to execute L4 ACLs, then we need a
* task to make the client-side connection live on its own.
* - flags
* - stick-entry tracking
*/
s->flags = 0; s->flags = 0;
s->logs.logwait = p->to_log; s->logs.logwait = p->to_log;
s->stkctr1_entry = NULL; s->stkctr1_entry = NULL;
@ -75,40 +83,25 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
s->stkctr1_table = NULL; s->stkctr1_table = NULL;
s->stkctr2_table = NULL; s->stkctr2_table = NULL;
if (unlikely((t = task_new()) == NULL)) s->listener = l;
goto out_free_session; s->fe = p;
/* OK, we're keeping the session, so let's properly initialize the session */ /* OK, we're keeping the session, so let's properly initialize the session */
LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs);
s->unique_id = NULL;
s->term_trace = 0;
s->si[0].conn.t.sock.fd = cfd; s->si[0].conn.t.sock.fd = cfd;
s->si[0].conn.ctrl = l->proto; s->si[0].conn.ctrl = l->proto;
s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */ s->si[0].conn.flags = CO_FL_NONE;
s->si[0].conn.addr.from = *addr; s->si[0].conn.addr.from = *addr;
set_target_client(&s->si[0].conn.target, l);
s->logs.accept_date = date; /* user-visible date for logging */ s->logs.accept_date = date; /* user-visible date for logging */
s->logs.tv_accept = now; /* corrected date for internal use */ s->logs.tv_accept = now; /* corrected date for internal use */
s->uniq_id = totalconn; s->uniq_id = totalconn;
p->feconn++; /* beconn will be increased once assigned */ p->feconn++;
/* This session was accepted, count it now */
if (p->feconn > p->fe_counters.conn_max)
p->fe_counters.conn_max = p->feconn;
proxy_inc_fe_conn_ctr(l, p); /* note: cum_beconn will be increased once assigned */ proxy_inc_fe_conn_ctr(l, p);
t->process = l->handler;
t->context = s;
t->nice = l->nice;
t->expire = TICK_ETERNITY;
s->task = t;
s->listener = l;
/* Note: initially, the session's backend points to the frontend.
* This changes later when switching rules are executed or
* when the default backend is assigned.
*/
s->be = s->fe = p;
s->req = s->rep = NULL; /* will be allocated later */
/* if this session comes from a known monitoring system, we want to ignore /* if this session comes from a known monitoring system, we want to ignore
* it as soon as possible, which means closing it immediately for TCP, but * it as soon as possible, which means closing it immediately for TCP, but
@ -129,13 +122,182 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
/* let's do a no-linger now to close with a single RST. */ /* let's do a no-linger now to close with a single RST. */
setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger)); setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
ret = 0; /* successful termination */ ret = 0; /* successful termination */
goto out_free_task; goto out_free_session;
} }
/* This session was accepted, count it now */ /* Adjust some socket options */
if (p->feconn > p->fe_counters.conn_max) if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1))
p->fe_counters.conn_max = p->feconn; goto out_free_session;
if (unlikely((t = task_new()) == NULL))
goto out_free_session;
t->context = s;
t->nice = l->nice;
s->task = t;
/* add the various callbacks. Right now the data layer is present but
* not initialized. Also note we need to be careful as the stream int
* is not initialized yet.
*/
si_prepare_conn(&s->si[0], l->proto, l->data);
/* finish initialization of the accepted file descriptor */
fd_insert(cfd);
fdtab[cfd].owner = &s->si[0].conn;
fdtab[cfd].flags = 0;
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(&s->si[0].conn);
if (conn_data_init(&s->si[0].conn) < 0)
goto out_free_task;
/* OK, now either we have a pending handshake to execute with and
* then we must return to the I/O layer, or we can proceed with the
* end of the session initialization. In case of handshake, we also
* set the I/O timeout to the frontend's client timeout.
*/
if (s->si[0].conn.flags & CO_FL_HANDSHAKE) {
t->process = expire_mini_session;
t->expire = tick_add_ifset(now_ms, p->timeout.client);
task_queue(t);
s->si[0].conn.flags |= CO_FL_INIT_SESS;
return 1;
}
/* OK let's complete session initialization */
ret = session_complete(s);
if (ret > 0)
return ret;
/* Error unrolling */
out_free_task:
task_free(t);
out_free_session:
p->feconn--;
if (s->stkctr1_entry || s->stkctr2_entry)
session_store_counters(s);
pool_free2(pool2_session, s);
out_close:
if (ret < 0 && p->mode == PR_MODE_HTTP) {
/* critical error, no more memory, try to emit a 500 response */
struct chunk *err_msg = error_message(s, HTTP_ERR_500);
send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL);
}
if (fdtab[cfd].owner)
fd_delete(cfd);
else
close(cfd);
return ret;
}
/* This function kills an existing embryonic session. It stops the connection's
* data layer, releases assigned resources, resumes the listener if it was
* disabled and finally kills the file descriptor.
*/
static void kill_mini_session(struct session *s)
{
/* kill the connection now */
conn_data_close(&s->si[0].conn);
s->fe->feconn--;
if (s->stkctr1_entry || s->stkctr2_entry)
session_store_counters(s);
if (!(s->listener->options & LI_O_UNLIMITED))
actconn--;
jobs--;
s->listener->nbconn--;
if (s->listener->state == LI_FULL)
resume_listener(s->listener);
/* Dequeues all of the listeners waiting for a resource */
if (!LIST_ISEMPTY(&global_listener_queue))
dequeue_all_listeners(&global_listener_queue);
if (!LIST_ISEMPTY(&s->fe->listener_queue) &&
(!s->fe->fe_sps_lim || freq_ctr_remain(&s->fe->fe_sess_per_sec, s->fe->fe_sps_lim, 0) > 0))
dequeue_all_listeners(&s->fe->listener_queue);
task_delete(s->task);
task_free(s->task);
if (fdtab[s->si[0].conn.t.sock.fd].owner)
fd_delete(s->si[0].conn.t.sock.fd);
else
close(s->si[0].conn.t.sock.fd);
pool_free2(pool2_session, s);
}
/* Finish initializing a session from a connection. Returns <0 if the
* connection was killed.
*/
int conn_session_initialize(struct connection *conn, int flag)
{
struct session *s = container_of(conn, struct session, si[0].conn);
if (session_complete(s) > 0) {
conn->flags &= ~flag;
return 0;
}
/* kill the connection now */
kill_mini_session(s);
return -1;
}
/* Manages embryonic sessions timeout. It is only called when the timeout
* strikes and performs the required cleanup.
*/
static struct task *expire_mini_session(struct task *t)
{
struct session *s = t->context;
if (!(t->state & TASK_WOKEN_TIMER))
return t;
kill_mini_session(s);
return NULL;
}
/* This function is called from the I/O handler which detects the end of
* handshake, in order to complete initialization of a valid session. It must
* be called with an embryonic session. It returns a positive value upon
* success, 0 if the connection can be ignored, or a negative value upon
* critical failure. The accepted file descriptor is closed if we return <= 0.
*/
int session_complete(struct session *s)
{
struct listener *l = s->listener;
struct proxy *p = s->fe;
struct http_txn *txn;
struct task *t = s->task;
int ret;
ret = -1; /* assume unrecoverable error by default */
/* OK, we're keeping the session, so let's properly initialize the session */
LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs);
s->flags |= SN_INITIALIZED;
s->unique_id = NULL;
s->term_trace = 0;
t->process = l->handler;
t->context = s;
t->expire = TICK_ETERNITY;
/* Note: initially, the session's backend points to the frontend.
* This changes later when switching rules are executed or
* when the default backend is assigned.
*/
s->be = s->fe;
s->req = s->rep = NULL; /* will be allocated later */
/* Let's count a session now */
proxy_inc_fe_sess_ctr(l, p); proxy_inc_fe_sess_ctr(l, p);
if (s->stkctr1_entry) { if (s->stkctr1_entry) {
void *ptr; void *ptr;
@ -170,16 +332,12 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
s->si[0].err_loc = NULL; s->si[0].err_loc = NULL;
s->si[0].release = NULL; s->si[0].release = NULL;
s->si[0].send_proxy_ofs = 0; s->si[0].send_proxy_ofs = 0;
set_target_client(&s->si[0].conn.target, l);
s->si[0].exp = TICK_ETERNITY; s->si[0].exp = TICK_ETERNITY;
s->si[0].flags = SI_FL_NONE; s->si[0].flags = SI_FL_NONE;
if (likely(s->fe->options2 & PR_O2_INDEPSTR)) if (likely(s->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR; s->si[0].flags |= SI_FL_INDEP_STR;
/* add the various callbacks */
si_prepare_conn(&s->si[0], l->proto, l->data);
/* pre-initialize the other side's stream interface to an INIT state. The /* pre-initialize the other side's stream interface to an INIT state. The
* callbacks will be initialized before attempting to connect. * callbacks will be initialized before attempting to connect.
*/ */
@ -207,10 +365,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
/* init store persistence */ /* init store persistence */
s->store_count = 0; s->store_count = 0;
/* Adjust some socket options */
if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1))
goto out_free_task;
if (unlikely((s->req = pool_alloc2(pool2_channel)) == NULL)) if (unlikely((s->req = pool_alloc2(pool2_channel)) == NULL))
goto out_free_task; /* no memory */ goto out_free_task; /* no memory */
@ -273,13 +427,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
txn->rsp.buf = s->rep; txn->rsp.buf = s->rep;
/* finish initialization of the accepted file descriptor */ /* finish initialization of the accepted file descriptor */
fd_insert(cfd);
fdtab[cfd].owner = &s->si[0].conn;
fdtab[cfd].flags = 0;
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(&s->si[0].conn); conn_data_want_recv(&s->si[0].conn);
if (conn_data_init(&s->si[0].conn) < 0)
goto out_free_rep;
if (p->accept && (ret = p->accept(s)) <= 0) { if (p->accept && (ret = p->accept(s)) <= 0) {
/* Either we had an unrecoverable error (<0) or work is /* Either we had an unrecoverable error (<0) or work is
@ -289,6 +437,9 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
goto out_free_rep; goto out_free_rep;
} }
/* we want the connection handler to notify the stream interface about updates. */
s->si[0].conn.flags |= CO_FL_NOTIFY_SI;
/* it is important not to call the wakeup function directly but to /* it is important not to call the wakeup function directly but to
* pass through task_wakeup(), because this one knows how to apply * pass through task_wakeup(), because this one knows how to apply
* priorities to tasks. * priorities to tasks.
@ -302,24 +453,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
out_free_req: out_free_req:
pool_free2(pool2_channel, s->req); pool_free2(pool2_channel, s->req);
out_free_task: out_free_task:
p->feconn--;
if (s->stkctr1_entry || s->stkctr2_entry)
session_store_counters(s);
task_free(t);
LIST_DEL(&s->list);
out_free_session:
pool_free2(pool2_session, s);
out_close:
if (ret < 0 && s->fe->mode == PR_MODE_HTTP) {
/* critical error, no more memory, try to emit a 500 response */
struct chunk *err_msg = error_message(s, HTTP_ERR_500);
send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL);
}
if (fdtab[cfd].owner)
fd_delete(cfd);
else
close(cfd);
return ret; return ret;
} }