From 2d80eb5b7ad1dc926229b1355f1bd1a077d144f1 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Fri, 29 Sep 2023 14:25:40 +0200 Subject: [PATCH] MEDIUM: mux-h1: Add fast-forwarding support The H1 multiplexer now implements callback functions to produce and consume fast-forwarded data. --- include/haproxy/mux_h1-t.h | 5 +- src/mux_h1.c | 404 ++++++++++++++++++++++++++++++++++++- 2 files changed, 402 insertions(+), 7 deletions(-) diff --git a/include/haproxy/mux_h1-t.h b/include/haproxy/mux_h1-t.h index ea06ff251..2f49a495e 100644 --- a/include/haproxy/mux_h1-t.h +++ b/include/haproxy/mux_h1-t.h @@ -50,8 +50,9 @@ #define H1C_F_UPG_H2C 0x00010000 /* set if an upgrade to h2 should be done */ #define H1C_F_CO_MSG_MORE 0x00020000 /* set if CO_SFL_MSG_MORE must be set when calling xprt->snd_buf() */ #define H1C_F_CO_STREAMER 0x00040000 /* set if CO_SFL_STREAMER must be set when calling xprt->snd_buf() */ +#define H1C_F_CANT_FASTFWD 0x00080000 /* Fast-forwarding is not supported (exclusive with WANT_FASTFWD) */ -/* 0x00040000 - 0x40000000 unused */ +/* 0x00100000 - 0x40000000 unused */ #define H1C_F_IS_BACK 0x80000000 /* Set on outgoing connection */ @@ -70,7 +71,7 @@ static forceinline char *h1c_show_flags(char *buf, size_t len, const char *delim _(H1C_F_EOS, _(H1C_F_ERR_PENDING, _(H1C_F_ERROR, _(H1C_F_SILENT_SHUT, _(H1C_F_ABRT_PENDING, _(H1C_F_ABRTED, _(H1C_F_WANT_FASTFWD, _(H1C_F_WAIT_NEXT_REQ, _(H1C_F_UPG_H2C, _(H1C_F_CO_MSG_MORE, - _(H1C_F_CO_STREAMER, _(H1C_F_IS_BACK))))))))))))))))); + _(H1C_F_CO_STREAMER, _(H1C_F_CANT_FASTFWD, _(H1C_F_IS_BACK)))))))))))))))))); /* epilogue */ _(~0U); return buf; diff --git a/src/mux_h1.c b/src/mux_h1.c index 86e368b40..d098c1b29 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -24,13 +24,14 @@ #include #include #include -#include +#include #include #include #include #include #include #include +#include /* H1 connection descriptor */ struct h1c { @@ -1906,7 +1907,8 @@ static size_t h1_process_demux(struct h1c *h1c, struct 
buffer *buf, size_t count } /* Here h1s_sc(h1s) is always defined */ - if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) && + if (!(h1c->flags & H1C_F_CANT_FASTFWD) && + (!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) && (h1m->state == H1_MSG_DATA || h1m->state == H1_MSG_TUNNEL)) { TRACE_STATE("notify the mux can use fast-forward", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s); se_fl_set(h1s->sd, SE_FL_MAY_FASTFWD); @@ -1917,9 +1919,6 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count h1c->flags &= ~H1C_F_WANT_FASTFWD; } - se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD); - h1c->flags &= ~H1C_F_WANT_FASTFWD; - /* Set EOI on stream connector in DONE state iff: * - it is a response * - it is a request but no a protocol upgrade nor a CONNECT @@ -4377,6 +4376,393 @@ static size_t h1_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in return total; } +static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s) +{ + struct xref *peer; + struct sedesc *sdo; + + peer = xref_get_peer_and_lock(&h1s->sd->xref); + if (!peer) + return NULL; + + sdo = container_of(peer, struct sedesc, xref); + xref_unlock(&h1s->sd->xref, peer); + return sdo; +} + +static size_t h1_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice) +{ + struct h1s *h1s = __sc_mux_strm(sc); + struct h1c *h1c = h1s->h1c; + struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? 
&h1s->res : &h1s->req); + size_t ret = 0; + + TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count}); + + /* TODO: add check on curr_len if CLEN */ + + if (h1m->flags & H1_MF_CHNK) { + if (h1m->curr_len) { + BUG_ON(h1m->state != H1_MSG_DATA); + if (count > h1m->curr_len) + count = h1m->curr_len; + } + else { + BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE); + if (!h1_make_chunk(h1s, h1m, count)) + goto out; + h1m->curr_len = count; + } + } + + /* Use kernel splicing if it is supported by the sender and if there + * are no input data _AND_ no output data. + * + * TODO: It may be good to add a flag to send obuf data first if any, + * and then data in pipe, or the opposite. For now, it is not + * supported to mix data. + */ + if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) { + if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) { + h1s->sd->iobuf.offset = 0; + h1s->sd->iobuf.data = 0; + ret = count; + goto out; + } + h1s->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING; + TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s); + } + + if (!h1_get_buf(h1c, &h1c->obuf)) { + h1c->flags |= H1C_F_OUT_ALLOC; + TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s); + goto out; + } + + if (b_space_wraps(&h1c->obuf)) + b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf)); + + h1s->sd->iobuf.buf = &h1c->obuf; + h1s->sd->iobuf.offset = 0; + h1s->sd->iobuf.data = 0; + + /* Cannot forward more than available room in output buffer */ + if (count > b_room(&h1c->obuf)) + count = b_room(&h1c->obuf); + + if (!count) { + h1c->flags |= H1C_F_OUT_FULL; + h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s); + goto out; + } + + /* forward remaining input data */ + if (b_data(input)) { + size_t xfer 
= count; + + if (xfer > b_data(input)) + xfer = b_data(input); + h1s->sd->iobuf.data = b_xfer(&h1c->obuf, input, xfer); + + /* Cannot forward more data, wait for room */ + if (b_data(input)) + goto out; + } + + ret = count - h1s->sd->iobuf.data; + + out: + TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret}); + return ret; +} + +static void h1_done_ff(struct stconn *sc) +{ + struct h1s *h1s = __sc_mux_strm(sc); + struct h1c *h1c = h1s->h1c; + struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); + struct sedesc *sd = h1s->sd; + size_t total = 0; + + TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s); + + + if (sd->iobuf.pipe) { + total = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data); + if (total > 0) + HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, total); + if (!sd->iobuf.pipe->data) { + put_pipe(sd->iobuf.pipe); + sd->iobuf.pipe = NULL; + } + } + else { + if (b_room(&h1c->obuf) == sd->iobuf.offset) + h1c->flags |= H1C_F_OUT_FULL; + + total = sd->iobuf.data; + sd->iobuf.buf = NULL; + sd->iobuf.offset = 0; + sd->iobuf.data = 0; + + /* Perform a synchronous send but in all cases, consider + * everything was already sent from the SC point of view. + */ + h1_send(h1c); + } + + if (h1m->curr_len) + h1m->curr_len -= total; + + if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN)) + h1m->state = H1_MSG_DONE; + else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) { + if (h1m->state == H1_MSG_DATA) + h1m->state = H1_MSG_CHUNK_CRLF; + } + + HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, total); + + out: + // TODO: should we call h1_process() instead ? + if (h1c->conn->flags & CO_FL_ERROR) { + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING; + if (h1c->flags & H1C_F_EOS) + h1c->flags |= H1C_F_ERROR; + else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) { + /* EOS not seen, so subscribe for reads to be able to + * catch the error on the reading path. 
It is especially + * important if EOI was reached. + */ + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + se_fl_set_error(h1s->sd); + TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); + } + + TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){total}); +} + +static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) +{ + struct h1s *h1s = __sc_mux_strm(sc); + struct h1c *h1c = h1s->h1c; + struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); + struct sedesc *sdo = NULL; + size_t total = 0, try = 0; + int ret = 0; + + TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count}); + + if (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL) { + h1c->flags &= ~H1C_F_WANT_FASTFWD; + TRACE_STATE("Cannot fast-forwad data now !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s); + goto end; + } + + se_fl_clr(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + h1c->conn->flags &= ~CO_FL_WAIT_ROOM; + h1c->flags |= H1C_F_WANT_FASTFWD; + + if (h1c->flags & (H1C_F_EOS|H1C_F_ERROR)) { + h1c->flags &= ~H1C_F_WANT_FASTFWD; + TRACE_DEVEL("leaving on (EOS|ERROR)", H1_EV_STRM_RECV, h1c->conn, h1s); + goto end; + } + + sdo = h1s_opposite_sd(h1s); + if (!sdo) { + TRACE_STATE("Opposite endpoint not available yet", H1_EV_STRM_RECV, h1c->conn, h1s); + goto out; + } + + if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len) + count = h1m->curr_len; + + try = se_init_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING)); + if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) { + h1c->flags &= ~H1C_F_IN_FULL; + TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK); + } + + if (sdo->iobuf.flags & IOBUF_FL_NO_FF) { + /* Fast forwading is not supported by the consumer */ + h1c->flags = (h1c->flags & 
~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD; + TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s); + goto end; + } + if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + TRACE_STATE("waiting for more room", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s); + goto out; + } + + total += sdo->iobuf.data; + if (sdo->iobuf.pipe) { + /* Here, not data was xferred */ + ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.pipe, try); + if (ret < 0) { + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD; + TRACE_ERROR("Error when trying to fast-forward data, disable it and abort", + H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); + BUG_ON(sdo->iobuf.pipe->data); + put_pipe(sdo->iobuf.pipe); + sdo->iobuf.pipe = NULL; + goto end; + } + total += ret; + if (!ret) { + TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_in, ret); + } + else { + b_add(sdo->iobuf.buf, sdo->iobuf.offset); + ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags); + if (ret < try) { + TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + b_sub(sdo->iobuf.buf, sdo->iobuf.offset); + total += ret; + sdo->iobuf.data += ret; + } + + if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) { + if (total > h1m->curr_len) { + h1s->flags |= H1S_F_PARSING_ERROR; + se_fl_set(h1s->sd, SE_FL_ERROR); + TRACE_ERROR("too much payload, more than announced", + H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); + goto out; + } + h1m->curr_len -= total; + if 
(!h1m->curr_len) { + if (h1m->flags & H1_MF_CLEN) { + h1m->state = H1_MSG_DONE; + se_fl_set(h1s->sd, SE_FL_EOI); /* TODO: this line is tricky and must be evaluated first + * Its purpose is to avoid to set CO_SFL_MSG_MORE on the + * next calls to ->complete_fastfwd(). + */ + } + else + h1m->state = H1_MSG_CHUNK_CRLF; + h1c->flags &= ~H1C_F_WANT_FASTFWD; + + if (!(h1c->flags & H1C_F_IS_BACK)) { + /* The request was fully received. It means the H1S now + * expect data from the opposite side + */ + se_expect_data(h1s->sd); + } + + TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s); + } + } + + HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total); + ret = total; + se_done_ff(sdo); + + if (sdo->iobuf.pipe) { + se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + } + + TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret}); + + out: + if (conn_xprt_read0_pending(h1c->conn)) { + se_fl_set(h1s->sd, SE_FL_EOS); + TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s); + if (h1m->state >= H1_MSG_DONE || !(h1m->flags & H1_MF_XFER_LEN)) { + /* DONE or TUNNEL or SHUTR without XFER_LEN, set + * EOI on the stream connector */ + se_fl_set(h1s->sd, SE_FL_EOI); + TRACE_STATE("report EOI to SE", H1_EV_STRM_RECV, h1c->conn, h1s); + } + else { + se_fl_set(h1s->sd, SE_FL_ERROR); + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR; + TRACE_ERROR("message aborted, set error on SC", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s); + } + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_EOS; + TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s); + } + if (h1c->conn->flags & CO_FL_ERROR) { + se_fl_set(h1s->sd, SE_FL_ERROR); + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR; + TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); + } + + end: + if (!(h1c->flags & H1C_F_WANT_FASTFWD)) { + TRACE_STATE("notify the mux can't use fast-forward 
anymore", H1_EV_STRM_RECV, h1c->conn, h1s); + se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD); + if (!(h1c->wait_event.events & SUB_RETRY_RECV)) { + TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + } + + TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret}); + return ret; +} + +static int h1_resume_fastfwd(struct stconn *sc, unsigned int flags) +{ + struct h1s *h1s = __sc_mux_strm(sc); + struct h1c *h1c = h1s->h1c; + struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); + struct sedesc *sd = h1s->sd; + int ret = 0; + + TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){flags}); + + if (sd->iobuf.pipe) { + ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data); + if (ret > 0) + HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret); + + if (!sd->iobuf.pipe->data) { + put_pipe(sd->iobuf.pipe); + sd->iobuf.pipe = NULL; + } + } + + h1m->curr_len -= ret; + + if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN)) + h1m->state = H1_MSG_DONE; + else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) { + if (h1m->state == H1_MSG_DATA) + h1m->state = H1_MSG_CHUNK_CRLF; + } + + HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret); + + out: + // TODO: should we call h1_process() instead ? + if (h1c->conn->flags & CO_FL_ERROR) { + h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING; + if (h1c->flags & H1C_F_EOS) + h1c->flags |= H1C_F_ERROR; + else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) { + /* EOS not seen, so subscribe for reads to be able to + * catch the error on the reading path. It is especially + * important if EOI was reached. 
+ */ + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + se_fl_set_error(h1s->sd); + TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); + } + + TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret}); + return ret; +} + static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output) { const struct h1c *h1c = conn->ctx; @@ -4792,6 +5178,10 @@ static const struct mux_ops mux_http_ops = { .used_streams = h1_used_streams, .rcv_buf = h1_rcv_buf, .snd_buf = h1_snd_buf, + .init_fastfwd = h1_init_ff, + .done_fastfwd = h1_done_ff, + .fastfwd = h1_fastfwd, + .resume_fastfwd = h1_resume_fastfwd, .subscribe = h1_subscribe, .unsubscribe = h1_unsubscribe, .shutr = h1_shutr, @@ -4815,6 +5205,10 @@ static const struct mux_ops mux_h1_ops = { .used_streams = h1_used_streams, .rcv_buf = h1_rcv_buf, .snd_buf = h1_snd_buf, + .init_fastfwd = h1_init_ff, + .done_fastfwd = h1_done_ff, + .fastfwd = h1_fastfwd, + .resume_fastfwd = h1_resume_fastfwd, .subscribe = h1_subscribe, .unsubscribe = h1_unsubscribe, .shutr = h1_shutr,