[MAJOR] make stream sockets aware of the stream interface

As of now, a stream socket does not directly wake up the task
but it does contact the stream interface which itself knows the
task. This allows us to perform a few cleanups upon errors and
shutdowns, which reduces the number of calls to data_update()
from 8 per session to 2 per session, and makes all the functions
called in the process_session() loop completely swappable.

Some improvements are required. We need to provide a shutw()
function on stream interfaces so that one side which closes
its read part on an empty buffer can propagate the close to
the remote side.
This commit is contained in:
Willy Tarreau 2008-08-30 03:17:31 +02:00
parent eabf313df2
commit e5ed406715
6 changed files with 225 additions and 108 deletions

View File

@ -33,7 +33,7 @@
/* main event functions used to move data between sockets and buffers */
int stream_sock_read(int fd);
int stream_sock_write(int fd);
int stream_sock_data_check_errors(int fd);
int stream_sock_data_check_timeouts(int fd);
int stream_sock_data_update(int fd);
int stream_sock_data_finish(int fd);

View File

@ -42,24 +42,26 @@ enum {
/* error types reported on the streams interface for more accurate reporting */
enum {
SI_ET_NONE = 0, /* no error yet, leave it to zero */
SI_ET_QUEUE_TO, /* queue timeout */
SI_ET_QUEUE_ERR, /* queue error (eg: full) */
SI_ET_QUEUE_ABRT, /* aborted in queue by external cause */
SI_ET_CONN_TO, /* connection timeout */
SI_ET_CONN_ERR, /* connection error (eg: no server available) */
SI_ET_CONN_ABRT, /* connection aborted by external cause (eg: abort) */
SI_ET_CONN_OTHER, /* connection aborted for other reason (eg: 500) */
SI_ET_DATA_TO, /* timeout during data phase */
SI_ET_DATA_ERR, /* error during data phase */
SI_ET_DATA_ABRT, /* data phase aborted by external cause */
SI_ET_NONE = 0x0000, /* no error yet, leave it to zero */
SI_ET_QUEUE_TO = 0x0001, /* queue timeout */
SI_ET_QUEUE_ERR = 0x0002, /* queue error (eg: full) */
SI_ET_QUEUE_ABRT = 0x0004, /* aborted in queue by external cause */
SI_ET_CONN_TO = 0x0008, /* connection timeout */
SI_ET_CONN_ERR = 0x0010, /* connection error (eg: no server available) */
SI_ET_CONN_ABRT = 0x0020, /* connection aborted by external cause (eg: abort) */
SI_ET_CONN_OTHER = 0x0040, /* connection aborted for other reason (eg: 500) */
SI_ET_DATA_TO = 0x0080, /* timeout during data phase */
SI_ET_DATA_ERR = 0x0100, /* error during data phase */
SI_ET_DATA_ABRT = 0x0200, /* data phase aborted by external cause */
};
struct stream_interface {
unsigned int state; /* SI_ST* */
int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
unsigned int prev_state;/* SI_ST*, copy of previous state */
void *owner; /* generally a (struct task*) */
int fd; /* file descriptor for a stream driver when known */
unsigned int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
};

View File

@ -1805,7 +1805,7 @@ int connect_server(struct session *s)
}
}
fdtab[fd].owner = s->task;
fdtab[fd].owner = s->req->cons;
fdtab[fd].state = FD_STCONN; /* connection in progress */
fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
fdtab[fd].cb[DIR_RD].b = s->rep;

View File

