From d4bf6f0526a1ec707265d0f7745aefde754cf40a Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 18 Oct 2023 17:48:11 +0200 Subject: [PATCH] MEDIUM: mux-quic: limit conn flow control on snd_buf This commit is a direct follow-up on the previous one. This time, it deals with connection level flow control. The process is similar to the stream level: the soft offset is incremented during snd_buf and the real offset during STREAM frame emission. On MAX_DATA reception, both the stream layer and QMUX are woken up if necessary. One extra feature for conn level is the introduction of a new QCC list to reference QCS instances. It will store instances for which the snd_buf callback has been interrupted because the QCC soft offset was reached. Every such stream instance is woken up on MAX_DATA reception if the soft offset is unblocked. --- include/haproxy/mux_quic-t.h | 6 ++- src/h3.c | 4 +- src/mux_quic.c | 89 ++++++++++++++++++++++++++++-------- src/qmux_trace.c | 2 +- 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 33f617a9a..d2c7bdbcc 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -31,7 +31,7 @@ enum qcs_type { #define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */ #define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */ -#define QC_CF_BLK_MFCTL 0x00000004 /* sending blocked due to connection flow-control */ +/* unused 0x00000004 */ #define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */ #define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. 
*/ #define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */ @@ -71,6 +71,8 @@ struct qcc { } rfctl; struct { + struct quic_fctl fc; /* connection flow control applied on sending */ + uint64_t offsets; /* sum of all offsets prepared */ uint64_t sent_offsets; /* sum of all offset sent */ } tx; @@ -84,6 +86,7 @@ struct qcc { struct list send_retry_list; /* list of qcs eligible to send retry */ struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */ + struct list fctl_list; /* list of sending qcs blocked on conn flow control */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */ @@ -170,6 +173,7 @@ struct qcs { struct list el; /* element of qcc.send_retry_list */ struct list el_send; /* element of qcc.send_list */ struct list el_opening; /* element of qcc.opening_list */ + struct list el_fctl; /* element of qcc.fctl_list */ struct wait_event wait_event; struct wait_event *subs; diff --git a/src/h3.c b/src/h3.c index 6c2cad7dd..ec8edfea6 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1463,7 +1463,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx) b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0); } - if (qfctl_sblocked(&qcs->tx.fc)) { + if (qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&qcs->qcc->tx.fc)) { TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); goto err; } @@ -2236,7 +2236,7 @@ static int h3_send_goaway(struct h3c *h3c) res = qcc_get_stream_txbuf(qcs); if (!res || b_room(res) < b_data(&pos) || - qfctl_sblocked(&qcs->tx.fc)) { + qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) { /* Do not try forcefully to emit GOAWAY if no space left. 
*/ TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs); goto err; diff --git a/src/mux_quic.c b/src/mux_quic.c index 6e6654943..f2b4cb970 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -59,6 +59,7 @@ static void qcs_free(struct qcs *qcs) /* Safe to use even if already removed from the list. */ LIST_DEL_INIT(&qcs->el_opening); LIST_DEL_INIT(&qcs->el_send); + LIST_DEL_INIT(&qcs->el_fctl); /* Release stream endpoint descriptor. */ BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN)); @@ -108,6 +109,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) */ LIST_INIT(&qcs->el_opening); LIST_INIT(&qcs->el_send); + LIST_INIT(&qcs->el_fctl); qcs->start = TICK_ETERNITY; /* store transport layer stream descriptor in qcc tree */ @@ -933,6 +935,20 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs) return b_alloc(&qcs->tx.buf); } +/* Wakes up every stream of <qcc> which is currently waiting to send but + * is blocked on connection flow control. + */ +static void qcc_notify_fctl(struct qcc *qcc) +{ + struct qcs *qcs; + + while (!LIST_ISEMPTY(&qcc->fctl_list)) { + qcs = LIST_ELEM(qcc->fctl_list.n, struct qcs *, el_fctl); + LIST_DEL_INIT(&qcs->el_fctl); + qcs_notify_send(qcs); + } +} + /* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */ void qcc_reset_stream(struct qcs *qcs, int err) { @@ -954,6 +970,15 @@ void qcc_reset_stream(struct qcs *qcs, int err) qcs->tx.offset = qcs->tx.sent_offset; } + /* Subtract from conn flow control the data amount prepared on stream but not yet sent. */ + if (qcs->tx.fc.off_soft > qcs->tx.fc.off_real) { + const int soft_blocked = qfctl_sblocked(&qcc->tx.fc); + + qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real); + if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc)) + qcc_notify_fctl(qcc); + } + /* Report send error to stream-endpoint layer. 
*/ if (qcs_sc(qcs)) { se_fl_set_error(qcs->sd); @@ -987,8 +1012,10 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count) LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send); } - if (count) + if (count) { + qfctl_sinc(&qcc->tx.fc, count); qfctl_sinc(&qcs->tx.fc, count); + } TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); } @@ -1210,17 +1237,19 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, */ int qcc_recv_max_data(struct qcc *qcc, uint64_t max) { + int unblock_soft = 0, unblock_real = 0; + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn); - if (qcc->rfctl.md < max) { - qcc->rfctl.md = max; + if (qfctl_set_max(&qcc->tx.fc, max, &unblock_soft, &unblock_real)) { TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn); - if (qcc->flags & QC_CF_BLK_MFCTL) { - qcc->flags &= ~QC_CF_BLK_MFCTL; + if (unblock_real) tasklet_wakeup(qcc->wait_event.tasklet); - } + + if (unblock_soft) + qcc_notify_fctl(qcc); } TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); @@ -1570,11 +1599,11 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) to_xfer = qcs->tx.fc.limit - qcs->tx.offset; } - BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); + BUG_ON_HOT(qcc->tx.offsets > qcc->tx.fc.limit); /* do not overcome flow control limit on connection */ - if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) { + if (qcc->tx.offsets + to_xfer > qcc->tx.fc.limit) { TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcc->rfctl.md - qcc->tx.offsets; + to_xfer = qcc->tx.fc.limit - qcc->tx.offsets; } if (!left && !to_xfer) @@ -1631,7 +1660,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) || (total && qcs->tx.sent_offset >= qcs->tx.offset)); BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset); - BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md); + 
BUG_ON(qcc->tx.sent_offsets + total > qcc->tx.fc.limit); TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs); frm = qc_frm_alloc(QUIC_FT_STREAM_8); @@ -1732,17 +1761,19 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) diff = offset + data - qcs->tx.sent_offset; if (diff) { + struct quic_fctl *fc_conn = &qcc->tx.fc; struct quic_fctl *fc_strm = &qcs->tx.fc; /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft); BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); /* increase offset sum on connection */ qcc->tx.sent_offsets += diff; - BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md); - if (qcc->tx.sent_offsets == qcc->rfctl.md) { - qcc->flags |= QC_CF_BLK_MFCTL; - TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn); + BUG_ON_HOT(qcc->tx.sent_offsets > fc_conn->limit); + if (qfctl_rinc(fc_conn, diff)) { + TRACE_STATE("connection flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn); } /* increase offset on stream */ @@ -2102,7 +2133,7 @@ static int qcc_io_send(struct qcc *qcc) continue; } - if (!(qcc->flags & QC_CF_BLK_MFCTL) && + if (!qfctl_rblocked(&qcc->tx.fc) && !qfctl_rblocked(&qcs->tx.fc)) { if ((ret = qcs_send(qcs, &frms)) < 0) { /* Temporarily remove QCS from send-list. */ @@ -2128,7 +2159,7 @@ static int qcc_io_send(struct qcc *qcc) /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) { + while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { /* Reloop over . Useful for streams which have * fulfilled their qc_stream_desc buf and have now release it. 
*/ @@ -2167,7 +2198,7 @@ static int qcc_io_send(struct qcc *qcc) LIST_APPEND(&qcc->send_list, &qcs->el_send); } - if (!(qcc->flags & QC_CF_BLK_MFCTL)) + if (!qfctl_rblocked(&qcc->tx.fc)) tasklet_wakeup(qcc->wait_event.tasklet); } @@ -2575,7 +2606,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; rparams = &conn->handle.qc->tx.params; - qcc->rfctl.md = rparams->initial_max_data; + qfctl_init(&qcc->tx.fc, rparams->initial_max_data); qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; @@ -2600,6 +2631,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, } LIST_INIT(&qcc->send_list); + LIST_INIT(&qcc->fctl_list); qcc->wait_event.tasklet->process = qcc_io_cb; qcc->wait_event.tasklet->context = qcc; @@ -2828,6 +2860,17 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, goto end; } + if (qfctl_sblocked(&qcs->qcc->tx.fc)) { + TRACE_DEVEL("leaving on connection flow control", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + if (!LIST_INLIST(&qcs->el_fctl)) { + TRACE_DEVEL("append to fctl-list", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl); + } + goto end; + } + if (qfctl_sblocked(&qcs->tx.fc)) { TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); @@ -2887,6 +2930,16 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input, goto end; } + if (qfctl_sblocked(&qcs->qcc->tx.fc)) { + TRACE_DEVEL("leaving on connection flow control", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + if (!LIST_INLIST(&qcs->el_fctl)) { + TRACE_DEVEL("append to fctl-list", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl); + } + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + if 
(qfctl_sblocked(&qcs->tx.fc)) { TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; diff --git a/src/qmux_trace.c b/src/qmux_trace.c index 298f4a35b..992b940d4 100644 --- a/src/qmux_trace.c +++ b/src/qmux_trace.c @@ -77,7 +77,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask, chunk_appendf(&trace_buf, " qc=%p", qcc->conn->handle.qc); chunk_appendf(&trace_buf, " md=%llu/%llu/%llu", - (ullong)qcc->rfctl.md, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets); + (ullong)qcc->tx.fc.limit, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets); if (qcs) { chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",