[MEDIUM] stream interface: add the ->shutw method as well as in and out buffers

Those entries were really needed for cleaner and better code. Using them
has permitted to automatically close a file descriptor during a shut write,
reducing by 20% the number of calls to process_session() and derived
functions.

Process_session() does not need to know the file descriptor anymore, though
it still remains very complicated due to the special case for the connect
mode.
This commit is contained in:
Willy Tarreau 2008-08-30 04:58:38 +02:00
parent e5ed406715
commit 48adac5db9
5 changed files with 73 additions and 46 deletions

View File

@ -36,6 +36,7 @@ int stream_sock_write(int fd);
int stream_sock_data_check_timeouts(int fd);
int stream_sock_data_update(int fd);
int stream_sock_data_finish(int fd);
int stream_sock_shutw(struct stream_interface *si);
/* This either returns the sockname or the original destination address. Code

View File

@ -24,6 +24,7 @@
#include <stdlib.h>
#include <types/buffers.h>
#include <common/config.h>
/* A stream interface must have its own errors independantly of the buffer's,
@ -60,6 +61,8 @@ struct stream_interface {
unsigned int prev_state;/* SI_ST*, copy of previous state */
void *owner; /* generally a (struct task*) */
int fd; /* file descriptor for a stream driver when known */
int (*shutw)(struct stream_interface *); /* shutw function */
struct buffer *ib, *ob; /* input and output buffers */
unsigned int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
};

View File

