MAJOR: stream-interface: restore splicing mechanism

The splicing is now provided by the data-layer rcv_pipe/snd_pipe functions
which in turn are called by the stream interface's recv and send callbacks.

The presence of the rcv_pipe/snd_pipe functions is used to attest support
for splicing at the data layer. It looks like the stream-interface's
SI_FL_CAP_SPLICE flag does not make sense anymore as it's used as a proxy
for the pointers above.

It also appears that we call chk_snd() from the recv callback and then
try to call it again in update_conn(). It is very likely that this last
function will progressively slip into the recv/send callbacks in order
to avoid duplicate check code.

The code works right now with and without splicing. Only raw_sock provides
support for it and it is automatically selected when the various splice
options are set. However it looks like splice-auto doesn't enable it, which
possibly means that the streamer detection code does not work anymore, or
that it's only called at a time where it's too late to enable splicing (in
process_session).
This commit is contained in:
Willy Tarreau 2012-08-24 00:46:52 +02:00
parent 5368d80ede
commit 96199b1016
5 changed files with 192 additions and 149 deletions

View File

@ -96,6 +96,7 @@ struct server;
struct proxy; struct proxy;
struct si_applet; struct si_applet;
struct stream_interface; struct stream_interface;
struct pipe;
struct target { struct target {
int type; int type;
@ -120,6 +121,8 @@ struct sock_ops {
void (*close)(struct connection *); /* close the data channel on the connection */ void (*close)(struct connection *); /* close the data channel on the connection */
int (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */ int (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
int (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */ int (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
int (*rcv_pipe)(struct connection *conn, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
int (*snd_pipe)(struct connection *conn, struct pipe *pipe); /* send-to-pipe callback */
}; };
/* A stream interface has 3 parts : /* A stream interface has 3 parts :

View File

@ -479,6 +479,7 @@ int tcp_connect_server(struct stream_interface *si)
conn_data_want_send(&si->conn); /* prepare to send data if any */ conn_data_want_send(&si->conn); /* prepare to send data if any */
si->state = SI_ST_CON; si->state = SI_ST_CON;
if (si->conn.data->rcv_pipe && si->conn.data->snd_pipe)
si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */ si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
si->exp = tick_add_ifset(now_ms, be->timeout.connect); si->exp = tick_add_ifset(now_ms, be->timeout.connect);

View File

@ -43,7 +43,7 @@
#include <types/global.h> #include <types/global.h>
#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) #if defined(CONFIG_HAP_LINUX_SPLICE)
#include <common/splice.h> #include <common/splice.h>
/* A pipe contains 16 segments max, and it's common to see segments of 1448 bytes /* A pipe contains 16 segments max, and it's common to see segments of 1448 bytes
@ -56,74 +56,29 @@
#define MAX_SPLICE_AT_ONCE (1<<30) #define MAX_SPLICE_AT_ONCE (1<<30)
/* Returns : /* Returns :
* -1 if splice is not possible or not possible anymore and we must switch to * -1 if splice() is not supported
* user-land copy (eg: to_forward reached) * >= 0 to report the amount of spliced bytes.
* 0 otherwise, including errors and close. * connection flags are updated (error, read0, wait_room, wait_data).
* Sets : * The caller must have previously allocated the pipe.
* BF_READ_NULL
* BF_READ_PARTIAL
* BF_WRITE_PARTIAL (during copy)
* BF_OUT_EMPTY (during copy)
* SI_FL_ERR
* SI_FL_WAIT_ROOM
* (SI_FL_WAIT_RECV)
*
* This function automatically allocates a pipe from the pipe pool. It also
* carefully ensures to clear b->pipe whenever it leaves the pipe empty.
*/ */
static int sock_raw_splice_in(struct channel *b, struct stream_interface *si) int raw_sock_to_pipe(struct connection *conn, struct pipe *pipe, unsigned int count)
{ {
static int splice_detects_close; static int splice_detects_close;
int fd = si_fd(si);
int ret; int ret;
unsigned long max;
int retval = 0; int retval = 0;
if (!b->to_forward) /* Under Linux, if FD_POLL_HUP is set, we have reached the end.
return -1; * Since older splice() implementations were buggy and returned
* EAGAIN on end of read, let's bypass the call to splice() now.
if (!(b->flags & BF_KERN_SPLICING))
return -1;
if (buffer_not_empty(&b->buf)) {
/* We're embarrassed, there are already data pending in
* the buffer and we don't want to have them at two
* locations at a time. Let's indicate we need some
* place and ask the consumer to hurry.
*/ */
si->flags |= SI_FL_WAIT_ROOM; if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
conn_data_stop_recv(&si->conn); goto out_read0;
b->rex = TICK_ETERNITY;
si_chk_snd(b->cons);
return 0;
}
if (unlikely(b->pipe == NULL)) { while (count) {
if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) { if (count > MAX_SPLICE_AT_ONCE)
b->flags &= ~BF_KERN_SPLICING; count = MAX_SPLICE_AT_ONCE;
return -1;
}
}
/* At this point, b->pipe is valid */ ret = splice(conn->t.sock.fd, NULL, pipe->prod, NULL, count,
while (1) {
if (b->to_forward == BUF_INFINITE_FORWARD)
max = MAX_SPLICE_AT_ONCE;
else
max = b->to_forward;
if (!max) {
/* It looks like the buffer + the pipe already contain
* the maximum amount of data to be transferred. Try to
* send those data immediately on the other side if it
* is currently waiting.
*/
retval = -1; /* end of forwarding */
break;
}
ret = splice(fd, NULL, b->pipe->prod, NULL, max,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK); SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) { if (ret <= 0) {
@ -133,8 +88,7 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
* it works, we store the info for later use. * it works, we store the info for later use.
*/ */
splice_detects_close = 1; splice_detects_close = 1;
b->flags |= BF_READ_NULL; goto out_read0;
break;
} }
if (errno == EAGAIN) { if (errno == EAGAIN) {
@ -142,13 +96,16 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
* - nothing in the socket buffer (standard) * - nothing in the socket buffer (standard)
* - pipe is full * - pipe is full
* - the connection is closed (kernel < 2.6.27.13) * - the connection is closed (kernel < 2.6.27.13)
* Since we don't know if pipe is full, we'll * The last case is annoying but know if we can detect it
* stop if the pipe is not empty. Anyway, we * and if we can't then we rely on the call to recv() to
* will almost always fill/empty the pipe. * get a valid verdict. The difference between the first
* two situations is problematic. Since we don't know if
* the pipe is full, we'll stop if the pipe is not empty.
* Anyway, we will almost always fill/empty the pipe.
*/ */
if (pipe->data) {
if (b->pipe->data) { /* alway stop reading until the pipe is flushed */
si->flags |= SI_FL_WAIT_ROOM; conn->flags |= CO_FL_WAIT_ROOM;
break; break;
} }
@ -161,46 +118,71 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
* which will be able to deal with the situation. * which will be able to deal with the situation.
*/ */
if (splice_detects_close) if (splice_detects_close)
conn_data_poll_recv(&si->conn); /* we know for sure that it's EAGAIN */ conn->flags |= CO_FL_WAIT_DATA; /* we know for sure that it's EAGAIN */
else
retval = -1;
break; break;
} }
else if (errno == ENOSYS || errno == EINVAL) {
if (errno == ENOSYS || errno == EINVAL) { /* splice not supported on this end, disable it.
/* splice not supported on this end, disable it */ * We can safely return -1 since there is no
b->flags &= ~BF_KERN_SPLICING; * chance that any data has been piped yet.
si->flags &= ~SI_FL_CAP_SPLICE; */
put_pipe(b->pipe);
b->pipe = NULL;
return -1; return -1;
} }
else if (errno == EINTR) {
/* try again */
continue;
}
/* here we have another error */ /* here we have another error */
si->flags |= SI_FL_ERR; conn->flags |= CO_FL_ERROR;
break; break;
} /* ret <= 0 */ } /* ret <= 0 */
if (b->to_forward != BUF_INFINITE_FORWARD) retval += ret;
b->to_forward -= ret; pipe->data += ret;
b->total += ret;
b->pipe->data += ret;
b->flags |= BF_READ_PARTIAL;
b->flags &= ~BF_OUT_EMPTY;
if (b->pipe->data >= SPLICE_FULL_HINT || if (pipe->data >= SPLICE_FULL_HINT || ret >= global.tune.recv_enough) {
ret >= global.tune.recv_enough) { /* We've read enough of it for this time, let's stop before
/* We've read enough of it for this time. */ * being asked to poll.
*/
break; break;
} }
} /* while */ } /* while */
if (unlikely(!b->pipe->data)) { return retval;
put_pipe(b->pipe);
b->pipe = NULL; out_read0:
conn_sock_read0(conn);
return retval;
}
/* Send as many bytes as possible from the pipe to the connection's socket.
*/
int raw_sock_from_pipe(struct connection *conn, struct pipe *pipe)
{
int ret, done;
done = 0;
while (pipe->data) {
ret = splice(pipe->cons, NULL, conn->t.sock.fd, NULL, pipe->data,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) {
if (ret == 0 || errno == EAGAIN) {
conn->flags |= CO_FL_WAIT_ROOM;
break;
}
else if (errno == EINTR)
continue;
/* here we have another error */
conn->flags |= CO_FL_ERROR;
break;
} }
return retval; done += ret;
pipe->data -= ret;
}
return done;
} }
#endif /* CONFIG_HAP_LINUX_SPLICE */ #endif /* CONFIG_HAP_LINUX_SPLICE */
@ -347,6 +329,10 @@ struct sock_ops raw_sock = {
.write = si_conn_send_cb, .write = si_conn_send_cb,
.snd_buf = raw_sock_from_buf, .snd_buf = raw_sock_from_buf,
.rcv_buf = raw_sock_to_buf, .rcv_buf = raw_sock_to_buf,
#if defined(CONFIG_HAP_LINUX_SPLICE)
.rcv_pipe = raw_sock_to_pipe,
.snd_pipe = raw_sock_from_pipe,
#endif
.close = NULL, .close = NULL,
}; };

View File

@ -179,12 +179,13 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
if (likely(s->fe->options2 & PR_O2_INDEPSTR)) if (likely(s->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR; s->si[0].flags |= SI_FL_INDEP_STR;
if (addr->ss_family == AF_INET || addr->ss_family == AF_INET6)
s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
/* add the various callbacks */ /* add the various callbacks */
stream_interface_prepare(&s->si[0], l->sock); stream_interface_prepare(&s->si[0], l->sock);
if ((s->si[0].conn.data->rcv_pipe && s->si[0].conn.data->snd_pipe) &&
(addr->ss_family == AF_INET || addr->ss_family == AF_INET6))
s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
/* pre-initialize the other side's stream interface to an INIT state. The /* pre-initialize the other side's stream interface to an INIT state. The
* callbacks will be initialized before attempting to connect. * callbacks will be initialized before attempting to connect.
*/ */

View File

@ -30,6 +30,7 @@
#include <proto/connection.h> #include <proto/connection.h>
#include <proto/fd.h> #include <proto/fd.h>
#include <proto/frontend.h> #include <proto/frontend.h>
#include <proto/pipe.h>
#include <proto/stream_interface.h> #include <proto/stream_interface.h>
#include <proto/task.h> #include <proto/task.h>
@ -666,42 +667,30 @@ static int si_conn_send_loop(struct connection *conn)
int write_poll = MAX_WRITE_POLL_LOOPS; int write_poll = MAX_WRITE_POLL_LOOPS;
int ret; int ret;
#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
while (b->pipe) {
ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) {
if (ret == 0 || errno == EAGAIN) {
conn_data_poll_send(&si->conn);
return 0;
}
/* here we have another error */
return -1;
}
if (b->pipe && conn->data->snd_pipe) {
ret = conn->data->snd_pipe(conn, b->pipe);
if (ret > 0)
b->flags |= BF_WRITE_PARTIAL; b->flags |= BF_WRITE_PARTIAL;
b->pipe->data -= ret;
if (!b->pipe->data) { if (!b->pipe->data) {
put_pipe(b->pipe); put_pipe(b->pipe);
b->pipe = NULL; b->pipe = NULL;
break;
} }
if (--write_poll <= 0) if (conn->flags & CO_FL_ERROR)
return 0; return -1;
/* The only reason we did not empty the pipe is that the output if (conn->flags & CO_FL_WAIT_ROOM) {
* buffer is full. conn_data_poll_send(conn);
*/
conn_data_poll_send(&si->conn);
return 0; return 0;
} }
}
/* At this point, the pipe is empty, but we may still have data pending /* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer. * in the normal buffer.
*/ */
#endif
if (!b->buf.o) { if (!b->buf.o) {
b->flags |= BF_OUT_EMPTY; b->flags |= BF_OUT_EMPTY;
return 0; return 0;
@ -710,7 +699,6 @@ static int si_conn_send_loop(struct connection *conn)
/* when we're in this loop, we already know that there is no spliced /* when we're in this loop, we already know that there is no spliced
* data left, and that there are sendable buffered data. * data left, and that there are sendable buffered data.
*/ */
conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) { while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
/* check if we want to inform the kernel that we're interested in /* check if we want to inform the kernel that we're interested in
* sending more data after this call. We want this if : * sending more data after this call. We want this if :
@ -1004,34 +992,69 @@ void si_conn_recv_cb(struct connection *conn)
if (b->flags & BF_SHUTR) if (b->flags & BF_SHUTR)
return; return;
#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
/* Under Linux, if FD_POLL_HUP is set, we have reached the end.
* Since older splice() implementations were buggy and returned
* EAGAIN on end of read, let's bypass the call to splice() now.
*/
if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
goto out_shutdown_r;
if (sock_raw_splice_in(b, si) >= 0) {
if (si->flags & SI_FL_ERR)
goto out_error;
if (b->flags & BF_READ_NULL)
goto out_shutdown_r;
return;
}
/* splice not possible (anymore), let's go on on standard copy */
}
#endif
cur_read = 0; cur_read = 0;
conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM); conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
if (conn->data->rcv_pipe &&
b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
if (buffer_not_empty(&b->buf)) {
/* We're embarrassed, there are already data pending in
* the buffer and we don't want to have them at two
* locations at a time. Let's indicate we need some
* place and ask the consumer to hurry.
*/
goto abort_splice;
}
if (unlikely(b->pipe == NULL)) {
if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
b->flags &= ~BF_KERN_SPLICING;
goto abort_splice;
}
}
ret = conn->data->rcv_pipe(conn, b->pipe, b->to_forward);
if (ret < 0) {
/* splice not supported on this end, let's disable it */
b->flags &= ~BF_KERN_SPLICING;
si->flags &= ~SI_FL_CAP_SPLICE;
goto abort_splice;
}
if (ret > 0) {
if (b->to_forward != BUF_INFINITE_FORWARD)
b->to_forward -= ret;
b->total += ret;
cur_read += ret;
b->flags |= BF_READ_PARTIAL;
b->flags &= ~BF_OUT_EMPTY;
}
if (conn_data_read0_pending(conn))
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR)
goto out_error;
/* splice not possible (anymore), let's go on on standard copy */
}
abort_splice:
/* release the pipe if we can, which is almost always the case */
if (b->pipe && !b->pipe->data) {
put_pipe(b->pipe);
b->pipe = NULL;
}
while (!b->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
max = bi_avail(b); max = bi_avail(b);
if (!max) { if (!max) {
b->flags |= BF_FULL; b->flags |= BF_FULL;
si->flags |= SI_FL_WAIT_ROOM; conn->flags |= CO_FL_WAIT_ROOM;
break; break;
} }
@ -1133,7 +1156,39 @@ void si_conn_recv_cb(struct connection *conn)
} }
} /* while !flags */ } /* while !flags */
if (conn->flags & CO_FL_WAIT_DATA) { if (conn->flags & CO_FL_ERROR)
goto out_error;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* We might have some data the consumer is waiting for.
* We can do fast-forwarding, but we avoid doing this for partial
* buffers, because it is very likely that it will be done again
* immediately afterwards once the following data is parsed (eg:
* HTTP chunking).
*/
if (((b->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
(b->pipe /* always try to send spliced data */ ||
(b->buf.i == 0 && (b->cons->flags & SI_FL_WAIT_DATA)))) {
int last_len = b->pipe ? b->pipe->data : 0;
si_chk_snd(b->cons);
/* check if the consumer has freed some space */
if (!(b->flags & BF_FULL) &&
(!last_len || !b->pipe || b->pipe->data < last_len))
si->flags &= ~SI_FL_WAIT_ROOM;
}
if (si->flags & SI_FL_WAIT_ROOM) {
conn_data_stop_recv(conn);
b->rex = TICK_ETERNITY;
}
else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
if (tick_isset(b->rex))
b->rex = tick_add_ifset(now_ms, b->rto);
}
}
else if (conn->flags & CO_FL_WAIT_DATA) {
/* we don't automatically ask for polling if we have /* we don't automatically ask for polling if we have
* read enough data, as it saves some syscalls with * read enough data, as it saves some syscalls with
* speculative pollers. * speculative pollers.
@ -1144,9 +1199,6 @@ void si_conn_recv_cb(struct connection *conn)
__conn_data_want_recv(conn); __conn_data_want_recv(conn);
} }
if (conn->flags & CO_FL_ERROR)
goto out_error;
if (conn_data_read0_pending(conn)) if (conn_data_read0_pending(conn))
/* connection closed */ /* connection closed */
goto out_shutdown_r; goto out_shutdown_r;