[MAJOR] implemented support for speculative I/O processing

The pollers will now be able to speculatively call the I/O
processing functions and decide whether or not they want to
poll on those FDs. The changes primarily consist of teaching
those functions how to report that they got an EAGAIN.
This commit is contained in:
Willy Tarreau 2007-04-15 20:56:27 +02:00
parent 3d32d3a849
commit 8374918cce
3 changed files with 238 additions and 148 deletions

View File

@ -64,6 +64,13 @@
#define MAX_READ_POLL_LOOPS 4
#endif
// Same as MAX_READ_POLL_LOOPS, but for writes: this is the initial value of
// the write_poll countdown in stream_sock_write(), i.e. the upper bound on the
// number of write attempts made in a single call. Generally, it's enough to
// write twice: one time for the first half of the buffer, and a second time
// for the last half after a wrap-around.
// The value is only defined here when it has not already been defined.
#ifndef MAX_WRITE_POLL_LOOPS
#define MAX_WRITE_POLL_LOOPS 2
#endif
// the number of bytes returned by a read below which we will not try to
// poll the socket again. Generally, return values below the MSS are worthless
// to try again.

View File

@ -22,6 +22,7 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/mini-clist.h>
#include <common/standard.h>
#include <common/time.h>
#include <types/global.h>
@ -47,7 +48,7 @@
* remaining servers on the proxy and transfers queued sessions whenever
* possible to other servers.
*/
void set_server_down(struct server *s)
static void set_server_down(struct server *s)
{
struct pendconn *pc, *pc_bck, *pc_end;
struct session *sess;
@ -102,25 +103,31 @@ void set_server_down(struct server *s)
/*
* This function is used only for server health-checks. It handles
* the connection acknowledgement. If the proxy requires HTTP health-checks,
* it sends the request. In other cases, it returns 1 if the socket is OK,
* or -1 if an error occurred.
* it sends the request. In other cases, it returns 1 in s->result if the
* socket is OK, or -1 if an error occurred.
* The function itself returns 0 if it needs some polling before being called
* again, otherwise 1.
*/
int event_srv_chk_w(int fd)
static int event_srv_chk_w(int fd)
{
__label__ out_wakeup, out_nowake;
struct task *t = fdtab[fd].owner;
struct server *s = t->context;
int skerr;
socklen_t lskerr = sizeof(skerr);
skerr = 1;
if ((getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1)
|| (skerr != 0)) {
if (unlikely(fdtab[fd].state == FD_STERROR ||
(fdtab[fd].ev & FD_POLL_ERR) ||
(getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
(skerr != 0))) {
/* in case of TCP only, this tells us if the connection failed */
s->result = -1;
fdtab[fd].state = FD_STERROR;
EV_FD_CLR(fd, DIR_WR);
goto out_wakeup;
}
else if (s->result != -1) {
if (s->result != -1) {
/* we don't want to mark 'UP' a server on which we detected an error earlier */
if ((s->proxy->options & PR_O_HTTP_CHK) ||
(s->proxy->options & PR_O_SSL3_CHK)) {
@ -142,7 +149,11 @@ int event_srv_chk_w(int fd)
#endif
if (ret == s->proxy->check_len) {
EV_FD_SET(fd, DIR_RD); /* prepare for reading reply */
EV_FD_CLR(fd, DIR_WR); /* nothing more to write */
goto out_nowake;
}
else if (ret == 0 || errno == EAGAIN) {
/* we want some polling to happen first */
fdtab[fd].ev &= ~FD_POLL_WR;
return 0;
}
else {
@ -155,9 +166,12 @@ int event_srv_chk_w(int fd)
s->result = 1;
}
}
out_wakeup:
task_wakeup(&rq, t);
return 0;
out_nowake:
EV_FD_CLR(fd, DIR_WR); /* nothing more to write */
fdtab[fd].ev &= ~FD_POLL_WR;
return 1;
}
@ -167,10 +181,12 @@ int event_srv_chk_w(int fd)
* server replies HTTP 2xx or 3xx (valid responses), or if it returns at least
* 5 bytes in response to SSL HELLO. The principle is that this is enough to
* distinguish between an SSL server and a pure TCP relay. All other cases will
* return -1. The function returns 0.
* return -1. The function returns 0 if it needs to be called again after some
* polling, otherwise non-zero.
*/
int event_srv_chk_r(int fd)
static int event_srv_chk_r(int fd)
{
__label__ out_wakeup;
char reply[64];
int len, result;
struct task *t = fdtab[fd].owner;
@ -179,34 +195,51 @@ int event_srv_chk_r(int fd)
socklen_t lskerr = sizeof(skerr);
result = len = -1;
if (!getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) && !skerr) {
#ifndef MSG_NOSIGNAL
len = recv(fd, reply, sizeof(reply), 0);
#else
/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
* but the connection was closed on the remote end. Fortunately, recv still
* works correctly and we don't need to do the getsockopt() on linux.
*/
len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
#endif
if (((s->proxy->options & PR_O_HTTP_CHK) &&
(len >= sizeof("HTTP/1.0 000")) &&
!memcmp(reply, "HTTP/1.", 7) &&
(reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
|| ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
(reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
result = 1;
if (unlikely(fdtab[fd].state == FD_STERROR ||
(fdtab[fd].ev & FD_POLL_ERR) ||
(getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
(skerr != 0))) {
/* in case of TCP only, this tells us if the connection failed */
s->result = -1;
fdtab[fd].state = FD_STERROR;
goto out_wakeup;
}
#ifndef MSG_NOSIGNAL
len = recv(fd, reply, sizeof(reply), 0);
#else
/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
* but the connection was closed on the remote end. Fortunately, recv still
* works correctly and we don't need to do the getsockopt() on linux.
*/
len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
#endif
if (unlikely(len < 0 && errno == EAGAIN)) {
/* we want some polling to happen first */
fdtab[fd].ev &= ~FD_POLL_RD;
return 0;
}
if (((s->proxy->options & PR_O_HTTP_CHK) &&
(len >= sizeof("HTTP/1.0 000")) &&
!memcmp(reply, "HTTP/1.", 7) &&
(reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
|| ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
(reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
result = 1;
if (result == -1)
fdtab[fd].state = FD_STERROR;
if (s->result != -1)
s->result = result;
out_wakeup:
EV_FD_CLR(fd, DIR_RD);
task_wakeup(&rq, t);
return 0;
fdtab[fd].ev &= ~FD_POLL_RD;
return 1;
}
/*

View File

@ -21,6 +21,7 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/standard.h>
#include <common/time.h>
#include <types/buffers.h>
@ -35,98 +36,118 @@
/*
* this function is called on a read event from a stream socket.
* It returns 0.
* It returns 0 if we have a high confidence that we will not be
* able to read more data without polling first. Returns non-zero
* otherwise.
*/
int stream_sock_read(int fd) {
__label__ out_wakeup;
struct buffer *b = fdtab[fd].cb[DIR_RD].b;
int ret, max;
int ret, max, retval;
int read_poll = MAX_READ_POLL_LOOPS;
#ifdef DEBUG_FULL
fprintf(stderr,"stream_sock_read : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
#endif
if (fdtab[fd].state != FD_STERROR) {
while (read_poll-- > 0)
{
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
b->r = b->w = b->lr = b->data;
max = b->rlim - b->data;
}
else if (b->r > b->w) {
max = b->rlim - b->r;
}
else {
max = b->w - b->r;
/* FIXME: theoretically, if w>0, we shouldn't have rlim < data+size anymore
* since it means that the rewrite protection has been removed. This
* implies that the if statement can be removed.
*/
if (max > b->rlim - b->data)
max = b->rlim - b->data;
}
if (max == 0) { /* not anymore room to store data */
EV_FD_CLR(fd, DIR_RD);
break;
}
retval = 1;
#ifndef MSG_NOSIGNAL
{
int skerr;
socklen_t lskerr = sizeof(skerr);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
if (ret == -1 || skerr)
ret = -1;
else
ret = recv(fd, b->r, max, 0);
}
#else
ret = recv(fd, b->r, max, MSG_NOSIGNAL);
#endif
if (ret > 0) {
b->r += ret;
b->l += ret;
b->flags |= BF_PARTIAL_READ;
if (b->r == b->data + BUFSIZE) {
b->r = b->data; /* wrap around the buffer */
}
b->total += ret;
/* generally if we read something smaller than the 1 or 2 MSS,
* it means that it's not worth trying to read again.
*/
if (ret < MIN_RET_FOR_READ_LOOP)
break;
if (!read_poll)
break;
/* we hope to read more data or to get a close on next round */
continue;
}
else if (ret == 0) {
b->flags |= BF_READ_NULL;
break;
}
else if (errno == EAGAIN) {/* ignore EAGAIN */
break;
}
else {
b->flags |= BF_READ_ERROR;
fdtab[fd].state = FD_STERROR;
break;
}
} /* while(1) */
}
else {
if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
/* read/write error */
b->flags |= BF_READ_ERROR;
fdtab[fd].state = FD_STERROR;
goto out_wakeup;
}
if (unlikely(fdtab[fd].ev & FD_POLL_HUP)) {
/* connection closed */
b->flags |= BF_READ_NULL;
goto out_wakeup;
}
retval = 0;
while (read_poll-- > 0) {
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
b->r = b->w = b->lr = b->data;
max = b->rlim - b->data;
}
else if (b->r > b->w) {
max = b->rlim - b->r;
}
else {
max = b->w - b->r;
/* FIXME: theoretically, if w>0, we shouldn't have rlim < data+size anymore
* since it means that the rewrite protection has been removed. This
* implies that the if statement can be removed.
*/
if (max > b->rlim - b->data)
max = b->rlim - b->data;
}
if (max == 0) { /* not anymore room to store data */
EV_FD_CLR(fd, DIR_RD);
break;
}
#ifndef MSG_NOSIGNAL
{
int skerr;
socklen_t lskerr = sizeof(skerr);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
if (ret == -1 || skerr)
ret = -1;
else
ret = recv(fd, b->r, max, 0);
}
#else
ret = recv(fd, b->r, max, MSG_NOSIGNAL);
#endif
if (ret > 0) {
b->r += ret;
b->l += ret;
b->flags |= BF_PARTIAL_READ;
retval = 1;
if (b->r == b->data + BUFSIZE) {
b->r = b->data; /* wrap around the buffer */
}
b->total += ret;
/* generally if we read something smaller than the 1 or 2 MSS,
* it means that it's not worth trying to read again. It may
* also happen on headers, but the application then can stop
* reading before we start polling.
*/
if (ret < MIN_RET_FOR_READ_LOOP)
break;
if (!read_poll)
break;
/* we hope to read more data or to get a close on next round */
continue;
}
else if (ret == 0) {
b->flags |= BF_READ_NULL;
retval = 1; // connection closed
break;
}
else if (errno == EAGAIN) {/* ignore EAGAIN */
retval = 0;
break;
}
else {
retval = 1;
b->flags |= BF_READ_ERROR;
fdtab[fd].state = FD_STERROR;
break;
}
} /* while (read_poll) */
if (b->flags & BF_READ_STATUS) {
out_wakeup:
if (b->rto && EV_FD_ISSET(fd, DIR_RD))
tv_delayfrom(&b->rex, &now, b->rto);
else
@ -135,55 +156,71 @@ int stream_sock_read(int fd) {
task_wakeup(&rq, fdtab[fd].owner);
}
return 0;
fdtab[fd].ev &= ~FD_POLL_RD;
return retval;
}
/*
* this function is called on a write event from a stream socket.
* It returns 0.
* It returns 0 if we have a high confidence that we will not be
* able to write more data without polling first. Returns non-zero
* otherwise.
*/
int stream_sock_write(int fd) {
__label__ out_eternity;
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
int ret, max;
int ret, max, retval;
int write_poll = MAX_WRITE_POLL_LOOPS;
#ifdef DEBUG_FULL
fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
#endif
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
b->r = b->w = b->lr = b->data;
max = 0;
retval = 1;
if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
/* read/write error */
b->flags |= BF_WRITE_ERROR;
fdtab[fd].state = FD_STERROR;
EV_FD_CLR(fd, DIR_WR);
goto out_eternity;
}
else if (b->r > b->w) {
max = b->r - b->w;
}
else
max = b->data + BUFSIZE - b->w;
if (fdtab[fd].state != FD_STERROR) {
retval = 0;
while (write_poll-- > 0) {
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
b->r = b->w = b->lr = b->data;
max = 0;
}
else if (b->r > b->w) {
max = b->r - b->w;
}
else {
max = b->data + BUFSIZE - b->w;
}
if (max == 0) {
/* maybe we have received a connection acknowledgement in TCP mode without data */
if (fdtab[fd].state == FD_STCONN) {
if (!(b->flags & BF_PARTIAL_WRITE)
&& fdtab[fd].state == FD_STCONN) {
int skerr;
socklen_t lskerr = sizeof(skerr);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
if (ret == -1 || skerr) {
b->flags |= BF_WRITE_ERROR;
fdtab[fd].state = FD_STERROR;
task_wakeup(&rq, fdtab[fd].owner);
tv_eternity(&b->wex);
EV_FD_CLR(fd, DIR_WR);
return 0;
retval = 1;
goto out_eternity;
}
}
b->flags |= BF_WRITE_NULL;
task_wakeup(&rq, fdtab[fd].owner);
fdtab[fd].state = FD_STREADY;
tv_eternity(&b->wex);
EV_FD_CLR(fd, DIR_WR);
return 0;
retval = 1;
goto out_eternity;
}
#ifndef MSG_NOSIGNAL
@ -206,41 +243,54 @@ int stream_sock_write(int fd) {
b->w += ret;
b->flags |= BF_PARTIAL_WRITE;
retval = 1;
if (b->w == b->data + BUFSIZE) {
b->w = b->data; /* wrap around the buffer */
}
if (!write_poll)
break;
/* we hope to be able to write more data */
continue;
}
else if (ret == 0) {
/* nothing written, just pretend we were never called */
// b->flags |= BF_WRITE_NULL;
return 0;
retval = 0;
break;
}
else if (errno == EAGAIN) {/* ignore EAGAIN */
retval = 0;
break;
}
else if (errno == EAGAIN) /* ignore EAGAIN */
return 0;
else {
b->flags |= BF_WRITE_ERROR;
fdtab[fd].state = FD_STERROR;
EV_FD_CLR(fd, DIR_WR);
retval = 1;
goto out_eternity;
}
} /* while (write_poll) */
if (b->flags & BF_WRITE_STATUS) {
if (b->wto) {
tv_delayfrom(&b->wex, &now, b->wto);
/* FIXME: to prevent the client from expiring read timeouts during writes,
* we refresh it. A solution would be to merge read+write timeouts into a
* unique one, although that needs some study particularly on full-duplex
* TCP connections. */
b->rex = b->wex;
}
else {
out_eternity:
tv_eternity(&b->wex);
}
}
else {
b->flags |= BF_WRITE_ERROR;
fdtab[fd].state = FD_STERROR;
}
if (b->wto) {
tv_delayfrom(&b->wex, &now, b->wto);
/* FIXME: to prevent the client from expiring read timeouts during writes,
* we refresh it. A solution would be to merge read+write timeouts into a
* unique one, although that needs some study particularly on full-duplex
* TCP connections. */
b->rex = b->wex;
}
else
tv_eternity(&b->wex);
task_wakeup(&rq, fdtab[fd].owner);
return 0;
fdtab[fd].ev &= ~FD_POLL_WR;
return retval;
}