@ -173,12 +173,14 @@ int event_accept(int fd) {
s->si[0].state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
s->si[0].owner = t;
s->si[0].fd = cfd;
s->cli_fd = cfd;
s->si[1].state = SI_ST_INI;
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
s->si[1].owner = t;
s->si[1].fd = -1; /* just to help with debugging */
s->srv = s->prev_srv = s->srv_conn = NULL;
@ -373,7 +375,7 @@ int event_accept(int fd) {
t->expire = TICK_ETERNITY;
fd_insert(cfd);
fdtab[cfd].owner = t;
fdtab[cfd].owner = &s->si[0];
fdtab[cfd].listener = l;
fdtab[cfd].state = FD_STREADY;
fdtab[cfd].cb[DIR_RD].f = l->proto->read;

View File

@ -660,40 +660,151 @@ void process_session(struct task *t, int *next)
unsigned int rqf_srv, rpf_srv;
unsigned int rqf_req, rpf_rep;
/* check server-side errors during data phase */
if (s->req->cons->state == SI_ST_EST) {
stream_sock_data_check_errors(s->req->cons->fd);
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
*/
if (unlikely(s->req->cons->state == SI_ST_CLO)) {
/* Count server-side errors (but not timeouts). */
if (s->req->flags & BF_WRITE_ERROR) {
s->be->failed_resp++;
if (s->srv)
s->srv->failed_resp++;
}
/* Check timeouts only during data phase for now */
if (unlikely(t->state & TASK_WOKEN_TIMER)) {
if (s->rep->cons->state == SI_ST_EST)
stream_sock_data_check_timeouts(s->rep->cons->fd);
if (s->srv) {
s->srv->cur_sess--;
sess_change_server(s, NULL);
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
}
if (s->req->cons->state == SI_ST_EST)
stream_sock_data_check_timeouts(s->req->cons->fd);
}
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
*/
if (unlikely(s->req->cons->state == SI_ST_CLO &&
s->req->cons->prev_state == SI_ST_EST)) {
/* Count server-side errors (but not timeouts). */
if (s->req->flags & BF_WRITE_ERROR) {
s->be->failed_resp++;
if (s->srv)
s->srv->failed_resp++;
}
if (s->srv) {
s->srv->cur_sess--;
sess_change_server(s, NULL);
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
}
if (unlikely((s->req->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
/* check client-side errors during data phase */
if (s->rep->cons->state == SI_ST_EST)
stream_sock_data_check_errors(s->rep->cons->fd);
if (unlikely(s->rep->cons->state == SI_ST_CLO &&
s->rep->cons->prev_state == SI_ST_EST)) {
if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
/* force one first pass everywhere */
/* Check if we need to close the write side. This can only happen
* when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear
* from low level, and neither HIJACK nor SHUTW can disappear from low
* level. Later, this should move to stream_sock_{read,write}.
*/
if ((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
buffer_shutw(s->req);
if (s->rep->flags & BF_SHUTR) {
fd_delete(s->req->cons->fd);
s->req->cons->state = SI_ST_CLO;
}
else {
EV_FD_CLR(s->req->cons->fd, DIR_WR);
shutdown(s->req->cons->fd, SHUT_WR);
}
}
/* Check if we need to close the write side */
if ((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
buffer_shutw(s->rep);
if (s->req->flags & BF_SHUTR) {
fd_delete(s->rep->cons->fd);
s->rep->cons->state = SI_ST_CLO;
}
else {
EV_FD_CLR(s->rep->cons->fd, DIR_WR);
shutdown(s->rep->cons->fd, SHUT_WR);
}
}
/* Dirty trick: force one first pass everywhere */
rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;
/* well, the ST_CONN state is already handled properly */
if (s->req->prod->state == SI_ST_EST) {
rqf_cli = s->req->flags;
rpf_cli = s->rep->flags;
}
if (s->req->cons->state == SI_ST_EST) {
rqf_srv = s->req->flags;
rpf_srv = s->rep->flags;
}
do {
DPRINTF(stderr,"[%u] %s: task=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rql=%d rpl=%d cs=%d ss=%d\n",
now_ms, __FUNCTION__,
t,
s->req, s->rep,
s->req->rex, s->rep->wex,
s->req->flags, s->rep->flags,
s->req->l, s->rep->l, s->rep->cons->state, s->req->cons->state);
resync = 0;
/* Analyse request */
if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
if (s->req->prod->state >= SI_ST_EST) {
resync = 1;
/* it's up to the analysers to reset write_ena */
buffer_write_ena(s->req);
if (s->req->analysers)
process_request(s);
rqf_req = s->req->flags;
}
}
/* Analyse response */
if (unlikely(s->rep->flags & BF_HIJACK)) {
/* In inject mode, we wake up every time something has
* happened on the write side of the buffer.
*/
if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
!(s->rep->flags & BF_FULL)) {
if (produce_content(s) != 0)
resync = 1; /* completed, better re-check flags */
}
}
else if (s->rep->prod->state >= SI_ST_EST) {
if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
resync = 1;
/* it's up to the analysers to reset write_ena */
buffer_write_ena(s->rep);
if (s->rep->analysers)
process_response(s);
rpf_rep = s->rep->flags;
}
}
/* Maybe resync client FD state */
if (s->rep->cons->state != SI_ST_CLO) {
if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
@ -713,7 +824,7 @@ void process_session(struct task *t, int *next)
}
}
/* Maybe resync server FD state */
if (s->req->cons->state != SI_ST_CLO) {
if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
@ -761,38 +872,6 @@ void process_session(struct task *t, int *next)
}
}
if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
/* the analysers must block it themselves */
if (s->req->prod->state >= SI_ST_EST) {
resync = 1;
buffer_write_ena(s->req);
if (s->req->analysers)
process_request(s);
}
rqf_req = s->req->flags;
}
if (unlikely(s->rep->flags & BF_HIJACK)) {
/* In inject mode, we wake up every time something has
* happened on the write side of the buffer.
*/
if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
!(s->rep->flags & BF_FULL)) {
if (produce_content(s) != 0)
resync = 1; /* completed, better re-check flags */
}
}
else if (s->rep->prod->state >= SI_ST_EST) {
if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
/* the analysers must block it themselves */
resync = 1;
buffer_write_ena(s->rep);
if (s->rep->analysers)
process_response(s);
rpf_rep = s->rep->flags;
}
}
} while (resync);
if (likely((s->rep->cons->state != SI_ST_CLO) ||
@ -809,6 +888,8 @@ void process_session(struct task *t, int *next)
s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
s->si[0].prev_state = s->si[0].state;
s->si[1].prev_state = s->si[1].state;
/* Trick: if a request is being waiting for the server to respond,
* and if we know the server can timeout, we don't want the timeout
@ -1766,7 +1847,7 @@ int process_request(struct session *t)
* - if one rule returns KO, then return KO
*/
if (req->flags & (BF_READ_NULL | BF_SHUTR) || tick_is_expired(req->analyse_exp, now_ms))
if (req->flags & BF_SHUTR || tick_is_expired(req->analyse_exp, now_ms))
partial = 0;
else
partial = ACL_PARTIAL;
@ -1921,7 +2002,7 @@ int process_request(struct session *t)
}
/* 4: have we encountered a close ? */
else if (req->flags & (BF_READ_NULL | BF_SHUTR)) {
else if (req->flags & BF_SHUTR) {
txn->status = 400;
client_retnclose(t, error_message(t, HTTP_ERR_400));
msg->msg_state = HTTP_MSG_ERROR;
@ -2607,7 +2688,7 @@ int process_request(struct session *t)
* timeout. We just have to check that the client is still
* there and that the timeout has not expired.
*/
if ((req->flags & (BF_READ_NULL|BF_READ_ERROR)) == 0 &&
if ((req->flags & (BF_SHUTR|BF_READ_ERROR)) == 0 &&
!tick_is_expired(req->analyse_exp, now_ms))
return 0;
@ -2690,7 +2771,7 @@ int process_request(struct session *t)
* buffer closed).
*/
if (req->l - body >= limit || /* enough bytes! */
req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_NULL | BF_READ_TIMEOUT) ||
req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_TIMEOUT) ||
tick_is_expired(req->analyse_exp, now_ms)) {
/* The situation will not evolve, so let's give up on the analysis. */
t->logs.tv_request = now; /* update the request timer to reflect full request */
@ -2887,7 +2968,7 @@ int process_response(struct session *t)
return 0;
}
/* write error to client, or close from server */
else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) {
else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR)) {
buffer_shutr_now(rep);
buffer_shutw_now(req);
//fd_delete(req->cons->fd);

View File

@ -42,6 +42,7 @@
int stream_sock_read(int fd) {
__label__ out_wakeup, out_shutdown_r, out_error;
struct buffer *b = fdtab[fd].cb[DIR_RD].b;
struct stream_interface *si = fdtab[fd].owner;
int ret, max, retval, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
@ -239,16 +240,21 @@ int stream_sock_read(int fd) {
if (!(b->flags & BF_READ_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_IN;
return retval;
out_shutdown_r:
/* we received a shutdown */
fdtab[fd].ev &= ~FD_POLL_HUP;
b->flags |= BF_READ_NULL;
b->rex = TICK_ETERNITY;
buffer_shutr(b);
/* Maybe we have to completely close the socket */
if (fdtab[fd].cb[DIR_WR].b->flags & BF_SHUTW)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_RD);
goto out_wakeup;
out_error:
@ -258,7 +264,27 @@ int stream_sock_read(int fd) {
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->rex = TICK_ETERNITY;
goto out_wakeup;
/* Read error on the file descriptor. We close the FD and set
* the error on both buffers.
* Note: right now we only support connected sockets.
*/
if (si->state != SI_ST_EST)
goto out_wakeup;
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutr(fdtab[fd].cb[DIR_RD].b);
fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
buffer_shutw(fdtab[fd].cb[DIR_WR].b);
fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
do_close_and_return:
fd_delete(fd);
si->state = SI_ST_CLO;
task_wakeup(si->owner, TASK_WOKEN_IO);
return 1;
}
@ -271,6 +297,7 @@ int stream_sock_read(int fd) {
int stream_sock_write(int fd) {
__label__ out_wakeup, out_error;
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
struct stream_interface *si = fdtab[fd].owner;
int ret, max, retval;
int write_poll = MAX_WRITE_POLL_LOOPS;
@ -411,7 +438,7 @@ int stream_sock_write(int fd) {
if (!(b->flags & BF_WRITE_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT;
@ -424,7 +451,25 @@ int stream_sock_write(int fd) {
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->wex = TICK_ETERNITY;
goto out_wakeup;
/* Write error on the file descriptor. We close the FD and set
* the error on both buffers.
* Note: right now we only support connected sockets.
*/
if (si->state != SI_ST_EST)
goto out_wakeup;
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutr(fdtab[fd].cb[DIR_RD].b);
fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
buffer_shutw(fdtab[fd].cb[DIR_WR].b);
fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
fd_delete(fd);
si->state = SI_ST_CLO;
task_wakeup(si->owner, TASK_WOKEN_IO);
return 1;
}
@ -433,7 +478,7 @@ int stream_sock_write(int fd) {
* phase. It controls the file descriptor's status, as well as read and write
* timeouts.
*/
int stream_sock_data_check_errors(int fd)
int stream_sock_data_check_timeouts(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
@ -446,24 +491,6 @@ int stream_sock_data_check_errors(int fd)
ib->flags, ob->flags,
ib->l, ob->l);
/* Read or write error on the file descriptor */
if (unlikely(fdtab[fd].state == FD_STERROR)) {
//trace_term(t, TT_HTTP_SRV_6);
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_ERR;
}
buffer_shutw(ob);
ob->flags |= BF_WRITE_ERROR;
buffer_shutr(ib);
ib->flags |= BF_READ_ERROR;
do_close_and_return:
fd_delete(fd);
ob->cons->state = SI_ST_CLO;
return 0;
}
/* Read timeout */
if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_12);
@ -473,8 +500,13 @@ int stream_sock_data_check_errors(int fd)
ob->cons->err_type = SI_ET_DATA_TO;
}
buffer_shutr(ib);
if (ob->flags & BF_SHUTW)
goto do_close_and_return;
if (ob->flags & BF_SHUTW) {
do_close_and_return:
fd_delete(fd);
ob->cons->state = SI_ST_CLO;
return 0;
}
EV_FD_CLR(fd, DIR_RD);
}
@ -506,18 +538,18 @@ int stream_sock_data_update(int fd)
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l);
ib->l, ob->l, ob->cons->state);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
if (ib->flags & (BF_SHUTR_NOW|BF_SHUTW)) {
//trace_term(t, TT_HTTP_SRV_10);
buffer_shutr(ib);
if (ob->flags & BF_SHUTW) {
@ -560,13 +592,13 @@ int stream_sock_data_finish(int fd)
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l);
ib->l, ob->l, ob->cons->state);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {