mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-18 13:05:38 +00:00
MEDIUM: mux-h1: Add support of the kernel TCP splicing to forward data
The mux relies on the flag CO_RFL_BUF_FLUSH during a call to h1_rcv_buf to know if it needs to stop reads and to flush its internal buffers to use kernel tcp splicing. It is the caller responsibility (here the SI) to know when it must come back on buffered exchanges.
This commit is contained in:
parent
f2824e6e10
commit
1be55f9eb2
52
src/mux_h1.c
52
src/mux_h1.c
@ -12,6 +12,7 @@
|
||||
#include <common/cfgparse.h>
|
||||
#include <common/config.h>
|
||||
|
||||
#include <types/pipe.h>
|
||||
#include <types/proxy.h>
|
||||
#include <types/session.h>
|
||||
|
||||
@ -60,6 +61,7 @@
|
||||
#define H1S_F_WANT_CLO 0x00000040
|
||||
#define H1S_F_WANT_MSK 0x00000070
|
||||
#define H1S_F_NOT_FIRST 0x00000080 /* The H1 stream is not the first one */
|
||||
#define H1S_F_BUF_FLUSH 0x00000100 /* Flush input buffers (ibuf and rxbuf) and don't read more data */
|
||||
|
||||
|
||||
/* H1 connection descriptor */
|
||||
@ -1140,6 +1142,9 @@ static int h1_recv(struct h1c *h1c)
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (h1c->h1s && (h1c->h1s->flags & H1S_F_BUF_FLUSH))
|
||||
return 1;
|
||||
|
||||
if (!h1_get_buf(h1c, &h1c->ibuf)) {
|
||||
h1c->flags |= H1C_F_IN_ALLOC;
|
||||
return 0;
|
||||
@ -1570,7 +1575,11 @@ static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
|
||||
if (!(h1s->h1c->flags & H1C_F_RX_ALLOC))
|
||||
ret = h1_xfer(h1s, buf, count);
|
||||
if (ret > 0) {
|
||||
|
||||
if (flags & CO_RFL_BUF_FLUSH)
|
||||
h1s->flags |= H1S_F_BUF_FLUSH;
|
||||
else if (ret > 0 || (h1s->flags & H1S_F_BUF_FLUSH)) {
|
||||
h1s->flags &= ~H1S_F_BUF_FLUSH;
|
||||
if (!(h1s->h1c->wait_event.wait_reason & SUB_CAN_RECV))
|
||||
tasklet_wakeup(h1s->h1c->wait_event.task);
|
||||
}
|
||||
@ -1610,6 +1619,43 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
|
||||
}
|
||||
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
/* Send and get, using splicing */
|
||||
static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1m *h1m = (!conn_is_back(cs->conn) ? &h1s->req : &h1s->res);
|
||||
int ret = 0;
|
||||
|
||||
if (b_data(&h1s->rxbuf) || b_data(&h1s->h1c->ibuf))
|
||||
goto end;
|
||||
if (h1m->state == H1_MSG_DATA && count > h1m->curr_len)
|
||||
count = h1m->curr_len;
|
||||
ret = cs->conn->xprt->rcv_pipe(cs->conn, pipe, count);
|
||||
if (h1m->state == H1_MSG_DATA && ret > 0)
|
||||
h1m->curr_len -= ret;
|
||||
end:
|
||||
return ret;
|
||||
|
||||
}
|
||||
|
||||
static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1m *h1m = (!conn_is_back(cs->conn) ? &h1s->res : &h1s->req);
|
||||
int ret = 0;
|
||||
|
||||
if (b_data(&h1s->h1c->obuf))
|
||||
goto end;
|
||||
|
||||
ret = cs->conn->xprt->snd_pipe(cs->conn, pipe);
|
||||
if (h1m->state == H1_MSG_DATA && ret > 0)
|
||||
h1m->curr_len -= ret;
|
||||
end:
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
|
||||
/****************************************/
|
||||
/* MUX initialization and instanciation */
|
||||
/****************************************/
|
||||
@ -1625,6 +1671,10 @@ const struct mux_ops mux_h1_ops = {
|
||||
.avail_streams = h1_avail_streams,
|
||||
.rcv_buf = h1_rcv_buf,
|
||||
.snd_buf = h1_snd_buf,
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
.rcv_pipe = h1_rcv_pipe,
|
||||
.snd_pipe = h1_snd_pipe,
|
||||
#endif
|
||||
.subscribe = h1_subscribe,
|
||||
.unsubscribe = h1_unsubscribe,
|
||||
.shutr = h1_shutr,
|
||||
|
Loading…
Reference in New Issue
Block a user