[MEDIUM] split stream_sock_write() into callback and core functions

stream_sock_write() has been split in two parts :
  - the poll callback, intented to be called when an I/O event has
    been detected
  - the write() core function, which ought to be usable from various
    other places, possibly not meant to wake the task up.

The code has also been slightly cleaned up in the process. It's more
readable now.
This commit is contained in:
Willy Tarreau 2009-01-18 15:30:37 +01:00
parent ac128fef73
commit 0c2fc1f39d

View File

@ -289,17 +289,105 @@ int stream_sock_read(int fd) {
/*
* this function is called on a write event from a stream socket.
* It returns 0 if we have a high confidence that we will not be
* able to write more data without polling first. Returns non-zero
* otherwise.
* This function is called to send buffer data to a stream socket.
* It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
* before calling it again, otherwise 1.
*/
int stream_sock_write(int fd) {
__label__ out_wakeup, out_error;
int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
{
int write_poll = MAX_WRITE_POLL_LOOPS;
int retval = 1;
int ret, max;
if (!b->l || !b->send_max)
return retval;
/* when we're in this loop, we already know that there is no spliced
* data left, and that there are sendable buffered data.
*/
while (1) {
if (b->r > b->w)
max = b->r - b->w;
else
max = b->data + BUFSIZE - b->w;
/* limit the amount of outgoing data if required */
if (max > b->send_max)
max = b->send_max;
#ifndef MSG_NOSIGNAL
{
int skerr;
socklen_t lskerr = sizeof(skerr);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
if (ret == -1 || skerr)
ret = -1;
else
ret = send(fd, b->w, max, MSG_DONTWAIT);
}
#else
ret = send(si->fd, b->w, max, MSG_DONTWAIT | MSG_NOSIGNAL);
#endif
if (ret > 0) {
if (fdtab[si->fd].state == FD_STCONN)
fdtab[si->fd].state = FD_STREADY;
b->flags |= BF_WRITE_PARTIAL;
b->w += ret;
if (b->w == b->data + BUFSIZE)
b->w = b->data; /* wrap around the buffer */
b->l -= ret;
if (likely(b->l < b->max_len))
b->flags &= ~BF_FULL;
if (likely(!b->l)) {
/* optimize data alignment in the buffer */
b->r = b->w = b->lr = b->data;
if (likely(!b->splice_len))
b->flags |= BF_EMPTY;
}
b->send_max -= ret;
if (!b->send_max || !b->l)
break;
/* if the system buffer is full, don't insist */
if (ret < max)
break;
if (--write_poll <= 0)
break;
}
else if (ret == 0 || errno == EAGAIN) {
/* nothing written, we need to poll for write first */
retval = 0;
break;
}
else {
/* bad, we got an error */
retval = -1;
break;
}
} /* while (1) */
return retval;
}
/*
* This function is called on a write event from a stream socket.
* It returns 0 if the caller needs to poll before calling it again, otherwise
* non-zero.
*/
int stream_sock_write(int fd)
{
struct stream_interface *si = fdtab[fd].owner;
struct buffer *b = si->ob;
int ret, max, retval;
int write_poll = MAX_WRITE_POLL_LOOPS;
int retval = 1;
#ifdef DEBUG_FULL
fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
@ -309,23 +397,13 @@ int stream_sock_write(int fd) {
if (fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))
goto out_error;
while (1) {
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
b->r = b->w = b->lr = b->data;
max = 0;
}
else if (b->r > b->w) {
max = b->r - b->w;
if (likely(!(b->flags & BF_EMPTY))) {
/* OK there are data waiting to be sent */
retval = stream_sock_write_loop(si, b);
if (retval < 0)
goto out_error;
}
else {
max = b->data + BUFSIZE - b->w;
}
/* limit the amount of outgoing data if required */
if (max > b->send_max)
max = b->send_max;
if (max == 0) {
/* may be we have received a connection acknowledgement in TCP mode without data */
if (likely(fdtab[fd].state == FD_STCONN)) {
/* We have no data to send to check the connection, and
@ -361,88 +439,46 @@ int stream_sock_write(int fd) {
* so we cannot write anything from the buffer. Let's disable
* the write event and pretend we never came there.
*/
goto out_stop_write;
}
#ifndef MSG_NOSIGNAL
{
int skerr;
socklen_t lskerr = sizeof(skerr);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
if (ret == -1 || skerr)
ret = -1;
else
ret = send(fd, b->w, max, MSG_DONTWAIT);
}
#else
ret = send(fd, b->w, max, MSG_DONTWAIT | MSG_NOSIGNAL);
#endif
if (ret > 0) {
b->l -= ret;
b->w += ret;
b->send_max -= ret;
if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY;
b->flags |= BF_WRITE_PARTIAL;
if (b->l < b->max_len)
b->flags &= ~BF_FULL;
if (b->w == b->data + BUFSIZE) {
b->w = b->data; /* wrap around the buffer */
}
if (!b->l && !b->splice_len) {
b->flags |= BF_EMPTY;
goto out_stop_write;
}
/* if the system buffer is full, don't insist */
if (ret < max)
break;
if (--write_poll <= 0)
break;
}
else if (ret == 0 || errno == EAGAIN) {
/* nothing written, just pretend we were never called
* and wait for the socket to be ready. But we may have
* done some work justifying to notify the task.
if ((b->flags & BF_EMPTY) || !b->send_max) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* send_max limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
retval = 0;
break;
if (((b->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) &&
(si->state == SI_ST_EST)) {
stream_sock_shutw(si);
goto out_wakeup;
}
else {
goto out_error;
}
} /* while (1) */
/*
* The only way to get out of this loop is to have stopped writing
* without any error, either by limiting the number of loops, or
* because of an EAGAIN. We only rearm the timer if we have at least
* written something.
*/
if (b->flags & BF_EMPTY)
si->flags |= SI_FL_WAIT_DATA;
if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
b->wex = tick_add_ifset(now_ms, b->wto);
if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
/* FIXME: to prevent the client from expiring read timeouts during writes,
* we refresh it. A solution would be to merge read+write timeouts into a
* unique one, although that needs some study particularly on full-duplex
* TCP connections. */
si->ib->rex = b->wex;
}
EV_FD_CLR(fd, DIR_WR);
b->wex = TICK_ETERNITY;
}
out_may_wakeup:
if (!(b->flags & BF_WRITE_ACTIVITY))
goto out_skip_wakeup;
if (b->flags & BF_WRITE_ACTIVITY) {
/* update timeout if we have written something */
if (b->send_max &&
(b->flags & (BF_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
b->wex = tick_add_ifset(now_ms, b->wto);
out_wakeup:
if (tick_isset(si->ib->rex)) {
/* Note: to prevent the client from expiring read timeouts
* during writes, we refresh it. A better solution would be
* to merge read+write timeouts into a unique one, although
* that needs some study particularly on full-duplex TCP
* connections.
*/
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
}
/* the producer might be waiting for more room to store data */
if (likely((b->flags & (BF_WRITE_PARTIAL|BF_FULL)) == BF_WRITE_PARTIAL &&
(b->prod->flags & SI_FL_WAIT_ROOM)))
@ -456,27 +492,11 @@ int stream_sock_write(int fd) {
si->state != SI_ST_EST ||
b->prod->state != SI_ST_EST))
task_wakeup(si->owner, TASK_WOKEN_IO);
}
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT;
return retval;
out_stop_write:
/* We can't write anymore. Either the buffer is empty, or we just
* refrain from sending because send_max is reached. Maybe we just
* wrote the last chunk and need to close.
*/
if ((b->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR) &&
(si->state == SI_ST_EST)) {
stream_sock_shutw(si);
} else {
if (!b->l && !b->splice_len)
si->flags |= SI_FL_WAIT_DATA;
EV_FD_CLR(fd, DIR_WR);
}
b->wex = TICK_ETERNITY;
goto out_wakeup;
out_error:
/* Write error on the file descriptor. We mark the FD as STERROR so
* that we don't use it anymore. The error is reported to the stream