MAJOR: mux-quic: support pacing emission

Support pacing emission for STREAM frames at the QUIC MUX layer. This is
implemented by adding a quic_pacer engine into the QCC structure.
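
Concretely, the pacer is stored in the QCC Tx context and is only
initialized when the congestion algorithm exposes a pacing rate. A
condensed sketch of the relevant parts of the diff below:

    struct qcc {
        ...
        struct {
            ...
            struct list frms;        /* STREAM frames ready to be sent */
            struct quic_pacer pacer; /* engine used to pace emission */
        } tx;
    };

    /* In qmux_init() */
    if (qcc_is_pacing_active(conn))
        quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);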

The main changes are in qcc_io_send(). It now differentiates the cases
where some frames have been rejected by the transport layer. As before,
this can occur due to congestion or a full FD buffer, which requires
subscribing on the transport layer. The new case is when emission has
been interrupted by the pacing timer. In that case, the QUIC MUX I/O
tasklet is rescheduled to run with the TASK_F_USR1 flag set.
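
A condensed sketch of this rescheduling path (simplified from the diff
below):

    static void qcc_wakeup_pacing(struct qcc *qcc)
    {
        /* Flag the next tasklet run as a pacing purge. */
        HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
    }

    /* In qcc_io_send(), after the emission loop */
    if (ret == 1) {
        /* qcc_send_frames() reported a pacing interruption. */
        qcc_wakeup_pacing(qcc);
    }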

On tasklet execution, if TASK_F_USR1 is set, all standard processing for
emission and reception is skipped. Instead, a new function
qcc_purge_sending() is called. Its purpose is to retry emission with the
saved STREAM frames list. Either all remaining frames can now be sent, a
subscribe is performed on transport error, or the tasklet must be
rescheduled again for pacing purging.
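
The dispatch in the tasklet handler and the three possible outcomes,
condensed from the diff below:

    /* In qcc_io_cb() */
    if (status & TASK_F_USR1) {
        qcc_purge_sending(qcc);
        return NULL;
    }

    /* In qcc_purge_sending() */
    if (quic_pacing_expired(pacer))
        ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);

    if (ret == QUIC_TX_ERR_PACING)
        qcc_wakeup_pacing(qcc);     /* still paced, reschedule tasklet */
    else if (ret == QUIC_TX_ERR_FATAL)
        qcc_subscribe_send(qcc);    /* fatal error, subscribe on transport */
    else if (!LIST_ISEMPTY(frms))
        qcc_subscribe_send(qcc);    /* frames rejected, subscribe on transport */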

In the meantime, if the tasklet is rescheduled for other reasons,
TASK_F_USR1 is reset. This triggers a full regeneration of STREAM
frames. In this case, pacing expiration must be checked before calling
qcc_send_frames() to ensure emission is now allowed.
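
The check performed before emission when frames have been regenerated,
condensed from the diff below:

    /* In qcc_io_send(), before the call to qcc_send_frames() */
    if (qcc_is_pacing_active(qcc->conn)) {
        if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
            /* Pacing delay not yet expired: reschedule via TASK_F_USR1. */
            qcc_wakeup_pacing(qcc);
            return 1;
        }
    }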
Author: Amaury Denoyelle
Date:   2024-11-18 14:55:41 +01:00
parent  ede4cd4c2e
commit  796446a15e
2 changed files with 88 additions and 14 deletions

(File 1 of 2)

@@ -15,6 +15,7 @@
 #include <haproxy/ncbuf-t.h>
 #include <haproxy/quic_fctl-t.h>
 #include <haproxy/quic_frame-t.h>
+#include <haproxy/quic_pacing-t.h>
 #include <haproxy/quic_stream-t.h>
 #include <haproxy/stconn-t.h>
 #include <haproxy/time-t.h>
@@ -69,6 +70,7 @@ struct qcc {
                 struct quic_fctl fc; /* stream flow control applied on sending */
                 uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
                 struct list frms; /* list of STREAM frames ready for sent */
+                struct quic_pacer pacer; /* engine used to pace emission */
         } tx;
         uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */

(File 2 of 2)

@@ -19,6 +19,7 @@
 #include <haproxy/quic_enc.h>
 #include <haproxy/quic_fctl.h>
 #include <haproxy/quic_frame.h>
+#include <haproxy/quic_pacing.h>
 #include <haproxy/quic_sock.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/quic_tp-t.h>
@@ -36,6 +37,13 @@ DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
 static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
 static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
 
+/* Returns true if pacing should be used for <conn> connection. */
+static int qcc_is_pacing_active(const struct connection *conn)
+{
+        const struct quic_conn *qc = conn->handle.qc;
+        return !!(qc->path->cc.algo->pacing_rate);
+}
+
 /* Free <qcc> STREAM frames in Tx list. */
 static void qcc_tx_frms_free(struct qcc *qcc)
 {
@@ -398,6 +406,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
 void qcc_wakeup(struct qcc *qcc)
 {
+        HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+        tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+        HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
         tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
@@ -2084,20 +2099,25 @@ static int qcc_subscribe_send(struct qcc *qcc)
 /* Wrapper for send on transport layer. Send a list of frames <frms> for the
  * connection <qcc>.
  *
- * Returns 0 if all data sent with success else non-zero.
+ * Returns 0 if all data sent with success. On fatal error, a negative error
+ * code is returned. A positive 1 is used if emission should be paced.
  */
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
 {
         enum quic_tx_err ret;
+        struct quic_pacer *pacer = NULL;
 
         TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
         if (LIST_ISEMPTY(frms)) {
                 TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
-                return 1;
+                return -1;
         }
 
-        ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
+        if (stream && qcc_is_pacing_active(qcc->conn))
+                pacer = &qcc->tx.pacer;
+
+        ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
 
         if (ret == QUIC_TX_ERR_FATAL) {
                 TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
                 qcc_subscribe_send(qcc);
@@ -2107,18 +2127,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
         /* If there is frames left at this stage, transport layer is blocked.
          * Subscribe on it to retry later.
          */
-        if (!LIST_ISEMPTY(frms)) {
+        if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_PACING) {
                 TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
                 qcc_subscribe_send(qcc);
                 goto err;
         }
 
         TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
-        return 0;
+        return ret == QUIC_TX_ERR_PACING ? 1 : 0;
 
  err:
         TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
-        return 1;
+        return -1;
 }
 
 /* Emit a RESET_STREAM on <qcs>.
@@ -2143,7 +2163,7 @@ static int qcs_send_reset(struct qcs *qcs)
         frm->reset_stream.final_size = qcs->tx.fc.off_real;
         LIST_APPEND(&frms, &frm->list);
 
-        if (qcc_send_frames(qcs->qcc, &frms)) {
+        if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                 if (!LIST_ISEMPTY(&frms))
                         qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
                 TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2194,7 +2214,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
         frm->stop_sending.app_error_code = qcs->err;
         LIST_APPEND(&frms, &frm->list);
 
-        if (qcc_send_frames(qcs->qcc, &frms)) {
+        if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                 if (!LIST_ISEMPTY(&frms))
                         qc_frm_free(qcc->conn->handle.qc, &frm);
                 TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2266,7 +2286,7 @@ static int qcc_io_send(struct qcc *qcc)
         struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
         struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
         uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
-        int ret, total = 0, resent;
+        int ret = 0, total = 0, resent;
 
         TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
@@ -2276,6 +2296,8 @@ static int qcc_io_send(struct qcc *qcc)
          * apply for STREAM frames.
          */
 
+        qcc_tx_frms_free(qcc);
+
         /* Check for transport error. */
         if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
                 TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2300,7 +2322,7 @@
         }
 
         if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
-                if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+                if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
                         TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
                         goto out;
                 }
@@ -2376,10 +2398,17 @@ static int qcc_io_send(struct qcc *qcc)
                 }
         }
 
+        if (qcc_is_pacing_active(qcc->conn)) {
+                if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+                        qcc_wakeup_pacing(qcc);
+                        return 1;
+                }
+        }
+
         /* Retry sending until no frame to send, data rejected or connection
          * flow-control limit reached.
          */
-        while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+        while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
                 window_conn = qfctl_rcap(&qcc->tx.fc);
                 resent = 0;
@@ -2410,8 +2439,15 @@ static int qcc_io_send(struct qcc *qcc)
         }
 
  sent_done:
-        /* Deallocate frames that the transport layer has rejected. */
-        qcc_tx_frms_free(qcc);
+        if (ret == 1) {
+                /* qcc_send_frames cannot return 1 if pacing not used. */
+                BUG_ON(!qcc_is_pacing_active(qcc->conn));
+                qcc_wakeup_pacing(qcc);
+        }
+        else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
+                /* Deallocate frames that the transport layer has rejected. */
+                qcc_tx_frms_free(qcc);
+        }
 
         /* Re-insert on-error QCS at the end of the send-list. */
         if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2758,12 +2794,45 @@ static void qcc_release(struct qcc *qcc)
         TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
+static void qcc_purge_sending(struct qcc *qcc)
+{
+        struct quic_pacer *pacer = &qcc->tx.pacer;
+        struct list *frms = &qcc->tx.frms;
+        enum quic_tx_err ret = QUIC_TX_ERR_PACING;
+
+        /* This function is reserved for pacing usage. */
+        BUG_ON(!qcc_is_pacing_active(qcc->conn));
+
+        /* Only restart emission if pacing delay is reached. */
+        if (quic_pacing_expired(pacer))
+                ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
+
+        if (ret == QUIC_TX_ERR_PACING) {
+                BUG_ON(LIST_ISEMPTY(frms));
+                qcc_wakeup_pacing(qcc);
+        }
+        else if (ret == QUIC_TX_ERR_FATAL) {
+                TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
+                HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+                qcc_subscribe_send(qcc);
+        }
+        else {
+                if (!LIST_ISEMPTY(frms))
+                        qcc_subscribe_send(qcc);
+        }
+}
+
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
         struct qcc *qcc = ctx;
 
         TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
+        if (status & TASK_F_USR1) {
+                qcc_purge_sending(qcc);
+                return NULL;
+        }
+
         if (!(qcc->wait_event.events & SUB_RETRY_SEND))
                 qcc_io_send(qcc);
@@ -2891,6 +2960,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
         qcc->tx.buf_in_flight = 0;
 
+        if (qcc_is_pacing_active(conn))
+                quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
+
         if (conn_is_back(conn)) {
                 qcc->next_bidi_l = 0x00;
                 qcc->largest_bidi_r = 0x01;