MEDIUM: stream-int: make ->end point to the connection or the appctx

The long-term goal is to have a context for applets as an alternative
to the connection and not as a complement. At the moment, the context
is still stored into the stream interface, and we only put a pointer
to the applet's context in si->end, initialize the context with object
type OBJ_TYPE_APPCTX, and this allows us not to allocate an entry when
deciding to switch to an applet.

A special care is taken to never dereference si->conn anymore when
dealing with an applet. That's why it's important that si->end is
always set to the proper type :

    si->end == NULL             => not connected to anything
   *si->end == OBJ_TYPE_APPCTX  => connected to an applet
   *si->end == OBJ_TYPE_CONN    => real connection (server, proxy, ...)

The session management code used to check the applet from the connection's
target. Now it uses the stream interface's end point and does not touch the
connection at all. Similarly, we stop checking the connection's addresses
and file descriptors when reporting the applet's status in the stats dump.
This commit is contained in:
Willy Tarreau 2013-09-29 17:19:56 +02:00
parent 4a59f2f954
commit cf644ed37a
5 changed files with 102 additions and 71 deletions

View File

@ -55,38 +55,52 @@ static inline void si_prepare_none(struct stream_interface *si)
{
si->ops = &si_embedded_ops;
si->end = NULL;
conn_prepare(si->conn, NULL, NULL, NULL, si);
si->conn->target = NULL;
si->appctx.applet = NULL;
}
static inline void si_prepare_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
si->ops = &si_conn_ops;
si->end = NULL;
si->end = &si->conn->obj_type;
conn_prepare(si->conn, &si_conn_cb, ctrl, xprt, si);
}
static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
si->ops = &si_conn_ops;
si->end = NULL;
si->end = &si->conn->obj_type;
conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si);
}
static inline void si_prepare_applet(struct stream_interface *si, struct si_applet *applet)
{
si->ops = &si_embedded_ops;
si->end = NULL;
conn_prepare(si->conn, NULL, NULL, NULL, si);
si->conn->target = &applet->obj_type;
si->appctx.applet = applet;
si->appctx.obj_type = OBJ_TYPE_APPCTX;
si->end = &si->appctx.obj_type;
}
/* returns a pointer to the applet being run in the SI or NULL if none */
static inline const struct si_applet *si_applet(struct stream_interface *si)
{
return objt_applet(si->conn->target);
if (objt_appctx(si->end))
return si->appctx.applet;
return NULL;
}
/* Call the applet's main function when an appctx is attached to the stream
* interface. Returns zero if no call was made, or non-zero if a call was made.
*/
static inline int si_applet_call(struct stream_interface *si)
{
const struct si_applet *applet;
applet = si_applet(si);
if (applet) {
applet->fct(si);
return 1;
}
return 0;
}
/* call the applet's release function if any. Needs to be called upon close() */

View File

