[MEDIUM] split stream_sock_process_data

It was a waste to constantly update the file descriptor's status
and timeouts during a flags update. So stream_sock_process_data
has been slit in two parts :
  stream_sock_data_update()  => computes updated flags
  stream_sock_data_finish()  => computes timeouts

Only the first one is called during flag updates. The second one
is only called upon completion. The number of calls to fd_set/fd_clr
has now significantly dropped.

Also, it's useless to check for errors and timeouts in the
process_session() loop, it's enough to check for them at the
beginning.
This commit is contained in:
Willy Tarreau 2008-08-28 08:54:27 +02:00
parent f9839bdffe
commit 3a16b2c9cd
4 changed files with 182 additions and 83 deletions

View File

@ -33,7 +33,9 @@
/* main event functions used to move data between sockets and buffers */
int stream_sock_read(int fd);
int stream_sock_write(int fd);
int stream_sock_process_data(int fd);
int stream_sock_data_check_errors(int fd);
int stream_sock_data_update(int fd);
int stream_sock_data_finish(int fd);
/* This either returns the sockname or the original destination address. Code

View File

@ -71,7 +71,7 @@
#define BF_MASK_INTERFACE_O (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR|BF_SHUTW|BF_SHUTW_NOW)
#define BF_MASK_INTERFACE (BF_MASK_INTF_I | BF_MASK_INTF_O)
#define BF_MASK_ANALYSER (BF_FULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL)
#define BF_MASK_ANALYSER (BF_FULL|BF_READ_NULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_SHUTR|BF_WRITE_ERROR)
/* Analysers (buffer->analysers).
* Those bits indicate that there are some processing to do on the buffer

View File

@ -660,6 +660,33 @@ void process_session(struct task *t, int *next)
unsigned int rqf_srv, rpf_srv;
unsigned int rqf_req, rpf_rep;
/* check server-side errors during data phase */
if (s->req->cons->state == SI_ST_EST) {
stream_sock_data_check_errors(s->req->cons->fd);
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
*/
if (unlikely(s->req->cons->state == SI_ST_CLO)) {
/* Count server-side errors (but not timeouts). */
if (s->req->flags & BF_WRITE_ERROR) {
s->be->failed_resp++;
if (s->srv)
s->srv->failed_resp++;
}
if (s->srv) {
s->srv->cur_sess--;
sess_change_server(s, NULL);
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
}
}
}
/* check client-side errors during data phase */
if (s->rep->cons->state == SI_ST_EST)
stream_sock_data_check_errors(s->rep->cons->fd);
/* force one first pass everywhere */
rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;
@ -667,29 +694,31 @@ void process_session(struct task *t, int *next)
do {
resync = 0;
if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
resync = 1;
if (s->rep->cons->state != SI_ST_CLO) {
stream_sock_process_data(s->rep->cons->fd);
if (s->rep->cons->state != SI_ST_CLO) {
if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
resync = 1;
stream_sock_data_update(s->rep->cons->fd);
rqf_cli = s->req->flags;
rpf_cli = s->rep->flags;
if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id, (unsigned short)s->rep->cons->fd, (unsigned short)s->req->cons->fd);
s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
rqf_cli = s->req->flags;
rpf_cli = s->rep->flags;
}
if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
resync = 1;
if (s->req->cons->state != SI_ST_CLO) {
if (s->req->cons->state != SI_ST_CLO) {
if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
resync = 1;
if (s->req->cons->state < SI_ST_EST && s->req->flags & BF_MAY_FORWARD)
process_srv_conn(s);
@ -704,14 +733,7 @@ void process_session(struct task *t, int *next)
buffer_shutw_now(s->req);
}
stream_sock_process_data(s->req->cons->fd);
/* Count server-side errors (but not timeouts). */
if (s->req->flags & BF_WRITE_ERROR) {
s->be->failed_resp++;
if (s->srv)
s->srv->failed_resp++;
}
stream_sock_data_update(s->req->cons->fd);
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
@ -725,18 +747,18 @@ void process_session(struct task *t, int *next)
}
}
}
rqf_srv = s->req->flags;
rpf_srv = s->rep->flags;
if (unlikely((s->req->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id, (unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd);
s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
rqf_srv = s->req->flags;
rpf_srv = s->rep->flags;
}
if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
@ -752,7 +774,8 @@ void process_session(struct task *t, int *next)
if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
resync = 1;
/* the analysers must block it themselves */
s->rep->flags |= BF_MAY_FORWARD;
if (s->req->cons->state >= SI_ST_EST)
s->rep->flags |= BF_MAY_FORWARD;
if (s->rep->analysers) {
process_response(s);
@ -768,6 +791,12 @@ void process_session(struct task *t, int *next)
if ((s->fe->options & PR_O_CONTSTATS) && (s->flags & SN_BE_ASSIGNED))
session_process_counters(s);
if (s->rep->cons->state == SI_ST_EST)
stream_sock_data_finish(s->rep->cons->fd);
if (s->req->cons->state == SI_ST_EST)
stream_sock_data_finish(s->req->cons->fd);
s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
@ -810,7 +839,7 @@ void process_session(struct task *t, int *next)
int len;
len = sprintf(trash, "%08x:%s.closed[%04x:%04x] (term_trace=0x%08x)\n",
s->uniq_id, s->be->id,
(unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd,
(unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd,
s->term_trace);
write(1, trash, len);
}
@ -1673,12 +1702,14 @@ int process_request(struct session *t)
struct buffer *req = t->req;
struct buffer *rep = t->rep;
DPRINTF(stderr,"[%u] %s: c=%s set(r,w)=%d,%d exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n",
DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n",
now_ms, __FUNCTION__,
cli_stnames[t->cli_state],
t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_RD) : 0,
t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_WR) : 0,
req->rex, rep->wex, req->flags, rep->flags, req->analysers);
t,
req,
req->rex, req->wex,
req->flags,
req->l,
req->analysers);
/* The tcp-inspect analyser is always called alone */
if (req->analysers & AN_REQ_INSPECT) {
@ -2692,10 +2723,14 @@ int process_response(struct session *t)
struct buffer *req = t->req;
struct buffer *rep = t->rep;
DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n",
DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n",
now_ms, __FUNCTION__,
cli_stnames[t->cli_state],
req->rex, rep->wex, req->flags, rep->flags, rep->analysers);
t,
rep,
rep->rex, rep->wex,
rep->flags,
rep->l,
rep->analysers);
if (rep->analysers & AN_RTR_HTTP_HDR) { /* receiving server headers */
/*
@ -2838,7 +2873,7 @@ int process_response(struct session *t)
return 0;
}
/* write error to client, or close from server */
else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL)) {
else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) {
buffer_shutr_now(rep);
buffer_shutw_now(req);
//fd_delete(req->cons->fd);
@ -3146,7 +3181,7 @@ int process_response(struct session *t)
#ifdef CONFIG_HAP_TCPSPLICE
if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0);
tcp_splice_splicefd(rep->cons->fd, rep->prod->fd, 0);
}
#endif
/* if the user wants to log as soon as possible, without counting
@ -3556,7 +3591,7 @@ int tcp_connection_status(struct session *t)
#ifdef CONFIG_HAP_TCPSPLICE
if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0);
tcp_splice_splicefd(req->prod->fd, req->cons->fd, 0);
}
#endif
}
@ -5262,7 +5297,7 @@ int stats_check_uri_auth(struct session *t, struct proxy *backend)
/* The request is valid, the user is authenticated. Let's start sending
* data.
*/
EV_FD_CLR(t->cli_fd, DIR_RD);
EV_FD_CLR(t->req->prod->fd, DIR_RD);
buffer_shutr(t->req);
buffer_shutr(t->rep);
buffer_set_rlim(t->req, BUFSIZE); /* no more rewrite needed */
@ -5282,7 +5317,7 @@ void debug_hdr(const char *dir, struct session *t, const char *start, const char
{
int len, max;
len = sprintf(trash, "%08x:%s.%s[%04x:%04x]: ", t->uniq_id, t->be->id,
dir, (unsigned short)t->cli_fd, (unsigned short)t->req->cons->fd);
dir, (unsigned short)t->req->prod->fd, (unsigned short)t->req->cons->fd);
max = end - start;
UBOUND(max, sizeof(trash) - len - 1);
len += strlcpy2(trash + len, start, max + 1);

View File

@ -417,12 +417,11 @@ int stream_sock_write(int fd) {
/*
* Manages a stream_sock connection during its data phase. The file descriptor
* status is checked, and the read and write timeouts are controlled. The
* buffers are examined for special shutdown cases and finally the timeouts,
* file descriptor and buffers' flags are updated accordingly.
* This function only has to be called once after a wakeup event during a data
* phase. It controls the file descriptor's status, as well as read and write
* timeouts.
*/
int stream_sock_process_data(int fd)
int stream_sock_data_check_errors(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
@ -436,7 +435,7 @@ int stream_sock_process_data(int fd)
ib->l, ob->l);
/* Read or write error on the file descriptor */
if (fdtab[fd].state == FD_STERROR) {
if (unlikely(fdtab[fd].state == FD_STERROR)) {
//trace_term(t, TT_HTTP_SRV_6);
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
@ -453,30 +452,114 @@ int stream_sock_process_data(int fd)
return 0;
}
/* Read timeout */
if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_12);
ib->flags |= BF_READ_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
buffer_shutr(ib);
if (ob->flags & BF_SHUTW)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_RD);
}
/* Write timeout */
if (unlikely(!(ob->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) && tick_is_expired(ob->wex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_13);
ob->flags |= BF_WRITE_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
buffer_shutw(ob);
if (ib->flags & BF_SHUTR)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_WR);
shutdown(fd, SHUT_WR);
}
return 0;
}
/*
* Manages a stream_sock connection during its data phase. The buffers are
* examined for various cases of shutdown, then file descriptor and buffers'
* flags are updated accordingly.
*/
int stream_sock_data_update(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
//trace_term(t, TT_HTTP_SRV_10);
do_close_read:
buffer_shutr(ib);
if (ob->flags & BF_SHUTW)
goto do_close_and_return;
if (ob->flags & BF_SHUTW) {
fd_delete(fd);
ob->cons->state = SI_ST_CLO;
return 0;
}
EV_FD_CLR(fd, DIR_RD);
}
/* Read timeout */
else if (unlikely(!(ib->flags & BF_READ_TIMEOUT) && tick_is_expired(ib->rex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_12);
ib->flags |= BF_READ_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Forced write-shutdown or other end closed with empty buffer. */
if ((ob->flags & BF_SHUTW_NOW) ||
(ob->flags & (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
//trace_term(t, TT_HTTP_SRV_11);
buffer_shutw(ob);
if (ib->flags & BF_SHUTR) {
fd_delete(fd);
ob->cons->state = SI_ST_CLO;
return 0;
}
goto do_close_read;
EV_FD_CLR(fd, DIR_WR);
shutdown(fd, SHUT_WR);
}
}
return 0; /* other cases change nothing */
}
/*
* Updates a connected stream_sock file descriptor status and timeouts
* according to the buffers' flags. It should only be called once after the
* buffer flags have settled down, and before they are cleared. It doesn't
* harm to call it as often as desired (it just slightly hurts performance).
*/
int stream_sock_data_finish(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
else if (ib->flags & (BF_FULL|BF_HIJACK)) {
if (ib->flags & (BF_FULL|BF_HIJACK)) {
/* stop reading */
EV_FD_COND_C(fd, DIR_RD);
ib->rex = TICK_ETERNITY;
@ -494,30 +577,9 @@ int stream_sock_process_data(int fd)
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Forced write-shutdown or other end closed with empty buffer. */
if ((ob->flags & BF_SHUTW_NOW) ||
(ob->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
//trace_term(t, TT_HTTP_SRV_11);
do_close_write:
buffer_shutw(ob);
if (ib->flags & BF_SHUTR)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_WR);
shutdown(fd, SHUT_WR);
}
/* Write timeout */
else if (unlikely(!(ob->flags & BF_WRITE_TIMEOUT) && tick_is_expired(ob->wex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_13);
ob->flags |= BF_WRITE_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
goto do_close_write;
}
/* Write not closed, update FD status and timeout for writes */
else if ((ob->flags & (BF_EMPTY|BF_MAY_FORWARD)) != BF_MAY_FORWARD) {
if ((ob->flags & BF_EMPTY) ||
(ob->flags & (BF_HIJACK|BF_MAY_FORWARD)) == 0) {
/* stop writing */
EV_FD_COND_C(fd, DIR_WR);
ob->wex = TICK_ETERNITY;
@ -541,7 +603,7 @@ int stream_sock_process_data(int fd)
}
}
}
return 0; /* other cases change nothing */
return 0;
}