mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-31 10:31:46 +00:00
MAJOR: stream: don't initialize the stream anymore in stream_accept
The function now only initializes a session, calls the tcp req connection rules, and calls stream_complete() to finish initialization. If a handshake is needed, it is done without allocating the stream at all. Temporarily, in order to limit the amount of changes, the task allocated is put into sess->task, and it is used by the connection for the handshake or is offered to the stream. At this point we set the relation between sess/task/conn this way : orig -- sess <-- context | ^ +- task -+ | v | v | conn -- owner task The task must not remain in the session and ultimately it is planned to remove this task pointer from the session because it can be found by having conn->owner = task, and looping back from sess to conn, and to find the session from the connection via the task.
This commit is contained in:
parent
5ecb069c58
commit
02a0c0e407
@ -34,6 +34,7 @@
|
||||
#include <types/obj_type.h>
|
||||
#include <types/proxy.h>
|
||||
#include <types/stick_table.h>
|
||||
#include <types/task.h>
|
||||
|
||||
struct session {
|
||||
struct proxy *fe; /* the proxy this session depends on for the client side */
|
||||
@ -42,6 +43,7 @@ struct session {
|
||||
struct timeval accept_date; /* date of the session's accept() in user date */
|
||||
struct timeval tv_accept; /* date of the session's accept() in internal date (monotonic) */
|
||||
struct stkctr stkctr[MAX_SESS_STKCTR]; /* stick counters for tcp-connection */
|
||||
struct task *task; /* temporary, for embryonic sessions */
|
||||
};
|
||||
|
||||
#endif /* _TYPES_SESSION_H */
|
||||
|
154
src/stream.c
154
src/stream.c
@ -59,7 +59,7 @@ struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
|
||||
static int conn_stream_complete(struct connection *conn);
|
||||
static int conn_stream_update(struct connection *conn);
|
||||
static struct task *expire_mini_session(struct task *t);
|
||||
int stream_complete(struct stream *s);
|
||||
int stream_complete(struct session *s);
|
||||
|
||||
/* data layer callbacks for an embryonic stream */
|
||||
struct data_cb sess_conn_cb = {
|
||||
@ -80,7 +80,6 @@ int stream_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
struct connection *cli_conn;
|
||||
struct proxy *p = l->frontend;
|
||||
struct session *sess;
|
||||
struct stream *s;
|
||||
struct task *t;
|
||||
int ret;
|
||||
|
||||
@ -106,6 +105,14 @@ int stream_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
conn_sock_want_recv(cli_conn);
|
||||
}
|
||||
|
||||
/* Finish setting the callbacks. Right now the transport layer is present
|
||||
* but not initialized. Also note we need to be careful as the stream
|
||||
* int is not initialized yet.
|
||||
*/
|
||||
conn_data_want_recv(cli_conn);
|
||||
if (conn_xprt_init(cli_conn) < 0)
|
||||
goto out_free_conn;
|
||||
|
||||
sess = pool_alloc2(pool2_session);
|
||||
if (!sess)
|
||||
goto out_free_conn;
|
||||
@ -163,65 +170,27 @@ int stream_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
goto out_free_sess;
|
||||
}
|
||||
|
||||
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
|
||||
if (unlikely((t = task_new()) == NULL))
|
||||
goto out_free_sess;
|
||||
|
||||
/* minimum stream initialization required for an embryonic stream is
|
||||
* fairly low. We need very little to execute L4 ACLs, then we need a
|
||||
* task to make the client-side connection live on its own.
|
||||
* - flags
|
||||
* - stick-entry tracking
|
||||
*/
|
||||
s->flags = 0;
|
||||
s->logs.logwait = p->to_log;
|
||||
s->logs.level = 0;
|
||||
|
||||
/* Initialise the current rule list pointer to NULL. We are sure that
|
||||
* any rulelist match the NULL pointer.
|
||||
*/
|
||||
s->current_rule_list = NULL;
|
||||
|
||||
memset(s->stkctr, 0, sizeof(s->stkctr));
|
||||
|
||||
s->sess = sess;
|
||||
s->si[0].flags = SI_FL_NONE;
|
||||
s->si[1].flags = SI_FL_ISBACK;
|
||||
|
||||
s->logs.accept_date = sess->accept_date; /* user-visible date for logging */
|
||||
s->logs.tv_accept = sess->tv_accept; /* corrected date for internal use */
|
||||
s->uniq_id = global.req_count++;
|
||||
|
||||
/* Add the minimum callbacks to prepare the connection's control layer.
|
||||
* We need this so that we can safely execute the ACLs used by the
|
||||
* "tcp-request connection" ruleset. We also carefully attach the
|
||||
* connection to the stream interface without initializing the rest,
|
||||
* so that ACLs can use si[0]->end.
|
||||
*/
|
||||
si_attach_conn(&s->si[0], cli_conn);
|
||||
conn_attach(cli_conn, s, &sess_conn_cb);
|
||||
|
||||
if (unlikely((t = task_new()) == NULL))
|
||||
goto out_free_strm;
|
||||
|
||||
t->context = s;
|
||||
t->context = sess;
|
||||
t->nice = l->nice;
|
||||
s->task = t;
|
||||
|
||||
/* Finish setting the callbacks. Right now the transport layer is present
|
||||
* but not initialized. Also note we need to be careful as the stream
|
||||
* int is not initialized yet.
|
||||
*/
|
||||
conn_data_want_recv(cli_conn);
|
||||
if (conn_xprt_init(cli_conn) < 0)
|
||||
goto out_free_task;
|
||||
sess->task = t;
|
||||
|
||||
/* OK, now either we have a pending handshake to execute with and
|
||||
* then we must return to the I/O layer, or we can proceed with the
|
||||
* end of the stream initialization. In case of handshake, we also
|
||||
* set the I/O timeout to the frontend's client timeout.
|
||||
*
|
||||
* At this point we set the relation between sess/task/conn this way :
|
||||
*
|
||||
* orig -- sess <-- context
|
||||
* | ^ +- task -+ |
|
||||
* v | v |
|
||||
* conn -- owner task
|
||||
*/
|
||||
|
||||
if (cli_conn->flags & CO_FL_HANDSHAKE) {
|
||||
conn_attach(cli_conn, sess, &sess_conn_cb);
|
||||
t->process = expire_mini_session;
|
||||
t->expire = tick_add_ifset(now_ms, p->timeout.client);
|
||||
task_queue(t);
|
||||
@ -229,17 +198,11 @@ int stream_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* OK let's complete stream initialization since there is no handshake */
|
||||
cli_conn->flags |= CO_FL_CONNECTED;
|
||||
ret = stream_complete(s);
|
||||
ret = stream_complete(sess);
|
||||
if (ret > 0)
|
||||
return ret;
|
||||
|
||||
/* Error unrolling */
|
||||
out_free_task:
|
||||
task_free(t);
|
||||
out_free_strm:
|
||||
pool_free2(pool2_stream, s);
|
||||
out_free_sess:
|
||||
p->feconn--;
|
||||
session_free(sess);
|
||||
@ -298,10 +261,9 @@ static void prepare_mini_sess_log_prefix(struct session *sess)
|
||||
* disabled and finally kills the file descriptor. This function requires that
|
||||
* at sess->origin points to the incoming connection.
|
||||
*/
|
||||
static void kill_mini_session(struct stream *s)
|
||||
static void kill_mini_session(struct session *sess)
|
||||
{
|
||||
int level = LOG_INFO;
|
||||
struct session *sess = s->sess;
|
||||
struct connection *conn = __objt_conn(sess->origin);
|
||||
unsigned int log = sess->fe->to_log;
|
||||
const char *err_msg;
|
||||
@ -318,7 +280,7 @@ static void kill_mini_session(struct stream *s)
|
||||
}
|
||||
|
||||
if (log) {
|
||||
if (!conn->err_code && (s->task->state & TASK_WOKEN_TIMER)) {
|
||||
if (!conn->err_code && (sess->task->state & TASK_WOKEN_TIMER)) {
|
||||
if (conn->flags & CO_FL_ACCEPT_PROXY)
|
||||
conn->err_code = CO_ER_PRX_TIMEOUT;
|
||||
else if (conn->flags & CO_FL_SSL_WAIT_HS)
|
||||
@ -355,12 +317,8 @@ static void kill_mini_session(struct stream *s)
|
||||
(!sess->fe->fe_sps_lim || freq_ctr_remain(&sess->fe->fe_sess_per_sec, sess->fe->fe_sps_lim, 0) > 0))
|
||||
dequeue_all_listeners(&sess->fe->listener_queue);
|
||||
|
||||
task_delete(s->task);
|
||||
task_free(s->task);
|
||||
/* FIXME: for now we have a 1:1 relation between stream and session so
|
||||
* the stream must free the session.
|
||||
*/
|
||||
pool_free2(pool2_stream, s);
|
||||
task_delete(sess->task);
|
||||
task_free(sess->task);
|
||||
session_free(sess);
|
||||
}
|
||||
|
||||
@ -369,15 +327,15 @@ static void kill_mini_session(struct stream *s)
|
||||
*/
|
||||
static int conn_stream_complete(struct connection *conn)
|
||||
{
|
||||
struct stream *s = conn->owner;
|
||||
struct session *sess = conn->owner;
|
||||
|
||||
if (!(conn->flags & CO_FL_ERROR) && (stream_complete(s) > 0)) {
|
||||
if (!(conn->flags & CO_FL_ERROR) && (stream_complete(sess) > 0)) {
|
||||
conn->flags &= ~CO_FL_INIT_DATA;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* kill the connection now */
|
||||
kill_mini_session(s);
|
||||
kill_mini_session(sess);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -398,12 +356,12 @@ static int conn_stream_update(struct connection *conn)
|
||||
*/
|
||||
static struct task *expire_mini_session(struct task *t)
|
||||
{
|
||||
struct stream *s = t->context;
|
||||
struct session *sess = t->context;
|
||||
|
||||
if (!(t->state & TASK_WOKEN_TIMER))
|
||||
return t;
|
||||
|
||||
kill_mini_session(s);
|
||||
kill_mini_session(sess);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -415,18 +373,58 @@ static struct task *expire_mini_session(struct task *t)
|
||||
* The client-side end point is assumed to be a connection, whose pointer is
|
||||
* taken from sess->origin which is assumed to be valid.
|
||||
*/
|
||||
int stream_complete(struct stream *s)
|
||||
int stream_complete(struct session *sess)
|
||||
{
|
||||
struct session *sess = s->sess;
|
||||
struct stream *s;
|
||||
struct listener *l = sess->listener;
|
||||
struct proxy *p = sess->fe;
|
||||
struct task *t = s->task;
|
||||
struct task *t = sess->task;
|
||||
struct connection *conn = __objt_conn(sess->origin);
|
||||
int ret;
|
||||
int i;
|
||||
|
||||
ret = -1; /* assume unrecoverable error by default */
|
||||
|
||||
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
|
||||
goto out_return;
|
||||
|
||||
/* minimum stream initialization required for an embryonic stream is
|
||||
* fairly low. We need very little to execute L4 ACLs, then we need a
|
||||
* task to make the client-side connection live on its own.
|
||||
* - flags
|
||||
* - stick-entry tracking
|
||||
*/
|
||||
s->flags = 0;
|
||||
s->logs.logwait = p->to_log;
|
||||
s->logs.level = 0;
|
||||
|
||||
/* Initialise the current rule list pointer to NULL. We are sure that
|
||||
* any rulelist match the NULL pointer.
|
||||
*/
|
||||
s->current_rule_list = NULL;
|
||||
|
||||
memset(s->stkctr, 0, sizeof(s->stkctr));
|
||||
|
||||
s->sess = sess;
|
||||
s->si[0].flags = SI_FL_NONE;
|
||||
s->si[1].flags = SI_FL_ISBACK;
|
||||
|
||||
s->logs.accept_date = sess->accept_date; /* user-visible date for logging */
|
||||
s->logs.tv_accept = sess->tv_accept; /* corrected date for internal use */
|
||||
s->uniq_id = global.req_count++;
|
||||
|
||||
/* Add the minimum callbacks to prepare the connection's control layer.
|
||||
* We need this so that we can safely execute the ACLs used by the
|
||||
* "tcp-request connection" ruleset. We also carefully attach the
|
||||
* connection to the stream interface without initializing the rest,
|
||||
* so that ACLs can use si[0]->end.
|
||||
*/
|
||||
si_attach_conn(&s->si[0], conn);
|
||||
conn_attach(conn, s, &sess_conn_cb);
|
||||
|
||||
/* OK let's complete stream initialization since there is no handshake */
|
||||
conn->flags |= CO_FL_CONNECTED;
|
||||
|
||||
/* OK, we're keeping the stream, so let's properly initialize the stream */
|
||||
LIST_ADDQ(&streams, &s->list);
|
||||
LIST_INIT(&s->back_refs);
|
||||
@ -435,6 +433,7 @@ int stream_complete(struct stream *s)
|
||||
s->flags |= SF_INITIALIZED;
|
||||
s->unique_id = NULL;
|
||||
|
||||
s->task = t;
|
||||
t->process = l->handler;
|
||||
t->context = s;
|
||||
t->expire = TICK_ETERNITY;
|
||||
@ -553,10 +552,9 @@ int stream_complete(struct stream *s)
|
||||
|
||||
/* Error unrolling */
|
||||
out_free_strm:
|
||||
/* and restore the connection pointer in case we destroyed it,
|
||||
* because kill_mini_session() will need it.
|
||||
*/
|
||||
LIST_DEL(&s->list);
|
||||
pool_free2(pool2_stream, s);
|
||||
out_return:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user