@ -174,6 +174,7 @@ int event_accept(int fd) {
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
s->si[0].owner = t;
s->si[0].shutw = stream_sock_shutw;
s->si[0].fd = cfd;
s->cli_fd = cfd;
@ -181,6 +182,7 @@ int event_accept(int fd) {
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
s->si[1].owner = t;
s->si[1].shutw = stream_sock_shutw;
s->si[1].fd = -1; /* just to help with debugging */
s->srv = s->prev_srv = s->srv_conn = NULL;
@ -338,6 +340,7 @@ int event_accept(int fd) {
buffer_init(s->req);
s->req->prod = &s->si[0];
s->req->cons = &s->si[1];
s->si[0].ib = s->si[1].ob = s->req;
if (p->mode == PR_MODE_HTTP) /* reserve some space for header rewriting */
s->req->rlim -= MAXREWRITE;
@ -361,6 +364,7 @@ int event_accept(int fd) {
buffer_init(s->rep);
s->rep->prod = &s->si[1];
s->rep->cons = &s->si[0];
s->si[0].ob = s->si[1].ib = s->rep;
s->rep->rto = s->be->timeout.server;
s->rep->wto = s->fe->timeout.client;

View File

@ -669,6 +669,21 @@ void process_session(struct task *t, int *next)
stream_sock_data_check_timeouts(s->req->cons->fd);
}
/* Check if we need to close the write side. This can only happen
* when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear
* from low level, and neither HIJACK nor SHUTW can disappear from low
* level.
*/
if (unlikely((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR))) {
buffer_shutw(s->req);
s->req->cons->shutw(s->req->cons);
}
if (unlikely((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR))) {
buffer_shutw(s->rep);
s->rep->cons->shutw(s->rep->cons);
}
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
*/
@ -698,6 +713,7 @@ void process_session(struct task *t, int *next)
}
}
/* This is needed when debugging is enabled, to indicate client-side close */
if (unlikely(s->rep->cons->state == SI_ST_CLO &&
s->rep->cons->prev_state == SI_ST_EST)) {
if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
@ -710,39 +726,6 @@ void process_session(struct task *t, int *next)
}
}
/* Check if we need to close the write side. This can only happen
* when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear
* from low level, and neither HIJACK nor SHUTW can disappear from low
* level. Later, this should move to stream_sock_{read,write}.
*/
if ((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
buffer_shutw(s->req);
if (s->rep->flags & BF_SHUTR) {
fd_delete(s->req->cons->fd);
s->req->cons->state = SI_ST_CLO;
}
else {
EV_FD_CLR(s->req->cons->fd, DIR_WR);
shutdown(s->req->cons->fd, SHUT_WR);
}
}
/* Check if we need to close the write side */
if ((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
buffer_shutw(s->rep);
if (s->req->flags & BF_SHUTR) {
fd_delete(s->rep->cons->fd);
s->rep->cons->state = SI_ST_CLO;
}
else {
EV_FD_CLR(s->rep->cons->fd, DIR_WR);
shutdown(s->rep->cons->fd, SHUT_WR);
}
}
/* Dirty trick: force one first pass everywhere */
rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;

View File

@ -41,8 +41,8 @@
*/
int stream_sock_read(int fd) {
__label__ out_wakeup, out_shutdown_r, out_error;
struct buffer *b = fdtab[fd].cb[DIR_RD].b;
struct stream_interface *si = fdtab[fd].owner;
struct buffer *b = si->ib;
int ret, max, retval, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
@ -251,8 +251,9 @@ int stream_sock_read(int fd) {
fdtab[fd].ev &= ~FD_POLL_HUP;
b->flags |= BF_READ_NULL;
buffer_shutr(b);
/* Maybe we have to completely close the socket */
if (fdtab[fd].cb[DIR_WR].b->flags & BF_SHUTW)
/* Maybe we have to completely close the local socket */
if (si->ob->flags & BF_SHUTW)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_RD);
goto out_wakeup;
@ -275,10 +276,10 @@ int stream_sock_read(int fd) {
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutr(fdtab[fd].cb[DIR_RD].b);
fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
buffer_shutw(fdtab[fd].cb[DIR_WR].b);
fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
buffer_shutr(b);
b->flags |= BF_READ_ERROR;
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
do_close_and_return:
fd_delete(fd);
@ -296,8 +297,8 @@ int stream_sock_read(int fd) {
*/
int stream_sock_write(int fd) {
__label__ out_wakeup, out_error;
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
struct stream_interface *si = fdtab[fd].owner;
struct buffer *b = si->ob;
int ret, max, retval;
int write_poll = MAX_WRITE_POLL_LOOPS;
@ -390,6 +391,17 @@ int stream_sock_write(int fd) {
if (!b->l) {
b->flags |= BF_EMPTY;
/* Maybe we just wrote the last chunk and need to close ? */
if ((b->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
if (si->state == SI_ST_EST) {
buffer_shutw(b);
if (si->ib->flags & BF_SHUTR)
goto do_close_and_return;
shutdown(fd, SHUT_WR);
}
}
EV_FD_CLR(fd, DIR_WR);
b->wex = TICK_ETERNITY;
goto out_wakeup;
@ -461,17 +473,41 @@ int stream_sock_write(int fd) {
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutr(fdtab[fd].cb[DIR_RD].b);
fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
buffer_shutw(fdtab[fd].cb[DIR_WR].b);
fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
buffer_shutw(b);
b->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
si->ib->flags |= BF_READ_ERROR;
do_close_and_return:
fd_delete(fd);
si->state = SI_ST_CLO;
task_wakeup(si->owner, TASK_WOKEN_IO);
return 1;
}
/*
* This function performs a shutdown-write on a stream interface in a connected
* state (it does nothing for other states). It either shuts the write side or
* closes the file descriptor and marks itself as closed. No buffer flags are
* changed, it's up to the caller to adjust them. The sole purpose of this
* function is to be called from the other stream interface to notify of a
* close_read, or by itself upon a full write leading to an empty buffer.
* It normally returns zero, unless it has completely closed the socket, in
* which case it returns 1.
*/
int stream_sock_shutw(struct stream_interface *si)
{
if (si->state != SI_ST_EST)
return 0;
if (si->ib->flags & BF_SHUTR) {
fd_delete(si->fd);
si->state = SI_ST_CLO;
return 1;
}
EV_FD_CLR(si->fd, DIR_WR);
shutdown(si->fd, SHUT_WR);
return 0;
}
/*
* This function only has to be called once after a wakeup event during a data