MEDIUM: mux-h2: Add consumer-side fast-forwarding support

The H2 multiplexer now implements callbacks to consume fast-forwarded
data. It is the most usful case: A H2 client getting data from a H1
server. It is also the easiest case to implement. The producer side is
trickier because of multiplexing. It is not obvious this case would be
improved with data fast-forwarding.
This commit is contained in:
Christopher Faulet 2023-08-03 18:18:45 +02:00
parent eb346074bb
commit 11c05c516a

View File

@ -32,6 +32,7 @@
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/trace.h>
#include <haproxy/xref.h>
/* dummy streams returned for closed, error, refused, idle and states */
@ -6871,6 +6872,192 @@ static size_t h2_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in
return total;
}
static size_t h2_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
{
struct h2s *h2s = __sc_mux_strm(sc);
struct h2c *h2c = h2s->h2c;
struct buffer *mbuf;
size_t sz , ret = 0;
TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
/* If we were not just woken because we wanted to send but couldn't,
* and there's somebody else that is waiting to send, do nothing,
* we will subscribe later and be put at the end of the list
*/
if (!(h2s->flags & H2_SF_NOTIFIED) &&
(!LIST_ISEMPTY(&h2c->send_list) || !LIST_ISEMPTY(&h2c->fctl_list))) {
if (LIST_INLIST(&h2s->list))
TRACE_DEVEL("stream already waiting, leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
else {
TRACE_DEVEL("other streams already waiting, going to the queue and leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
h2s->h2c->flags |= H2_CF_WAIT_INLIST;
}
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
h2s->flags &= ~H2_SF_NOTIFIED;
if (h2s_mws(h2s) <= 0) {
h2s->flags |= H2_SF_BLK_SFCTL;
if (LIST_INLIST(&h2s->list))
LIST_DEL_INIT(&h2s->list);
LIST_APPEND(&h2c->blocked_list, &h2s->list);
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("stream window <=0, flow-controlled", H2_EV_H2S_SEND|H2_EV_H2S_FCTL, h2c->conn, h2s);
goto end;
}
if (h2c->mws <= 0) {
h2s->flags |= H2_SF_BLK_MFCTL;
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("connection window <=0, stream flow-controlled", H2_EV_H2S_SEND|H2_EV_H2C_FCTL, h2c->conn, h2s);
goto end;
}
sz = count;
if (sz > h2s_mws(h2s))
sz = h2s_mws(h2s);
if (h2c->mfs && sz > h2c->mfs)
sz = h2c->mfs; // >0
if (sz > h2c->mws)
sz = h2c->mws;
if (count > sz)
count = sz;
mbuf = br_tail(h2c->mbuf);
retry:
if (!h2_get_buf(h2c, mbuf)) {
h2c->flags |= H2_CF_MUX_MALLOC;
h2s->flags |= H2_SF_BLK_MROOM;
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("waiting for room in output buffer", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
goto end;
}
if (b_room(mbuf) < sz && b_room(mbuf) < b_size(mbuf) / 4) {
if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
goto retry;
h2c->flags |= H2_CF_MUX_MFULL;
h2s->flags |= H2_SF_BLK_MROOM;
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("too large data present in output buffer, waiting for emptiness", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
goto end;
}
while (1) {
if (b_contig_space(mbuf) >= 9 || !b_space_wraps(mbuf))
break;
b_slow_realign(mbuf, trash.area, b_data(mbuf));
}
if (b_contig_space(mbuf) <= 9) {
if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
goto retry;
h2c->flags |= H2_CF_MUX_MFULL;
h2s->flags |= H2_SF_BLK_MROOM;
h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("output buffer full", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
goto end;
}
/* Cannot forward more than available room in output buffer */
sz = b_contig_space(mbuf) - 9;
if (count > sz)
count = sz;
/* len: 0x000000 (fill later), type: 0(DATA), flags: none=0 */
memcpy(b_tail(mbuf), "\x00\x00\x00\x00\x00", 5);
write_n32(b_tail(mbuf) + 5, h2s->id); // 4 bytes
h2s->sd->iobuf.buf = mbuf;
h2s->sd->iobuf.offset = 9;
h2s->sd->iobuf.data = 0;
/* forward remaining input data */
if (b_data(input)) {
size_t xfer = count;
if (xfer > b_data(input))
xfer = b_data(input);
b_add(mbuf, 9);
h2s->sd->iobuf.data = b_xfer(mbuf, input, xfer);
b_sub(mbuf, 9);
/* Cannot forward more data, wait for room */
if (b_data(input))
goto end;
}
ret = count - h2s->sd->iobuf.data;
end:
TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
return ret;
}
static void h2_done_ff(struct stconn *sc)
{
struct h2s *h2s = __sc_mux_strm(sc);
struct h2c *h2c = h2s->h2c;
struct sedesc *sd = h2s->sd;
struct buffer *mbuf;
char *head;
TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
mbuf = sd->iobuf.buf;
if (!mbuf)
goto end;
head = b_peek(mbuf, b_data(mbuf) - sd->iobuf.data);
/* FIXME: Must be handled with a flag. It is just a temporary hack */
{
struct xref *peer;
struct sedesc *sdo;
peer = xref_get_peer_and_lock(&h2s->sd->xref);
if (!peer)
goto end;
sdo = container_of(peer, struct sedesc, xref);
xref_unlock(&h2s->sd->xref, peer);
if (se_fl_test(sdo, SE_FL_EOI))
h2s->flags &= ~H2_SF_MORE_HTX_DATA;
}
if (!(sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) &&
!(h2s->flags & H2_SF_BLK_SFCTL) &&
!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) {
/* Ok we managed to send something, leave the send_list if we were still there */
h2_remove_from_list(h2s);
}
if (!sd->iobuf.data)
goto end;
/* Perform a synchronous send but in all cases, consider
* everything was already sent from the SC point of view.
*/
h2_set_frame_size(head, sd->iobuf.data);
b_add(mbuf, 9);
h2s->sws -= sd->iobuf.data;
h2c->mws -= sd->iobuf.data;
h2_process(h2c);
end:
sd->iobuf.buf = NULL;
sd->iobuf.offset = 0;
sd->iobuf.data = 0;
TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
}
static int h2_resume_ff(struct stconn *sc, unsigned int flags)
{
return 0;
}
/* appends some info about stream <h2s> to buffer <msg>, or does nothing if
* <h2s> is NULL. Returns non-zero if the stream is considered suspicious. May
* emit multiple lines, each new one being prefixed with <pfx>, if <pfx> is not
@ -7185,6 +7372,9 @@ static const struct mux_ops h2_ops = {
.wake = h2_wake,
.snd_buf = h2_snd_buf,
.rcv_buf = h2_rcv_buf,
.init_fastfwd = h2_init_ff,
.done_fastfwd = h2_done_ff,
.resume_fastfwd = h2_resume_ff,
.subscribe = h2_subscribe,
.unsubscribe = h2_unsubscribe,
.attach = h2_attach,