From 796446a15e7db9923e8243a89155ddfb24bed374 Mon Sep 17 00:00:00 2001
From: Amaury Denoyelle
Date: Mon, 18 Nov 2024 14:55:41 +0100
Subject: [PATCH] MAJOR: mux-quic: support pacing emission

Support pacing emission for STREAM frames at the QUIC MUX layer. This is
implemented by adding a quic_pacer engine into the QCC structure.

The main changes are located in qcc_io_send(). It now differentiates the
cases where frames have been rejected by the transport layer. As before,
this can happen on congestion or a full FD buffer, which requires
subscribing on the transport layer. The new case is an emission
interrupted by the pacing timer. Here, the QUIC MUX I/O tasklet is
rescheduled to run with the flag TASK_F_USR1 set.

On tasklet execution, if TASK_F_USR1 is set, all standard processing for
emission and reception is skipped. Instead, the new function
qcc_purge_sending() is called. Its purpose is to retry emission with the
saved list of STREAM frames. Either all remaining frames can now be
sent, a subscribe is performed on transport error, or the tasklet must
be rescheduled again for pacing.

In the meantime, if the tasklet is rescheduled for any other reason,
TASK_F_USR1 is reset. This triggers a full regeneration of STREAM
frames. In that case, pacing expiration must be checked before calling
qcc_send_frames() to ensure emission is now allowed.
---
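Notes: as a reading aid, here is a minimal, self-contained sketch of the
tasklet wakeup protocol described above, i.e. qcc_wakeup() clearing
TASK_F_USR1 while qcc_wakeup_pacing() sets it, and qcc_io_cb() routing on
that flag. The struct, the flag value and the printf calls are simplified
stand-ins for illustration only, not the HAProxy tasklet API.

/* Minimal standalone model of the wakeup protocol: a "full" wakeup
 * clears TASK_F_USR1 so the handler regenerates frames, while a pacing
 * wakeup sets it so the handler only retries the saved frames. */
#include <stdatomic.h>
#include <stdio.h>

#define TASK_F_USR1 0x1u /* assumed value, any dedicated bit works here */

struct tasklet_model {
	atomic_uint state; /* stands in for the tasklet state word */
};

/* Modeled after qcc_wakeup(): clear USR1, then schedule the tasklet. */
static void wakeup_full(struct tasklet_model *t)
{
	atomic_fetch_and(&t->state, ~TASK_F_USR1);
	/* tasklet_wakeup() would be called here */
}

/* Modeled after qcc_wakeup_pacing(): set USR1, then schedule the tasklet. */
static void wakeup_pacing(struct tasklet_model *t)
{
	atomic_fetch_or(&t->state, TASK_F_USR1);
	/* tasklet_wakeup() would be called here */
}

/* Modeled after qcc_io_cb(): route on the USR1 flag. */
static void io_cb(struct tasklet_model *t)
{
	if (atomic_load(&t->state) & TASK_F_USR1)
		printf("purge: retry emission of the saved STREAM frames\n");
	else
		printf("full run: regenerate frames, then send/receive\n");
}

int main(void)
{
	struct tasklet_model t;

	atomic_init(&t.state, 0);

	wakeup_pacing(&t); /* emission was interrupted by pacing */
	io_cb(&t);         /* -> purge path only */

	wakeup_full(&t);   /* any other wakeup resets the flag */
	io_cb(&t);         /* -> full frame regeneration */
	return 0;
}

The point of the flag is that a pacing wakeup does not rebuild the STREAM
frame list: only the saved list is retried, while any other wakeup clears
the flag and regenerates everything.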
 include/haproxy/mux_quic-t.h |   2 +
 src/mux_quic.c               | 100 ++++++++++++++++++++++++++++++-----
 2 files changed, 88 insertions(+), 14 deletions(-)

diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 62ae59bbd5..e56045b575 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -69,6 +70,7 @@ struct qcc {
 		struct quic_fctl fc; /* stream flow control applied on sending */
 		uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
 		struct list frms; /* list of STREAM frames ready for sent */
+		struct quic_pacer pacer; /* engine used to pace emission */
 	} tx;
 
 	uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 0db4f91779..4c1d1c4dc4 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -36,6 +37,13 @@ DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
 static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
 static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
 
+/* Returns true if pacing should be used for connection <conn>. */
+static int qcc_is_pacing_active(const struct connection *conn)
+{
+	const struct quic_conn *qc = conn->handle.qc;
+	return !!(qc->path->cc.algo->pacing_rate);
+}
+
 /* Free STREAM frames in Tx list. */
 static void qcc_tx_frms_free(struct qcc *qcc)
 {
@@ -398,6 +406,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
 void qcc_wakeup(struct qcc *qcc)
 {
+	HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+	tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+	HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
 	tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
@@ -2084,20 +2099,25 @@ static int qcc_subscribe_send(struct qcc *qcc)
 /* Wrapper for send on transport layer. Send a list of frames <frms> for the
  * connection <qcc>.
  *
- * Returns 0 if all data sent with success else non-zero.
+ * Returns 0 if all data sent with success. On fatal error, a negative error
+ * code is returned. A positive 1 is used if emission should be paced.
  */
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
 {
 	enum quic_tx_err ret;
+	struct quic_pacer *pacer = NULL;
 
 	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
 	if (LIST_ISEMPTY(frms)) {
 		TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
-		return 1;
+		return -1;
 	}
 
-	ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
+	if (stream && qcc_is_pacing_active(qcc->conn))
+		pacer = &qcc->tx.pacer;
+
+	ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
 	if (ret == QUIC_TX_ERR_FATAL) {
 		TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
 		qcc_subscribe_send(qcc);
@@ -2107,18 +2127,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
 	/* If there is frames left at this stage, transport layer is blocked.
 	 * Subscribe on it to retry later.
 	 */
-	if (!LIST_ISEMPTY(frms)) {
+	if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_PACING) {
 		TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
 		qcc_subscribe_send(qcc);
 		goto err;
 	}
 
 	TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
-	return 0;
+	return ret == QUIC_TX_ERR_PACING ? 1 : 0;
 
  err:
 	TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
-	return 1;
+	return -1;
 }
 
 /* Emit a RESET_STREAM on <qcs>.
@@ -2143,7 +2163,7 @@ static int qcs_send_reset(struct qcs *qcs)
 	frm->reset_stream.final_size = qcs->tx.fc.off_real;
 
 	LIST_APPEND(&frms, &frm->list);
-	if (qcc_send_frames(qcs->qcc, &frms)) {
+	if (qcc_send_frames(qcs->qcc, &frms, 0)) {
 		if (!LIST_ISEMPTY(&frms))
 			qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
 		TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2194,7 +2214,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
 	frm->stop_sending.app_error_code = qcs->err;
 
 	LIST_APPEND(&frms, &frm->list);
-	if (qcc_send_frames(qcs->qcc, &frms)) {
+	if (qcc_send_frames(qcs->qcc, &frms, 0)) {
 		if (!LIST_ISEMPTY(&frms))
 			qc_frm_free(qcc->conn->handle.qc, &frm);
 		TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2266,7 +2286,7 @@ static int qcc_io_send(struct qcc *qcc)
 	struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
 	struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
 	uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
-	int ret, total = 0, resent;
+	int ret = 0, total = 0, resent;
 
 	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
@@ -2276,6 +2296,8 @@ static int qcc_io_send(struct qcc *qcc)
 	 * apply for STREAM frames.
 	 */
+	qcc_tx_frms_free(qcc);
+
 	/* Check for transport error. */
 	if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
 		TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
 		goto out;
@@ -2300,7 +2322,7 @@ static int qcc_io_send(struct qcc *qcc)
 	}
 
 	if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
-		if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+		if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
			TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
 			goto out;
 		}
@@ -2376,10 +2398,17 @@ static int qcc_io_send(struct qcc *qcc)
 		}
 	}
 
+	if (qcc_is_pacing_active(qcc->conn)) {
+		if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+			qcc_wakeup_pacing(qcc);
+			return 1;
+		}
+	}
+
 	/* Retry sending until no frame to send, data rejected or connection
 	 * flow-control limit reached.
 	 */
-	while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+	while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
 		window_conn = qfctl_rcap(&qcc->tx.fc);
 		resent = 0;
 
@@ -2410,8 +2439,15 @@ static int qcc_io_send(struct qcc *qcc)
 	}
 
  sent_done:
-	/* Deallocate frames that the transport layer has rejected. */
-	qcc_tx_frms_free(qcc);
+	if (ret == 1) {
+		/* qcc_send_frames cannot return 1 if pacing not used. */
+		BUG_ON(!qcc_is_pacing_active(qcc->conn));
+		qcc_wakeup_pacing(qcc);
+	}
+	else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
+		/* Deallocate frames that the transport layer has rejected. */
+		qcc_tx_frms_free(qcc);
+	}
 
 	/* Re-insert on-error QCS at the end of the send-list. */
 	if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2758,12 +2794,45 @@ static void qcc_release(struct qcc *qcc)
 	TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
+static void qcc_purge_sending(struct qcc *qcc)
+{
+	struct quic_pacer *pacer = &qcc->tx.pacer;
+	struct list *frms = &qcc->tx.frms;
+	enum quic_tx_err ret = QUIC_TX_ERR_PACING;
+
+	/* This function is reserved for pacing usage. */
+	BUG_ON(!qcc_is_pacing_active(qcc->conn));
+
+	/* Only restart emission if pacing delay is reached. */
+	if (quic_pacing_expired(pacer))
+		ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
+
+	if (ret == QUIC_TX_ERR_PACING) {
+		BUG_ON(LIST_ISEMPTY(frms));
+		qcc_wakeup_pacing(qcc);
+	}
+	else if (ret == QUIC_TX_ERR_FATAL) {
+		TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
+		HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+		qcc_subscribe_send(qcc);
+	}
+	else {
+		if (!LIST_ISEMPTY(frms))
+			qcc_subscribe_send(qcc);
+	}
+}
+
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
 	struct qcc *qcc = ctx;
 
 	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
+	if (status & TASK_F_USR1) {
+		qcc_purge_sending(qcc);
+		return NULL;
+	}
+
 	if (!(qcc->wait_event.events & SUB_RETRY_SEND))
 		qcc_io_send(qcc);
 
@@ -2891,6 +2960,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
 
 	qcc->tx.buf_in_flight = 0;
 
+	if (qcc_is_pacing_active(conn))
+		quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
+
 	if (conn_is_back(conn)) {
 		qcc->next_bidi_l = 0x00;
 		qcc->largest_bidi_r = 0x01;
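
Note: the following standalone sketch models the purge path added above:
emission of the saved frames is only retried once the pacing delay has
expired, otherwise the wakeup is re-armed with TASK_F_USR1 still set. The
pacer below (a single "next allowed emission" timestamp) is an assumption
made for illustration; it does not reproduce quic_pacer internals.

/* Minimal standalone model of the purge path: re-emit the saved frames
 * only once the pacing delay has expired, otherwise re-arm the wakeup. */
#define _POSIX_C_SOURCE 200112L
#include <stdint.h>
#include <stdio.h>
#include <time.h>

struct pacer_model {
	uint64_t next_send_ns; /* earliest time the next emission may happen */
};

static uint64_t now_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}

/* Plays the role of quic_pacing_expired(): is emission allowed again? */
static int pacing_expired(const struct pacer_model *p)
{
	return now_ns() >= p->next_send_ns;
}

/* Follows the shape of qcc_purge_sending(): either emit the saved frames
 * or reschedule with the pacing flag still set. */
static void purge_sending(struct pacer_model *p, int *frames_left)
{
	if (!pacing_expired(p)) {
		printf("pacing not expired: reschedule with TASK_F_USR1 set\n");
		return;
	}
	/* qc_send_mux() would run here; assume everything got emitted. */
	*frames_left = 0;
	printf("pacing expired: saved STREAM frames emitted\n");
}

int main(void)
{
	struct pacer_model p = { .next_send_ns = now_ns() + 1000000 }; /* +1ms */
	int frames_left = 3;

	purge_sending(&p, &frames_left); /* too early: wakeup re-armed */
	p.next_send_ns = 0;              /* force expiry for the demo */
	purge_sending(&p, &frames_left); /* now allowed to send */
	return 0;
}

On fatal transport error the real code also clears TASK_F_USR1 and
subscribes on the transport layer; this sketch only covers the two pacing
outcomes.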