diff --git a/include/common/defaults.h b/include/common/defaults.h index e6552de805..c99aafe4fe 100644 --- a/include/common/defaults.h +++ b/include/common/defaults.h @@ -64,6 +64,13 @@ #define MAX_READ_POLL_LOOPS 4 #endif +// same, but for writes. Generally, it's enough to write twice: one time for +// first half of the buffer, and a second time for the last half after a +// wrap-around. +#ifndef MAX_WRITE_POLL_LOOPS +#define MAX_WRITE_POLL_LOOPS 2 +#endif + // the number of bytes returned by a read below which we will not try to // poll the socket again. Generally, return values below the MSS are worthless // to try again. diff --git a/src/checks.c b/src/checks.c index 2ae01db561..309d0c4a76 100644 --- a/src/checks.c +++ b/src/checks.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -47,7 +48,7 @@ * remaining servers on the proxy and transfers queued sessions whenever * possible to other servers. */ -void set_server_down(struct server *s) +static void set_server_down(struct server *s) { struct pendconn *pc, *pc_bck, *pc_end; struct session *sess; @@ -102,25 +103,31 @@ void set_server_down(struct server *s) /* * This function is used only for server health-checks. It handles * the connection acknowledgement. If the proxy requires HTTP health-checks, - * it sends the request. In other cases, it returns 1 if the socket is OK, - * or -1 if an error occured. + * it sends the request. In other cases, it returns 1 in s->result if the + * socket is OK, or -1 if an error occured. + * The function itself returns 0 if it needs some polling before being called + * again, otherwise 1. */ -int event_srv_chk_w(int fd) +static int event_srv_chk_w(int fd) { + __label__ out_wakeup, out_nowake; struct task *t = fdtab[fd].owner; struct server *s = t->context; int skerr; socklen_t lskerr = sizeof(skerr); skerr = 1; - if ((getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) - || (skerr != 0)) { + if (unlikely(fdtab[fd].state == FD_STERROR || + (fdtab[fd].ev & FD_POLL_ERR) || + (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) || + (skerr != 0))) { /* in case of TCP only, this tells us if the connection failed */ s->result = -1; fdtab[fd].state = FD_STERROR; - EV_FD_CLR(fd, DIR_WR); + goto out_wakeup; } - else if (s->result != -1) { + + if (s->result != -1) { /* we don't want to mark 'UP' a server on which we detected an error earlier */ if ((s->proxy->options & PR_O_HTTP_CHK) || (s->proxy->options & PR_O_SSL3_CHK)) { @@ -142,7 +149,11 @@ int event_srv_chk_w(int fd) #endif if (ret == s->proxy->check_len) { EV_FD_SET(fd, DIR_RD); /* prepare for reading reply */ - EV_FD_CLR(fd, DIR_WR); /* nothing more to write */ + goto out_nowake; + } + else if (ret == 0 || errno == EAGAIN) { + /* we want some polling to happen first */ + fdtab[fd].ev &= ~FD_POLL_WR; return 0; } else { @@ -155,9 +166,12 @@ int event_srv_chk_w(int fd) s->result = 1; } } - + out_wakeup: task_wakeup(&rq, t); - return 0; + out_nowake: + EV_FD_CLR(fd, DIR_WR); /* nothing more to write */ + fdtab[fd].ev &= ~FD_POLL_WR; + return 1; } @@ -167,10 +181,12 @@ int event_srv_chk_w(int fd) * server replies HTTP 2xx or 3xx (valid responses), or if it returns at least * 5 bytes in response to SSL HELLO. The principle is that this is enough to * distinguish between an SSL server and a pure TCP relay. All other cases will - * return -1. The function returns 0. + * return -1. The function returns 0 if it needs to be called again after some + * polling, otherwise non-zero.. */ -int event_srv_chk_r(int fd) +static int event_srv_chk_r(int fd) { + __label__ out_wakeup; char reply[64]; int len, result; struct task *t = fdtab[fd].owner; @@ -179,34 +195,51 @@ int event_srv_chk_r(int fd) socklen_t lskerr = sizeof(skerr); result = len = -1; - if (!getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) && !skerr) { -#ifndef MSG_NOSIGNAL - len = recv(fd, reply, sizeof(reply), 0); -#else - /* Warning! Linux returns EAGAIN on SO_ERROR if data are still available - * but the connection was closed on the remote end. Fortunately, recv still - * works correctly and we don't need to do the getsockopt() on linux. - */ - len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL); -#endif - if (((s->proxy->options & PR_O_HTTP_CHK) && - (len >= sizeof("HTTP/1.0 000")) && - !memcmp(reply, "HTTP/1.", 7) && - (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */ - || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) && - (reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */ - result = 1; + + if (unlikely(fdtab[fd].state == FD_STERROR || + (fdtab[fd].ev & FD_POLL_ERR) || + (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) || + (skerr != 0))) { + /* in case of TCP only, this tells us if the connection failed */ + s->result = -1; + fdtab[fd].state = FD_STERROR; + goto out_wakeup; } +#ifndef MSG_NOSIGNAL + len = recv(fd, reply, sizeof(reply), 0); +#else + /* Warning! Linux returns EAGAIN on SO_ERROR if data are still available + * but the connection was closed on the remote end. Fortunately, recv still + * works correctly and we don't need to do the getsockopt() on linux. + */ + len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL); +#endif + if (unlikely(len < 0 && errno == EAGAIN)) { + /* we want some polling to happen first */ + fdtab[fd].ev &= ~FD_POLL_RD; + return 0; + } + + if (((s->proxy->options & PR_O_HTTP_CHK) && + (len >= sizeof("HTTP/1.0 000")) && + !memcmp(reply, "HTTP/1.", 7) && + (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */ + || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) && + (reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */ + result = 1; + if (result == -1) fdtab[fd].state = FD_STERROR; if (s->result != -1) s->result = result; + out_wakeup: EV_FD_CLR(fd, DIR_RD); task_wakeup(&rq, t); - return 0; + fdtab[fd].ev &= ~FD_POLL_RD; + return 1; } /* diff --git a/src/stream_sock.c b/src/stream_sock.c index a150a08de3..7d2aa3035a 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -35,98 +36,118 @@ /* * this function is called on a read event from a stream socket. - * It returns 0. + * It returns 0 if we have a high confidence that we will not be + * able to read more data without polling first. Returns non-zero + * otherwise. */ int stream_sock_read(int fd) { + __label__ out_wakeup; struct buffer *b = fdtab[fd].cb[DIR_RD].b; - int ret, max; + int ret, max, retval; int read_poll = MAX_READ_POLL_LOOPS; #ifdef DEBUG_FULL fprintf(stderr,"stream_sock_read : fd=%d, owner=%p\n", fd, fdtab[fd].owner); #endif - if (fdtab[fd].state != FD_STERROR) { - while (read_poll-- > 0) - { - if (b->l == 0) { /* let's realign the buffer to optimize I/O */ - b->r = b->w = b->lr = b->data; - max = b->rlim - b->data; - } - else if (b->r > b->w) { - max = b->rlim - b->r; - } - else { - max = b->w - b->r; - /* FIXME: theorically, if w>0, we shouldn't have rlim < data+size anymore - * since it means that the rewrite protection has been removed. This - * implies that the if statement can be removed. - */ - if (max > b->rlim - b->data) - max = b->rlim - b->data; - } - - if (max == 0) { /* not anymore room to store data */ - EV_FD_CLR(fd, DIR_RD); - break; - } + retval = 1; -#ifndef MSG_NOSIGNAL - { - int skerr; - socklen_t lskerr = sizeof(skerr); - - ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr); - if (ret == -1 || skerr) - ret = -1; - else - ret = recv(fd, b->r, max, 0); - } -#else - ret = recv(fd, b->r, max, MSG_NOSIGNAL); -#endif - if (ret > 0) { - b->r += ret; - b->l += ret; - b->flags |= BF_PARTIAL_READ; - - if (b->r == b->data + BUFSIZE) { - b->r = b->data; /* wrap around the buffer */ - } - - b->total += ret; - - /* generally if we read something smaller than the 1 or 2 MSS, - * it means that it's not worth trying to read again. - */ - if (ret < MIN_RET_FOR_READ_LOOP) - break; - if (!read_poll) - break; - - /* we hope to read more data or to get a close on next round */ - continue; - } - else if (ret == 0) { - b->flags |= BF_READ_NULL; - break; - } - else if (errno == EAGAIN) {/* ignore EAGAIN */ - break; - } - else { - b->flags |= BF_READ_ERROR; - fdtab[fd].state = FD_STERROR; - break; - } - } /* while(1) */ - } - else { + if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) { + /* read/write error */ b->flags |= BF_READ_ERROR; fdtab[fd].state = FD_STERROR; + goto out_wakeup; } + if (unlikely(fdtab[fd].ev & FD_POLL_HUP)) { + /* connection closed */ + b->flags |= BF_READ_NULL; + goto out_wakeup; + } + + retval = 0; + while (read_poll-- > 0) { + if (b->l == 0) { /* let's realign the buffer to optimize I/O */ + b->r = b->w = b->lr = b->data; + max = b->rlim - b->data; + } + else if (b->r > b->w) { + max = b->rlim - b->r; + } + else { + max = b->w - b->r; + /* FIXME: theorically, if w>0, we shouldn't have rlim < data+size anymore + * since it means that the rewrite protection has been removed. This + * implies that the if statement can be removed. + */ + if (max > b->rlim - b->data) + max = b->rlim - b->data; + } + + if (max == 0) { /* not anymore room to store data */ + EV_FD_CLR(fd, DIR_RD); + break; + } + +#ifndef MSG_NOSIGNAL + { + int skerr; + socklen_t lskerr = sizeof(skerr); + + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr); + if (ret == -1 || skerr) + ret = -1; + else + ret = recv(fd, b->r, max, 0); + } +#else + ret = recv(fd, b->r, max, MSG_NOSIGNAL); +#endif + if (ret > 0) { + b->r += ret; + b->l += ret; + b->flags |= BF_PARTIAL_READ; + retval = 1; + + if (b->r == b->data + BUFSIZE) { + b->r = b->data; /* wrap around the buffer */ + } + + b->total += ret; + + /* generally if we read something smaller than the 1 or 2 MSS, + * it means that it's not worth trying to read again. It may + * also happen on headers, but the application then can stop + * reading before we start polling. + */ + if (ret < MIN_RET_FOR_READ_LOOP) + break; + + if (!read_poll) + break; + + /* we hope to read more data or to get a close on next round */ + continue; + } + else if (ret == 0) { + b->flags |= BF_READ_NULL; + retval = 1; // connection closed + break; + } + else if (errno == EAGAIN) {/* ignore EAGAIN */ + retval = 0; + break; + } + else { + retval = 1; + b->flags |= BF_READ_ERROR; + fdtab[fd].state = FD_STERROR; + break; + } + } /* while (read_poll) */ + if (b->flags & BF_READ_STATUS) { + out_wakeup: if (b->rto && EV_FD_ISSET(fd, DIR_RD)) tv_delayfrom(&b->rex, &now, b->rto); else @@ -135,55 +156,71 @@ int stream_sock_read(int fd) { task_wakeup(&rq, fdtab[fd].owner); } - return 0; + fdtab[fd].ev &= ~FD_POLL_RD; + return retval; } /* * this function is called on a write event from a stream socket. - * It returns 0. + * It returns 0 if we have a high confidence that we will not be + * able to write more data without polling first. Returns non-zero + * otherwise. */ int stream_sock_write(int fd) { + __label__ out_eternity; struct buffer *b = fdtab[fd].cb[DIR_WR].b; - int ret, max; + int ret, max, retval; + int write_poll = MAX_WRITE_POLL_LOOPS; #ifdef DEBUG_FULL fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner); #endif - if (b->l == 0) { /* let's realign the buffer to optimize I/O */ - b->r = b->w = b->lr = b->data; - max = 0; + retval = 1; + + if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) { + /* read/write error */ + b->flags |= BF_WRITE_ERROR; + fdtab[fd].state = FD_STERROR; + EV_FD_CLR(fd, DIR_WR); + goto out_eternity; } - else if (b->r > b->w) { - max = b->r - b->w; - } - else - max = b->data + BUFSIZE - b->w; - - if (fdtab[fd].state != FD_STERROR) { + + retval = 0; + while (write_poll-- > 0) { + if (b->l == 0) { /* let's realign the buffer to optimize I/O */ + b->r = b->w = b->lr = b->data; + max = 0; + } + else if (b->r > b->w) { + max = b->r - b->w; + } + else { + max = b->data + BUFSIZE - b->w; + } + if (max == 0) { /* may be we have received a connection acknowledgement in TCP mode without data */ - if (fdtab[fd].state == FD_STCONN) { + if (!(b->flags & BF_PARTIAL_WRITE) + && fdtab[fd].state == FD_STCONN) { int skerr; socklen_t lskerr = sizeof(skerr); ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr); if (ret == -1 || skerr) { b->flags |= BF_WRITE_ERROR; fdtab[fd].state = FD_STERROR; - task_wakeup(&rq, fdtab[fd].owner); - tv_eternity(&b->wex); EV_FD_CLR(fd, DIR_WR); - return 0; + retval = 1; + goto out_eternity; } } b->flags |= BF_WRITE_NULL; - task_wakeup(&rq, fdtab[fd].owner); fdtab[fd].state = FD_STREADY; - tv_eternity(&b->wex); EV_FD_CLR(fd, DIR_WR); - return 0; + retval = 1; + goto out_eternity; } #ifndef MSG_NOSIGNAL @@ -206,41 +243,54 @@ int stream_sock_write(int fd) { b->w += ret; b->flags |= BF_PARTIAL_WRITE; + retval = 1; if (b->w == b->data + BUFSIZE) { b->w = b->data; /* wrap around the buffer */ } + + if (!write_poll) + break; + + /* we hope to be able to write more data */ + continue; } else if (ret == 0) { /* nothing written, just pretend we were never called */ - // b->flags |= BF_WRITE_NULL; - return 0; + retval = 0; + break; + } + else if (errno == EAGAIN) {/* ignore EAGAIN */ + retval = 0; + break; } - else if (errno == EAGAIN) /* ignore EAGAIN */ - return 0; else { b->flags |= BF_WRITE_ERROR; fdtab[fd].state = FD_STERROR; + EV_FD_CLR(fd, DIR_WR); + retval = 1; + goto out_eternity; + } + } /* while (write_poll) */ + + if (b->flags & BF_WRITE_STATUS) { + if (b->wto) { + tv_delayfrom(&b->wex, &now, b->wto); + /* FIXME: to prevent the client from expiring read timeouts during writes, + * we refresh it. A solution would be to merge read+write timeouts into a + * unique one, although that needs some study particularly on full-duplex + * TCP connections. */ + b->rex = b->wex; + } + else { + out_eternity: + tv_eternity(&b->wex); } } - else { - b->flags |= BF_WRITE_ERROR; - fdtab[fd].state = FD_STERROR; - } - - if (b->wto) { - tv_delayfrom(&b->wex, &now, b->wto); - /* FIXME: to prevent the client from expiring read timeouts during writes, - * we refresh it. A solution would be to merge read+write timeouts into a - * unique one, although that needs some study particularly on full-duplex - * TCP connections. */ - b->rex = b->wex; - } - else - tv_eternity(&b->wex); task_wakeup(&rq, fdtab[fd].owner); - return 0; + fdtab[fd].ev &= ~FD_POLL_WR; + return retval; }