@ -158,7 +158,7 @@ static int stats_accept(struct session *s)
{
/* we have a dedicated I/O handler for the stats */
stream_int_register_handler(&s->si[1], &cli_applet);
s->target = s->si[1].conn->target; // for logging only
s->target = &cli_applet.obj_type; // for logging only
s->si[1].appctx.st1 = 0;
s->si[1].appctx.st0 = STAT_CLI_INIT;
@ -3910,7 +3910,8 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
sess->uniq_id,
sess->listener && sess->listener->proto->name ? sess->listener->proto->name : "?");
switch (addr_to_str(&sess->si[0].conn->addr.from, pn, sizeof(pn))) {
switch ((obj_type(sess->si[0].end) == OBJ_TYPE_CONN) ?
addr_to_str(&sess->si[0].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
case AF_INET:
case AF_INET6:
chunk_appendf(&trash, " source=%s:%d\n",
@ -3935,8 +3936,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
sess->listener ? sess->listener->name ? sess->listener->name : "?" : "?",
sess->listener ? sess->listener->luid : 0);
conn_get_to_addr(sess->si[0].conn);
switch (addr_to_str(&sess->si[0].conn->addr.to, pn, sizeof(pn))) {
if (obj_type(sess->si[0].end) == OBJ_TYPE_CONN)
conn_get_to_addr(sess->si[0].conn);
switch ((obj_type(sess->si[0].end) == OBJ_TYPE_CONN) ?
addr_to_str(&sess->si[0].conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) {
case AF_INET:
case AF_INET6:
chunk_appendf(&trash, " addr=%s:%d\n",
@ -3959,8 +3963,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
else
chunk_appendf(&trash, " backend=<NONE> (id=-1 mode=-)");
conn_get_from_addr(sess->si[1].conn);
switch (addr_to_str(&sess->si[1].conn->addr.from, pn, sizeof(pn))) {
if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN)
conn_get_from_addr(sess->si[1].conn);
switch ((obj_type(sess->si[1].end) == OBJ_TYPE_CONN) ?
addr_to_str(&sess->si[1].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
case AF_INET:
case AF_INET6:
chunk_appendf(&trash, " addr=%s:%d\n",
@ -3983,8 +3990,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
else
chunk_appendf(&trash, " server=<NONE> (id=-1)");
conn_get_to_addr(sess->si[1].conn);
switch (addr_to_str(&sess->si[1].conn->addr.to, pn, sizeof(pn))) {
if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN)
conn_get_to_addr(sess->si[1].conn);
switch ((obj_type(sess->si[1].end) == OBJ_TYPE_CONN) ?
addr_to_str(&sess->si[1].conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) {
case AF_INET:
case AF_INET6:
chunk_appendf(&trash, " addr=%s:%d\n",
@ -4020,10 +4030,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
http_msg_state_str(sess->txn.req.msg_state), http_msg_state_str(sess->txn.rsp.msg_state));
chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x conn0=%p exp=%s, et=0x%03x)\n",
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p conn0=%p exp=%s, et=0x%03x)\n",
&sess->si[0],
si_state_str(sess->si[0].state),
sess->si[0].flags,
obj_type_name(sess->si[0].end),
obj_base_ptr(sess->si[0].end),
sess->si[0].conn,
sess->si[0].exp ?
tick_is_expired(sess->si[0].exp, now_ms) ? "<PAST>" :
@ -4032,10 +4044,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
sess->si[0].err_type);
chunk_appendf(&trash,
" si[1]=%p (state=%s flags=0x%02x conn1=%p exp=%s, et=0x%03x)\n",
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p conn1=%p exp=%s, et=0x%03x)\n",
&sess->si[1],
si_state_str(sess->si[1].state),
sess->si[1].flags,
obj_type_name(sess->si[1].end),
obj_base_ptr(sess->si[1].end),
sess->si[1].conn,
sess->si[1].exp ?
tick_is_expired(sess->si[1].exp, now_ms) ? "<PAST>" :
@ -4043,39 +4057,43 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
TICKS_TO_MS(1000)) : "<NEVER>",
sess->si[1].err_type);
chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
sess->si[0].conn,
get_conn_ctrl_name(sess->si[0].conn),
get_conn_xprt_name(sess->si[0].conn),
get_conn_data_name(sess->si[0].conn),
obj_type_name(sess->si[0].conn->target),
obj_base_ptr(sess->si[0].conn->target));
if (obj_type(sess->si[0].end) == OBJ_TYPE_CONN) {
chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
sess->si[0].conn,
get_conn_ctrl_name(sess->si[0].conn),
get_conn_xprt_name(sess->si[0].conn),
get_conn_data_name(sess->si[0].conn),
obj_type_name(sess->si[0].conn->target),
obj_base_ptr(sess->si[0].conn->target));
chunk_appendf(&trash,
" flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n",
sess->si[0].conn->flags,
sess->si[0].conn->t.sock.fd,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_e : 0,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_p : 0,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].updated : 0);
chunk_appendf(&trash,
" flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n",
sess->si[0].conn->flags,
sess->si[0].conn->t.sock.fd,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_e : 0,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_p : 0,
sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].updated : 0);
}
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
sess->si[1].conn,
get_conn_ctrl_name(sess->si[1].conn),
get_conn_xprt_name(sess->si[1].conn),
get_conn_data_name(sess->si[1].conn),
obj_type_name(sess->si[1].conn->target),
obj_base_ptr(sess->si[1].conn->target));
if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN) {
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
sess->si[1].conn,
get_conn_ctrl_name(sess->si[1].conn),
get_conn_xprt_name(sess->si[1].conn),
get_conn_data_name(sess->si[1].conn),
obj_type_name(sess->si[1].conn->target),
obj_base_ptr(sess->si[1].conn->target));
chunk_appendf(&trash,
" flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n",
sess->si[1].conn->flags,
sess->si[1].conn->t.sock.fd,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_e : 0,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_p : 0,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].updated : 0);
chunk_appendf(&trash,
" flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n",
sess->si[1].conn->flags,
sess->si[1].conn->t.sock.fd,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_e : 0,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_p : 0,
sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].updated : 0);
}
chunk_appendf(&trash,
" req=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n"
@ -4222,7 +4240,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si)
curr_sess->listener->proto->name);
switch (addr_to_str(&curr_sess->si[0].conn->addr.from, pn, sizeof(pn))) {
switch ((obj_type(curr_sess->si[0].end) == OBJ_TYPE_CONN) ?
addr_to_str(&curr_sess->si[0].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
case AF_INET:
case AF_INET6:
chunk_appendf(&trash,
@ -4297,7 +4316,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si)
" s0=[%d,%1xh,fd=%d,ex=%s]",
curr_sess->si[0].state,
curr_sess->si[0].flags,
curr_sess->si[0].conn->t.sock.fd,
(obj_type(curr_sess->si[0].end) == OBJ_TYPE_CONN) ?
curr_sess->si[0].conn->t.sock.fd : -1,
curr_sess->si[0].exp ?
human_time(TICKS_TO_MS(curr_sess->si[0].exp - now_ms),
TICKS_TO_MS(1000)) : "");
@ -4306,7 +4326,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si)
" s1=[%d,%1xh,fd=%d,ex=%s]",
curr_sess->si[1].state,
curr_sess->si[1].flags,
curr_sess->si[1].conn->t.sock.fd,
(obj_type(curr_sess->si[1].end) == OBJ_TYPE_CONN) ?
curr_sess->si[1].conn->t.sock.fd : -1,
curr_sess->si[1].exp ?
human_time(TICKS_TO_MS(curr_sess->si[1].exp - now_ms),
TICKS_TO_MS(1000)) : "");

