diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 62ae59bbd5..e56045b575 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -69,6 +70,7 @@ struct qcc { struct quic_fctl fc; /* stream flow control applied on sending */ uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */ struct list frms; /* list of STREAM frames ready for sent */ + struct quic_pacer pacer; /* engine used to pace emission */ } tx; uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 0db4f91779..4c1d1c4dc4 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,13 @@ DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset); static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room); +/* Returns true if pacing should be used for connection. */ +static int qcc_is_pacing_active(const struct connection *conn) +{ + const struct quic_conn *qc = conn->handle.qc; + return !!(qc->path->cc.algo->pacing_rate); +} + /* Free STREAM frames in Tx list. */ static void qcc_tx_frms_free(struct qcc *qcc) { @@ -398,6 +406,13 @@ static void qcc_refresh_timeout(struct qcc *qcc) void qcc_wakeup(struct qcc *qcc) { + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + tasklet_wakeup(qcc->wait_event.tasklet); +} + +static void qcc_wakeup_pacing(struct qcc *qcc) +{ + HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); } @@ -2084,20 +2099,25 @@ static int qcc_subscribe_send(struct qcc *qcc) /* Wrapper for send on transport layer. Send a list of frames for the * connection . * - * Returns 0 if all data sent with success else non-zero. + * Returns 0 if all data sent with success. On fatal error, a negative error + * code is returned. A positive 1 is used if emission should be paced. */ -static int qcc_send_frames(struct qcc *qcc, struct list *frms) +static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) { enum quic_tx_err ret; + struct quic_pacer *pacer = NULL; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); if (LIST_ISEMPTY(frms)) { TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } - ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL); + if (stream && qcc_is_pacing_active(qcc->conn)) + pacer = &qcc->tx.pacer; + + ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer); if (ret == QUIC_TX_ERR_FATAL) { TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); @@ -2107,18 +2127,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms) /* If there is frames left at this stage, transport layer is blocked. * Subscribe on it to retry later. */ - if (!LIST_ISEMPTY(frms)) { + if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_PACING) { TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); goto err; } TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return 0; + return ret == QUIC_TX_ERR_PACING ? 1 : 0; err: TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } /* Emit a RESET_STREAM on . @@ -2143,7 +2163,7 @@ static int qcs_send_reset(struct qcs *qcs) frm->reset_stream.final_size = qcs->tx.fc.off_real; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcs->qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2194,7 +2214,7 @@ static int qcs_send_stop_sending(struct qcs *qcs) frm->stop_sending.app_error_code = qcs->err; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2266,7 +2286,7 @@ static int qcc_io_send(struct qcc *qcc) struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); - int ret, total = 0, resent; + int ret = 0, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2276,6 +2296,8 @@ static int qcc_io_send(struct qcc *qcc) * apply for STREAM frames. */ + qcc_tx_frms_free(qcc); + /* Check for transport error. */ if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); @@ -2300,7 +2322,7 @@ static int qcc_io_send(struct qcc *qcc) } if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { - if (qcc_send_frames(qcc, &qcc->lfctl.frms)) { + if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) { TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); goto out; } @@ -2376,10 +2398,17 @@ static int qcc_io_send(struct qcc *qcc) } } + if (qcc_is_pacing_active(qcc->conn)) { + if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { + qcc_wakeup_pacing(qcc); + return 1; + } + } + /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; @@ -2410,8 +2439,15 @@ static int qcc_io_send(struct qcc *qcc) } sent_done: - /* Deallocate frames that the transport layer has rejected. */ - qcc_tx_frms_free(qcc); + if (ret == 1) { + /* qcc_send_frames cannot return 1 if pacing not used. */ + BUG_ON(!qcc_is_pacing_active(qcc->conn)); + qcc_wakeup_pacing(qcc); + } + else if (!LIST_ISEMPTY(&qcc->tx.frms)) { + /* Deallocate frames that the transport layer has rejected. */ + qcc_tx_frms_free(qcc); + } /* Re-insert on-error QCS at the end of the send-list. */ if (!LIST_ISEMPTY(&qcs_failed)) { @@ -2758,12 +2794,45 @@ static void qcc_release(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END); } +static void qcc_purge_sending(struct qcc *qcc) +{ + struct quic_pacer *pacer = &qcc->tx.pacer; + struct list *frms = &qcc->tx.frms; + enum quic_tx_err ret = QUIC_TX_ERR_PACING; + + /* This function is reserved for pacing usage. */ + BUG_ON(!qcc_is_pacing_active(qcc->conn)); + + /* Only restart emission if pacing delay is reached. */ + if (quic_pacing_expired(pacer)) + ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer); + + if (ret == QUIC_TX_ERR_PACING) { + BUG_ON(LIST_ISEMPTY(frms)); + qcc_wakeup_pacing(qcc); + } + else if (ret == QUIC_TX_ERR_FATAL) { + TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + qcc_subscribe_send(qcc); + } + else { + if (!LIST_ISEMPTY(frms)) + qcc_subscribe_send(qcc); + } +} + struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) { struct qcc *qcc = ctx; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + if (status & TASK_F_USR1) { + qcc_purge_sending(qcc); + return NULL; + } + if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_io_send(qcc); @@ -2891,6 +2960,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->tx.buf_in_flight = 0; + if (qcc_is_pacing_active(conn)) + quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc); + if (conn_is_back(conn)) { qcc->next_bidi_l = 0x00; qcc->largest_bidi_r = 0x01;