MEDIUM: mux-h1: Add fast-forwarding support

The H1 multiplexer now implements the callback functions used to produce and
consume fast-forwarded data.
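
In practice these callbacks form a small negotiation between the two sides of a
stream: the consumer reserves room (a buffer area or a pipe) in ->init_fastfwd(),
the producer then receives from its transport directly into that area, and the
consumer finally commits the result in ->done_fastfwd(). Below is a minimal
sketch of that handshake, assuming simplified stand-in types; it is illustrative
only and not HAProxy's real sedesc/iobuf API (only the IOBUF_FL_* flag names are
borrowed from the patch).

/* Minimal illustrative sketch of the fast-forward handshake. All types and
 * helpers are simplified stand-ins, NOT HAProxy's real API.
 */
#include <stdio.h>
#include <string.h>

#define IOBUF_FL_NO_FF      0x1 /* consumer does not support fast-forwarding */
#define IOBUF_FL_FF_BLOCKED 0x2 /* consumer temporarily has no room */

struct iobuf {
	char buf[256];   /* stand-in for the consumer's output buffer */
	size_t data;     /* bytes produced during this round */
	unsigned flags;
};

/* consumer side: reserve room for up to <count> bytes (role of ->init_fastfwd()) */
static size_t init_ff(struct iobuf *io, size_t count)
{
	size_t room = sizeof(io->buf) - io->data;

	if (!room) {
		io->flags |= IOBUF_FL_FF_BLOCKED;
		return 0;
	}
	return count < room ? count : room;
}

/* producer side: read from its own transport straight into the reserved area,
 * bypassing the stream buffers (role of ->fastfwd())
 */
static size_t produce(struct iobuf *io, const char *src, size_t try)
{
	memcpy(io->buf + io->data, src, try);
	io->data += try;
	return try;
}

/* consumer side: commit what was produced (role of ->done_fastfwd()) */
static void done_ff(struct iobuf *io)
{
	printf("forwarded %zu bytes without a stream-layer copy\n", io->data);
	io->data = 0;
}

int main(void)
{
	struct iobuf io = { .flags = 0 };
	const char *payload = "hello";
	size_t try = init_ff(&io, strlen(payload));

	if (try && !(io.flags & (IOBUF_FL_NO_FF | IOBUF_FL_FF_BLOCKED)))
		produce(&io, payload, try);
	done_ff(&io);
	return 0;
}

In this patch, h1_fastfwd() plays the producer role on the H1 connection's
receiving side, while h1_init_ff() and h1_done_ff() implement the consumer role
when the opposite mux forwards data to this H1 connection.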
Christopher Faulet 2023-09-29 14:25:40 +02:00
parent 2db273a7b5
commit 2d80eb5b7a
2 changed files with 402 additions and 7 deletions

include/haproxy/mux_h1-t.h

@@ -50,8 +50,9 @@
#define H1C_F_UPG_H2C 0x00010000 /* set if an upgrade to h2 should be done */
#define H1C_F_CO_MSG_MORE 0x00020000 /* set if CO_SFL_MSG_MORE must be set when calling xprt->snd_buf() */
#define H1C_F_CO_STREAMER 0x00040000 /* set if CO_SFL_STREAMER must be set when calling xprt->snd_buf() */
-/* 0x00040000 - 0x40000000 unused */
+#define H1C_F_CANT_FASTFWD 0x00080000 /* Fast-forwarding is not supported (exclusive with WANT_FASTFWD) */
+/* 0x00100000 - 0x40000000 unused */
#define H1C_F_IS_BACK 0x80000000 /* Set on outgoing connection */
@@ -70,7 +71,7 @@ static forceinline char *h1c_show_flags(char *buf, size_t len, const char *delim
_(H1C_F_EOS, _(H1C_F_ERR_PENDING, _(H1C_F_ERROR,
_(H1C_F_SILENT_SHUT, _(H1C_F_ABRT_PENDING, _(H1C_F_ABRTED,
_(H1C_F_WANT_FASTFWD, _(H1C_F_WAIT_NEXT_REQ, _(H1C_F_UPG_H2C, _(H1C_F_CO_MSG_MORE,
-_(H1C_F_CO_STREAMER, _(H1C_F_IS_BACK)))))))))))))))));
+_(H1C_F_CO_STREAMER, _(H1C_F_CANT_FASTFWD, _(H1C_F_IS_BACK))))))))))))))))));
/* epilogue */
_(~0U);
return buf;

src/mux_h1.c

@@ -24,13 +24,14 @@
#include <haproxy/istbuf.h>
#include <haproxy/log.h>
#include <haproxy/mux_h1-t.h>
#include <haproxy/pipe-t.h>
#include <haproxy/pipe.h>
#include <haproxy/proxy.h>
#include <haproxy/session-t.h>
#include <haproxy/stats.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/trace.h>
#include <haproxy/xref.h>
/* H1 connection descriptor */
struct h1c {
@@ -1906,7 +1907,8 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
}
/* Here h1s_sc(h1s) is always defined */
-if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
+if (!(h1c->flags & H1C_F_CANT_FASTFWD) &&
+    (!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
(h1m->state == H1_MSG_DATA || h1m->state == H1_MSG_TUNNEL)) {
TRACE_STATE("notify the mux can use fast-forward", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_set(h1s->sd, SE_FL_MAY_FASTFWD);
@@ -1917,9 +1919,6 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
h1c->flags &= ~H1C_F_WANT_FASTFWD;
}
-se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
-h1c->flags &= ~H1C_F_WANT_FASTFWD;
/* Set EOI on stream connector in DONE state iff:
* - it is a response
* - it is a request but not a protocol upgrade nor a CONNECT
@@ -4377,6 +4376,393 @@ static size_t h1_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in
return total;
}
static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s)
{
struct xref *peer;
struct sedesc *sdo;
peer = xref_get_peer_and_lock(&h1s->sd->xref);
if (!peer)
return NULL;
sdo = container_of(peer, struct sedesc, xref);
xref_unlock(&h1s->sd->xref, peer);
return sdo;
}
static size_t h1_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
size_t ret = 0;
TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count});
/* TODO: add check on curr_len if CLEN */
if (h1m->flags & H1_MF_CHNK) {
if (h1m->curr_len) {
BUG_ON(h1m->state != H1_MSG_DATA);
if (count > h1m->curr_len)
count = h1m->curr_len;
}
else {
BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE);
if (!h1_make_chunk(h1s, h1m, count))
goto out;
h1m->curr_len = count;
}
}
/* Use kernel splicing if it is supported by the sender and if there
* are no input data _AND_ no output data.
*
* TODO: It may be good to add a flag to send obuf data first if any,
* and then data in pipe, or the opposite. For now, mixing the two is
* not supported.
*/
if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) {
if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) {
h1s->sd->iobuf.offset = 0;
h1s->sd->iobuf.data = 0;
ret = count;
goto out;
}
h1s->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s);
}
if (!h1_get_buf(h1c, &h1c->obuf)) {
h1c->flags |= H1C_F_OUT_ALLOC;
TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
goto out;
}
if (b_space_wraps(&h1c->obuf))
b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf));
h1s->sd->iobuf.buf = &h1c->obuf;
h1s->sd->iobuf.offset = 0;
h1s->sd->iobuf.data = 0;
/* Cannot forward more than available room in output buffer */
if (count > b_room(&h1c->obuf))
count = b_room(&h1c->obuf);
if (!count) {
h1c->flags |= H1C_F_OUT_FULL;
h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
goto out;
}
/* forward remaining input data */
if (b_data(input)) {
size_t xfer = count;
if (xfer > b_data(input))
xfer = b_data(input);
h1s->sd->iobuf.data = b_xfer(&h1c->obuf, input, xfer);
/* Cannot forward more data, wait for room */
if (b_data(input))
goto out;
}
ret = count - h1s->sd->iobuf.data;
out:
TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
return ret;
}
static void h1_done_ff(struct stconn *sc)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
struct sedesc *sd = h1s->sd;
size_t total = 0;
TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s);
if (sd->iobuf.pipe) {
total = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
if (total > 0)
HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, total);
if (!sd->iobuf.pipe->data) {
put_pipe(sd->iobuf.pipe);
sd->iobuf.pipe = NULL;
}
}
else {
if (b_room(&h1c->obuf) == sd->iobuf.offset)
h1c->flags |= H1C_F_OUT_FULL;
total = sd->iobuf.data;
sd->iobuf.buf = NULL;
sd->iobuf.offset = 0;
sd->iobuf.data = 0;
/* Perform a synchronous send but in all cases, consider
* everything was already sent from the SC point of view.
*/
h1_send(h1c);
}
if (h1m->curr_len)
h1m->curr_len -= total;
if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
h1m->state = H1_MSG_DONE;
else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
if (h1m->state == H1_MSG_DATA)
h1m->state = H1_MSG_CHUNK_CRLF;
}
HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, total);
out:
// TODO: should we call h1_process() instead ?
if (h1c->conn->flags & CO_FL_ERROR) {
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
if (h1c->flags & H1C_F_EOS)
h1c->flags |= H1C_F_ERROR;
else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
/* EOS not seen, so subscribe for reads to be able to
* catch the error on the reading path. It is especially
* important if EOI was reached.
*/
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
se_fl_set_error(h1s->sd);
TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}
TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){total});
}
static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
struct sedesc *sdo = NULL;
size_t total = 0, try = 0;
int ret = 0;
TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
if (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL) {
h1c->flags &= ~H1C_F_WANT_FASTFWD;
TRACE_STATE("Cannot fast-forwad data now !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end;
}
se_fl_clr(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
h1c->conn->flags &= ~CO_FL_WAIT_ROOM;
h1c->flags |= H1C_F_WANT_FASTFWD;
if (h1c->flags & (H1C_F_EOS|H1C_F_ERROR)) {
h1c->flags &= ~H1C_F_WANT_FASTFWD;
TRACE_DEVEL("leaving on (EOS|ERROR)", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end;
}
sdo = h1s_opposite_sd(h1s);
if (!sdo) {
TRACE_STATE("Opposite endpoint not available yet", H1_EV_STRM_RECV, h1c->conn, h1s);
goto out;
}
if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len)
count = h1m->curr_len;
try = se_init_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) {
h1c->flags &= ~H1C_F_IN_FULL;
TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK);
}
if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
/* Fast-forwarding is not supported by the consumer */
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end;
}
if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
TRACE_STATE("waiting for more room", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
goto out;
}
total += sdo->iobuf.data;
if (sdo->iobuf.pipe) {
/* Here, no data was xferred yet */
ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.pipe, try);
if (ret < 0) {
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
BUG_ON(sdo->iobuf.pipe->data);
put_pipe(sdo->iobuf.pipe);
sdo->iobuf.pipe = NULL;
goto end;
}
total += ret;
if (!ret) {
TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_in, ret);
}
else {
b_add(sdo->iobuf.buf, sdo->iobuf.offset);
ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags);
if (ret < try) {
TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
total += ret;
sdo->iobuf.data += ret;
}
if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) {
if (total > h1m->curr_len) {
h1s->flags |= H1S_F_PARSING_ERROR;
se_fl_set(h1s->sd, SE_FL_ERROR);
TRACE_ERROR("too much payload, more than announced",
H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
goto out;
}
h1m->curr_len -= total;
if (!h1m->curr_len) {
if (h1m->flags & H1_MF_CLEN) {
h1m->state = H1_MSG_DONE;
se_fl_set(h1s->sd, SE_FL_EOI); /* TODO: this line is tricky and must be evaluated first
* Its purpose is to avoid setting CO_SFL_MSG_MORE on the
* next calls to ->complete_fastfwd().
*/
}
else
h1m->state = H1_MSG_CHUNK_CRLF;
h1c->flags &= ~H1C_F_WANT_FASTFWD;
if (!(h1c->flags & H1C_F_IS_BACK)) {
/* The request was fully received. It means the H1S now
* expects data from the opposite side
*/
se_expect_data(h1s->sd);
}
TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s);
}
}
HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total);
ret = total;
se_done_ff(sdo);
if (sdo->iobuf.pipe) {
se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
}
TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
out:
if (conn_xprt_read0_pending(h1c->conn)) {
se_fl_set(h1s->sd, SE_FL_EOS);
TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
if (h1m->state >= H1_MSG_DONE || !(h1m->flags & H1_MF_XFER_LEN)) {
/* DONE or TUNNEL or SHUTR without XFER_LEN, set
* EOI on the stream connector */
se_fl_set(h1s->sd, SE_FL_EOI);
TRACE_STATE("report EOI to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
}
else {
se_fl_set(h1s->sd, SE_FL_ERROR);
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
TRACE_ERROR("message aborted, set error on SC", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
}
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_EOS;
TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
}
if (h1c->conn->flags & CO_FL_ERROR) {
se_fl_set(h1s->sd, SE_FL_ERROR);
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}
end:
if (!(h1c->flags & H1C_F_WANT_FASTFWD)) {
TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
}
TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
return ret;
}
static int h1_resume_fastfwd(struct stconn *sc, unsigned int flags)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
struct sedesc *sd = h1s->sd;
int ret = 0;
TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){flags});
if (sd->iobuf.pipe) {
ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
if (ret > 0)
HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret);
if (!sd->iobuf.pipe->data) {
put_pipe(sd->iobuf.pipe);
sd->iobuf.pipe = NULL;
}
}
h1m->curr_len -= ret;
if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
h1m->state = H1_MSG_DONE;
else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
if (h1m->state == H1_MSG_DATA)
h1m->state = H1_MSG_CHUNK_CRLF;
}
HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret);
out:
// TODO: should we call h1_process() instead ?
if (h1c->conn->flags & CO_FL_ERROR) {
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
if (h1c->flags & H1C_F_EOS)
h1c->flags |= H1C_F_ERROR;
else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
/* EOS not seen, so subscribe for reads to be able to
* catch the error on the reading path. It is especially
* important if EOI was reached.
*/
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
se_fl_set_error(h1s->sd);
TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}
TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
return ret;
}
static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
{
const struct h1c *h1c = conn->ctx;
@@ -4792,6 +5178,10 @@ static const struct mux_ops mux_http_ops = {
.used_streams = h1_used_streams,
.rcv_buf = h1_rcv_buf,
.snd_buf = h1_snd_buf,
.init_fastfwd = h1_init_ff,
.done_fastfwd = h1_done_ff,
.fastfwd = h1_fastfwd,
.resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
.shutr = h1_shutr,
@@ -4815,6 +5205,10 @@ static const struct mux_ops mux_h1_ops = {
.used_streams = h1_used_streams,
.rcv_buf = h1_rcv_buf,
.snd_buf = h1_snd_buf,
.init_fastfwd = h1_init_ff,
.done_fastfwd = h1_done_ff,
.fastfwd = h1_fastfwd,
.resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
.shutr = h1_shutr,