View File

@ -1062,7 +1062,7 @@ static void peer_session_forceshutdown(struct session * session)
{
struct stream_interface *oldsi;
if (objt_applet(session->si[0].conn->target) == &peer_applet) {
if (si_applet(&session->si[0]) == &peer_applet) {
oldsi = &session->si[0];
}
else {
@ -1086,7 +1086,7 @@ int peer_accept(struct session *s)
{
/* we have a dedicated I/O handler for the stats */
stream_int_register_handler(&s->si[1], &peer_applet);
s->target = s->si[1].conn->target; // for logging only
s->target = &peer_applet.obj_type; // for logging only
s->si[1].appctx.ctx.peers.ptr = s;
s->si[1].appctx.st0 = PEER_SESSION_ACCEPT;

View File

@ -2182,9 +2182,9 @@ struct task *process_session(struct task *t)
if (s->req->cons->state == SI_ST_INI) {
if (!(s->req->flags & CF_SHUTW)) {
if ((s->req->flags & CF_AUTO_CONNECT) || !channel_is_empty(s->req)) {
/* If we have an applet without a connect method, we immediately
* switch to the connected state, otherwise we perform a connection
* request.
/* If we have an appctx, there is no connect method, so we
* immediately switch to the connected state, otherwise we
* perform a connection request.
*/
s->req->cons->state = SI_ST_REQ; /* new connection requested */
s->req->cons->conn_retries = s->be->conn_retries;
@ -2366,10 +2366,10 @@ struct task *process_session(struct task *t)
if ((s->fe->options & PR_O_CONTSTATS) && (s->flags & SN_BE_ASSIGNED))
session_process_counters(s);
if (s->rep->cons->state == SI_ST_EST && obj_type(s->rep->cons->conn->target) != OBJ_TYPE_APPLET)
if (s->rep->cons->state == SI_ST_EST && obj_type(s->rep->cons->end) != OBJ_TYPE_APPCTX)
si_update(s->rep->cons);
if (s->req->cons->state == SI_ST_EST && obj_type(s->req->cons->conn->target) != OBJ_TYPE_APPLET)
if (s->req->cons->state == SI_ST_EST && obj_type(s->req->cons->end) != OBJ_TYPE_APPCTX)
si_update(s->req->cons);
s->req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
@ -2393,20 +2393,17 @@ struct task *process_session(struct task *t)
s->req->rex = TICK_ETERNITY;
}
/* Call the stream interfaces' I/O handlers when embedded.
* Note that this one may wake the task up again.
/* When any of the stream interfaces is attached to an applet,
* we have to call it here. Note that this one may wake the
* task up again. If at least one applet was called, the current
* task might have been woken up, in which case we don't want it
* to be requeued to the wait queue but rather to the run queue
* to run ASAP. The bitwise "or" in the condition ensures that
* both functions are always called and that we wake up if at
* least one did something.
*/
if (obj_type(s->req->cons->conn->target) == OBJ_TYPE_APPLET ||
obj_type(s->rep->cons->conn->target) == OBJ_TYPE_APPLET) {
if (objt_applet(s->req->cons->conn->target))
objt_applet(s->req->cons->conn->target)->fct(s->req->cons);
if (objt_applet(s->rep->cons->conn->target))
objt_applet(s->rep->cons->conn->target)->fct(s->rep->cons);
if ((si_applet_call(s->req->cons) | si_applet_call(s->rep->cons)) != 0) {
if (task_in_rq(t)) {
/* If we woke up, we don't want to requeue the
* task to the wait queue, but rather requeue
* it into the runqueue ASAP.
*/
t->expire = TICK_ETERNITY;
return t;
}

View File

@ -365,7 +365,6 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
void stream_int_unregister_handler(struct stream_interface *si)
{
si->owner = NULL;
si->conn->target = NULL;
si->end = NULL;
}