MEDIUM: lua: make use of stream_new() to create an outgoing connection

This significantly simplifies the session management because we don't
have to know all the intimate tricks of setting up a stream and a
session.
This commit is contained in:
Willy Tarreau 2015-04-06 00:39:18 +02:00
parent d1769b8b9a
commit d420a975d7

View File

@ -1999,6 +1999,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
struct hlua_socket *socket;
struct appctx *appctx;
struct session *sess;
struct task *task;
/* Check stack size. */
if (!lua_checkstack(L, 3)) {
@ -2022,47 +2023,35 @@ __LJMP static int hlua_socket_new(lua_State *L)
lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
lua_setmetatable(L, -2);
/*
*
* Get memory for the request.
*
*/
sess = pool_alloc2(pool2_session);
/* Create the applet context */
appctx = appctx_new(&update_applet);
if (!appctx)
goto out_fail_conf;
appctx->ctx.hlua.socket = socket;
appctx->ctx.hlua.connected = 0;
LIST_INIT(&appctx->ctx.hlua.wake_on_write);
LIST_INIT(&appctx->ctx.hlua.wake_on_read);
/* Now create a session, task and stream for this applet */
sess = session_new(&socket_proxy, NULL, &appctx->obj_type);
if (!sess) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_conf;
goto out_fail_sess;
}
sess->accept_date = date; /* user-visible date for logging */
sess->tv_accept = now; /* corrected date for internal use */
memset(sess->stkctr, 0, sizeof(sess->stkctr));
if ((task = task_new()) == NULL) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_task;
}
task->nice = 0;
socket->s = pool_alloc2(pool2_stream);
if (!socket->s) {
if ((socket->s = stream_new(sess, task)) == NULL) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_stream;
}
socket->s->sess = sess;
socket->s->task = task_new();
if (!socket->s->task) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_task;
}
socket->s->req.buf = pool_alloc2(pool2_buffer);
if (!socket->s->req.buf) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_req_buf;
}
socket->s->res.buf = pool_alloc2(pool2_buffer);
if (!socket->s->res.buf) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_rep_buf;
}
/* Configura empty Lua for the stream. */
/* Configure an empty Lua for the stream. */
socket->s->hlua.T = NULL;
socket->s->hlua.Tref = LUA_REFNIL;
socket->s->hlua.Mref = LUA_REFNIL;
@ -2070,158 +2059,23 @@ __LJMP static int hlua_socket_new(lua_State *L)
socket->s->hlua.flags = 0;
LIST_INIT(&socket->s->hlua.com);
/* stream initialisation. */
stream_init_srv_conn(socket->s);
/*
*
* Configure the associated task.
*
*/
/* This is the dedicated function to process the stream. This function
* is able to establish the conection, process the timeouts, etc ...
*/
socket->s->task->process = process_stream;
/* Back reference to stream. This is used by process_stream(). */
socket->s->task->context = socket->s;
/* The priority of the task is normal. */
socket->s->task->nice = 0;
/* Init the next run to eternity. Later in this function, this task is
* waked.
*/
socket->s->task->expire = TICK_ETERNITY;
/*
*
* Initialize the attached buffers
*
*/
socket->s->req.buf->size = global.tune.bufsize;
socket->s->res.buf->size = global.tune.bufsize;
/*
*
* Initialize channels.
*
*/
/* This function reset the struct. It must be called
* before the configuration.
*/
channel_init(&socket->s->req);
channel_init(&socket->s->res);
socket->s->res.flags |= CF_ISRESP;
socket->s->req.analysers = 0;
/* Adjust the stream's timeouts */
socket->s->req.rto = socket_proxy.timeout.client;
socket->s->req.wto = socket_proxy.timeout.server;
socket->s->req.rex = TICK_ETERNITY;
socket->s->req.wex = TICK_ETERNITY;
socket->s->req.analyse_exp = TICK_ETERNITY;
socket->s->res.analysers = 0;
socket->s->res.rto = socket_proxy.timeout.server;
socket->s->res.wto = socket_proxy.timeout.client;
socket->s->res.rex = TICK_ETERNITY;
socket->s->res.wex = TICK_ETERNITY;
socket->s->res.analyse_exp = TICK_ETERNITY;
/*
*
* Configure the stream.
*
*/
/* The stream dont have listener. The listener is used with real
* proxies.
*/
socket->s->sess->listener = NULL;
/* The flags are initialized to 0. Values are setted later. */
socket->s->flags = 0;
/* Assign the configured proxy to the new stream. */
socket->s->sess->fe = &socket_proxy;
socket->s->be = &socket_proxy;
/* XXX: Set namy variables */
socket->s->store_count = 0;
memset(socket->s->stkctr, 0, sizeof(socket->s->stkctr));
/* Configure logs. */
socket->s->logs.logwait = 0;
socket->s->logs.level = 0;
socket->s->logs.accept_date = sess->accept_date; /* user-visible date for logging */
socket->s->logs.tv_accept = sess->tv_accept; /* corrected date for internal use */
socket->s->do_log = NULL;
/* Function used if an error is occured. */
socket->s->srv_error = default_srv_error;
/* Init the list of buffers. */
LIST_INIT(&socket->s->buffer_wait);
/* Dont configure the unique ID. */
socket->s->uniq_id = 0;
socket->s->unique_id = NULL;
/* XXX: ? */
socket->s->pend_pos = NULL;
socket->s->req_cap = NULL;
socket->s->res_cap = NULL;
/* XXX: See later. */
socket->s->txn = NULL;
/* Configure "left" stream interface as applet. This "si" produce
* and use the data received from the server. The applet is initialized
* and is attached to the stream interface.
*/
/* The data producer is already connected. It is the applet. */
socket->s->req.flags = CF_READ_ATTACHED;
channel_auto_connect(&socket->s->req); /* don't wait to establish connection */
channel_auto_close(&socket->s->req); /* let the producer forward close requests */
socket->s->si[0].flags = SI_FL_NONE;
si_reset(&socket->s->si[0]);
si_set_state(&socket->s->si[0], SI_ST_EST); /* connection established (resource exists) */
appctx = stream_int_register_handler(&socket->s->si[0], &update_applet);
if (!appctx)
goto out_fail_conn1;
appctx->ctx.hlua.socket = socket;
appctx->ctx.hlua.connected = 0;
socket->s->sess->origin = &appctx->obj_type;
LIST_INIT(&appctx->ctx.hlua.wake_on_write);
LIST_INIT(&appctx->ctx.hlua.wake_on_read);
/* Configure "right" stream interface. this "si" is used to connect
* and retrieve data from the server. The connection is initialized
* with the "struct server".
*/
socket->s->si[1].flags = SI_FL_ISBACK;
si_reset(&socket->s->si[1]);
si_set_state(&socket->s->si[1], SI_ST_INI);
si_set_state(&socket->s->si[1], SI_ST_ASS);
socket->s->si[1].conn_retries = socket_proxy.conn_retries;
/* Force destination server. */
socket->s->flags |= SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET | SF_BE_ASSIGNED;
socket->s->target = &socket_tcp.obj_type;
/* This stream is added to te lists of alive streams. */
LIST_ADDQ(&streams, &socket->s->list);
/* XXX: I think that this list is used by stats. */
LIST_INIT(&socket->s->back_refs);
/* Update statistics counters. */
socket_proxy.feconn++; /* beconn will be increased later */
jobs++;
@ -2230,17 +2084,13 @@ __LJMP static int hlua_socket_new(lua_State *L)
/* Return yield waiting for connection. */
return 1;
out_fail_conn1:
pool_free2(pool2_buffer, socket->s->res.buf);
out_fail_rep_buf:
pool_free2(pool2_buffer, socket->s->req.buf);
out_fail_req_buf:
task_free(socket->s->task);
out_fail_task:
pool_free2(pool2_stream, socket->s);
out_fail_stream:
out_fail_stream:
task_free(task);
out_fail_task:
session_free(sess);
out_fail_conf:
out_fail_sess:
appctx_free(appctx);
out_fail_conf:
WILL_LJMP(lua_error(L));
return 0;
}