MEDIUM: mux-quic: Add consumer-side fast-forwarding support

The QUIC multiplexer now implements callbacks to consume fast-forwarded
data. It relies on the H3 stack to acquire the buffer and format the frame.
This commit is contained in:
Christopher Faulet 2023-10-27 15:48:13 +02:00
parent c7d4be4843
commit 4e336fb0fe
3 changed files with 163 additions and 0 deletions

View File

@ -189,6 +189,8 @@ struct qcc_app_ops {
int (*attach)(struct qcs *qcs, void *conn_ctx);
ssize_t (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
size_t (*snd_buf)(struct qcs *qcs, struct htx *htx, size_t count);
size_t (*nego_ff)(struct qcs *qcs, size_t count);
size_t (*done_ff)(struct qcs *qcs);
int (*close)(struct qcs *qcs, enum qcc_app_ops_close_side side);
void (*detach)(struct qcs *qcs);
int (*finalize)(void *ctx);

View File

@ -1836,6 +1836,69 @@ static size_t h3_snd_buf(struct qcs *qcs, struct htx *htx, size_t count)
return total;
}
static size_t h3_nego_ff(struct qcs *qcs, size_t count)
{
struct buffer *res;
int hsize;
size_t sz, ret = 0;
h3_debug_printf(stderr, "%s\n", __func__);
/* FIXME: no check on ALLOC ? */
res = mux_get_buf(qcs);
/* h3 DATA headers : 1-byte frame type + varint frame length */
hsize = 1 + QUIC_VARINT_MAX_SIZE;
while (1) {
if (b_contig_space(res) >= hsize || !b_space_wraps(res))
break;
b_slow_realign(res, trash.area, b_data(res));
}
/* Not enough room for headers and at least one data byte, block the
* stream. It is expected that the stream connector layer will subscribe
* on SEND.
*/
if (b_contig_space(res) <= hsize) {
qcs->flags |= QC_SF_BLK_MROOM;
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
/* Cannot forward more than available room in output buffer */
sz = b_contig_space(res) - hsize;
if (count > sz)
count = sz;
qcs->sd->iobuf.buf = res;
qcs->sd->iobuf.offset = hsize;
qcs->sd->iobuf.data = 0;
ret = count;
end:
return ret;
}
static size_t h3_done_ff(struct qcs *qcs)
{
size_t total = qcs->sd->iobuf.data;
h3_debug_printf(stderr, "%s\n", __func__);
if (qcs->sd->iobuf.data) {
b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
b_putchr(qcs->sd->iobuf.buf, 0x00); /* h3 frame type = DATA */
b_quic_enc_int(qcs->sd->iobuf.buf, qcs->sd->iobuf.data, QUIC_VARINT_MAX_SIZE); /* h3 frame length */
b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
}
qcs->sd->iobuf.buf = NULL;
qcs->sd->iobuf.offset = 0;
qcs->sd->iobuf.data = 0;
return total;
}
/* Notify about a closure on <qcs> stream requested by the remote peer.
*
* Stream channel <side> is explained relative to our endpoint : WR for
@ -2132,6 +2195,8 @@ const struct qcc_app_ops h3_ops = {
.attach = h3_attach,
.decode_qcs = h3_decode_qcs,
.snd_buf = h3_snd_buf,
.nego_ff = h3_nego_ff,
.done_ff = h3_done_ff,
.close = h3_close,
.detach = h3_detach,
.finalize = h3_finalize,

View File

@ -21,6 +21,7 @@
#include <haproxy/stconn.h>
#include <haproxy/time.h>
#include <haproxy/trace.h>
#include <haproxy/xref.h>
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
@ -2807,6 +2808,98 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
return ret;
}
static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
{
struct qcs *qcs = __sc_mux_strm(sc);
size_t ret = 0;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
/* stream layer has been detached so no transfer must occur after. */
BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) {
/* Fast forwading is not supported by the QUIC application layer */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
goto end;
}
/* Alawys disable splicing */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
ret = qcs->qcc->app_ops->nego_ff(qcs, count);
if (!ret)
goto end;
/* forward remaining input data */
if (b_data(input)) {
size_t xfer = ret;
if (xfer > b_data(input))
xfer = b_data(input);
b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
qcs->sd->iobuf.data = b_xfer(qcs->sd->iobuf.buf, input, xfer);
b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
/* Cannot forward more data, wait for room */
if (b_data(input))
goto end;
}
ret -= qcs->sd->iobuf.data;
end:
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return ret;
}
static size_t qmux_done_ff(struct stconn *sc)
{
struct qcs *qcs = __sc_mux_strm(sc);
struct qcc *qcc = qcs->qcc;
struct sedesc *sd = qcs->sd;
size_t total = 0;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
/* FIXME: Must be handled with a flag. It is just a temporary hack */
{
struct xref *peer;
struct sedesc *sdo;
peer = xref_get_peer_and_lock(&qcs->sd->xref);
if (!peer)
goto end;
sdo = container_of(peer, struct sedesc, xref);
xref_unlock(&qcs->sd->xref, peer);
if (se_fl_test(sdo, SE_FL_EOI))
qcs->flags |= QC_SF_FIN_STREAM;
}
if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data)
goto end;
total = qcs->qcc->app_ops->done_ff(qcs);
qcc_send_stream(qcs, 0);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcc->wait_event.tasklet);
end:
if (!b_data(&qcs->tx.buf))
b_free(&qcs->tx.buf);
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return total;
}
static int qmux_resume_ff(struct stconn *sc, unsigned int flags)
{
return 0;
}
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
* event subscriber <es> is not allowed to change from a previous call as long
* as at least one event is still subscribed. The <event_type> must only be a
@ -2927,6 +3020,9 @@ static const struct mux_ops qmux_ops = {
.detach = qmux_strm_detach,
.rcv_buf = qmux_strm_rcv_buf,
.snd_buf = qmux_strm_snd_buf,
.nego_fastfwd = qmux_nego_ff,
.done_fastfwd = qmux_done_ff,
.resume_fastfwd = qmux_resume_ff,
.subscribe = qmux_strm_subscribe,
.unsubscribe = qmux_strm_unsubscribe,
.wake = qmux_wake,