MEDIUM: mux-quic: limit stream flow control on snd_buf

This patch is the first of two to reimplement flow control emission
limits check. The objective is to account flow control earlier during
snd_buf stream callback. This should smooth transfers and prevent over
buffering on haproxy side if flow control limit is reached.

The current patch deals with stream level flow control. It reuses the
newly defined flow control type. Soft offset is incremented after HTX to
data conversion. If limit is reached, snd_buf is interrupted and stream
layer will subscribe on QCS.

On qcc_io_cb(), generation of STREAM frames is restricted as previously
to ensure to never surpass peer limits. Finally, flow control real
offset is incremented on lower layer send notification. Thus, it will
serve as a base offset for built STREAM frames. If limit is reached,
STREAM frames generation is suspended.

Each time QCS data flow control limit is reached, soft and real offsets
are reconsidered.

Finally, special care is used when flow control limit is incremented via
MAX_STREAM_DATA reception. If soft value is unblocked, stream layer
snd_buf is woken up. If real value is unblocked, qcc_io_cb() is
rescheduled.
This commit is contained in:
Amaury Denoyelle 2023-10-18 15:55:38 +02:00
parent 25493ca036
commit c44692356d
4 changed files with 56 additions and 31 deletions

View File

@ -13,6 +13,7 @@
#include <haproxy/htx-t.h> #include <haproxy/htx-t.h>
#include <haproxy/list-t.h> #include <haproxy/list-t.h>
#include <haproxy/ncbuf-t.h> #include <haproxy/ncbuf-t.h>
#include <haproxy/quic_fctl-t.h>
#include <haproxy/quic_frame-t.h> #include <haproxy/quic_frame-t.h>
#include <haproxy/quic_stream-t.h> #include <haproxy/quic_stream-t.h>
#include <haproxy/stconn-t.h> #include <haproxy/stconn-t.h>
@ -105,7 +106,7 @@ struct qcc {
#define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */
#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */
#define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */
#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ /* unused 0x00000010 */
#define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */
#define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/ #define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/
#define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */ #define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */
@ -155,10 +156,11 @@ struct qcs {
uint64_t msd_init; /* initial max-stream-data */ uint64_t msd_init; /* initial max-stream-data */
} rx; } rx;
struct { struct {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t offset; /* last offset of data ready to be sent */ uint64_t offset; /* last offset of data ready to be sent */
uint64_t sent_offset; /* last offset sent by transport layer */ uint64_t sent_offset; /* last offset sent by transport layer */
struct buffer buf; /* transmit buffer before sending via xprt */ struct buffer buf; /* transmit buffer before sending via xprt */
uint64_t msd; /* fctl bytes limit to respect on emission */
} tx; } tx;
struct eb64_node by_id; struct eb64_node by_id;

View File

@ -37,6 +37,7 @@
#include <haproxy/qpack-dec.h> #include <haproxy/qpack-dec.h>
#include <haproxy/qpack-enc.h> #include <haproxy/qpack-enc.h>
#include <haproxy/quic_enc.h> #include <haproxy/quic_enc.h>
#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h> #include <haproxy/quic_frame.h>
#include <haproxy/stats-t.h> #include <haproxy/stats-t.h>
#include <haproxy/tools.h> #include <haproxy/tools.h>
@ -1462,6 +1463,11 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0); b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0);
} }
if (qfctl_sblocked(&qcs->tx.fc)) {
TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err;
}
if (!(res = qcc_get_stream_txbuf(qcs))) { if (!(res = qcc_get_stream_txbuf(qcs))) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err; goto err;
@ -2229,7 +2235,8 @@ static int h3_send_goaway(struct h3c *h3c)
b_quic_enc_int(&pos, h3c->id_goaway, 0); b_quic_enc_int(&pos, h3c->id_goaway, 0);
res = qcc_get_stream_txbuf(qcs); res = qcc_get_stream_txbuf(qcs);
if (!res || b_room(res) < b_data(&pos)) { if (!res || b_room(res) < b_data(&pos) ||
qfctl_sblocked(&qcs->tx.fc)) {
/* Do not try forcefully to emit GOAWAY if no space left. */ /* Do not try forcefully to emit GOAWAY if no space left. */
TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs); TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
goto err; goto err;

View File

@ -13,6 +13,7 @@
#include <haproxy/qmux_http.h> #include <haproxy/qmux_http.h>
#include <haproxy/qmux_trace.h> #include <haproxy/qmux_trace.h>
#include <haproxy/quic_conn.h> #include <haproxy/quic_conn.h>
#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h> #include <haproxy/quic_frame.h>
#include <haproxy/quic_sock.h> #include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h> #include <haproxy/quic_stream.h>
@ -113,19 +114,15 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
qcs->id = qcs->by_id.key = id; qcs->id = qcs->by_id.key = id;
eb64_insert(&qcc->streams_by_id, &qcs->by_id); eb64_insert(&qcc->streams_by_id, &qcs->by_id);
/* If stream is local, use peer remote-limit, or else the opposite. */ /* Different limits can be set by the peer for local and remote bidi streams. */
if (quic_stream_is_bidi(id)) { if (quic_stream_is_bidi(id)) {
qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r : qfctl_init(&qcs->tx.fc, quic_stream_is_local(qcc, id) ?
qcc->rfctl.msd_bidi_l; qcc->rfctl.msd_bidi_r : qcc->rfctl.msd_bidi_l);
} }
else if (quic_stream_is_local(qcc, id)) { else if (quic_stream_is_local(qcc, id)) {
qcs->tx.msd = qcc->rfctl.msd_uni_l; qfctl_init(&qcs->tx.fc, qcc->rfctl.msd_uni_l);
} }
/* Properly set flow-control blocking if initial MSD is nul. */
if (!qcs->tx.msd)
qcs->flags |= QC_SF_BLK_SFCTL;
qcs->rx.ncbuf = NCBUF_NULL; qcs->rx.ncbuf = NCBUF_NULL;
qcs->rx.app_buf = BUF_NULL; qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = qcs->rx.offset_max = 0; qcs->rx.offset = qcs->rx.offset_max = 0;
@ -970,7 +967,7 @@ void qcc_reset_stream(struct qcs *qcs, int err)
/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. /* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
* Set <urg> to 1 if stream content should be treated in priority compared to * Set <urg> to 1 if stream content should be treated in priority compared to
* other streams. For STREAM emission, <count> must contains the size of the * other streams. For STREAM emission, <count> must contains the size of the
* frame payload. * frame payload. This is used for flow control accounting.
*/ */
void qcc_send_stream(struct qcs *qcs, int urg, int count) void qcc_send_stream(struct qcs *qcs, int urg, int count)
{ {
@ -990,6 +987,9 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count)
LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send); LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
} }
if (count)
qfctl_sinc(&qcs->tx.fc, count);
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
} }
@ -1256,16 +1256,18 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
goto err; goto err;
if (qcs) { if (qcs) {
TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); int unblock_soft = 0, unblock_real = 0;
if (max > qcs->tx.msd) {
qcs->tx.msd = max;
TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
if (qcs->flags & QC_SF_BLK_SFCTL) { TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcs->flags &= ~QC_SF_BLK_SFCTL; if (qfctl_set_max(&qcs->tx.fc, max, &unblock_soft, &unblock_real)) {
TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
if (unblock_real) {
/* TODO optim: only wakeup IO-CB if stream has data to sent. */ /* TODO optim: only wakeup IO-CB if stream has data to sent. */
tasklet_wakeup(qcc->wait_event.tasklet); tasklet_wakeup(qcc->wait_event.tasklet);
} }
if (unblock_soft)
qcs_notify_send(qcs);
} }
} }
@ -1561,11 +1563,11 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
left = qcs->tx.offset - qcs->tx.sent_offset; left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(in), b_room(out)); to_xfer = QUIC_MIN(b_data(in), b_room(out));
BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); BUG_ON_HOT(qcs->tx.offset > qcs->tx.fc.limit);
/* do not exceed flow control limit */ /* do not exceed flow control limit */
if (qcs->tx.offset + to_xfer > qcs->tx.msd) { if (qcs->tx.offset + to_xfer > qcs->tx.fc.limit) {
TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
to_xfer = qcs->tx.msd - qcs->tx.offset; to_xfer = qcs->tx.fc.limit - qcs->tx.offset;
} }
BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
@ -1730,6 +1732,11 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
diff = offset + data - qcs->tx.sent_offset; diff = offset + data - qcs->tx.sent_offset;
if (diff) { if (diff) {
struct quic_fctl *fc_strm = &qcs->tx.fc;
/* Ensure real offset never exceeds soft value. */
BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
/* increase offset sum on connection */ /* increase offset sum on connection */
qcc->tx.sent_offsets += diff; qcc->tx.sent_offsets += diff;
BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md); BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
@ -1740,11 +1747,10 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
/* increase offset on stream */ /* increase offset on stream */
qcs->tx.sent_offset += diff; qcs->tx.sent_offset += diff;
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset); BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
if (qcs->tx.sent_offset == qcs->tx.msd) { if (qfctl_rinc(fc_strm, diff)) {
qcs->flags |= QC_SF_BLK_SFCTL; TRACE_STATE("stream flow-control reached",
TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs); QMUX_EV_QCS_SEND, qcc->conn, qcs);
} }
/* If qcs.stream.buf is full, release it to the lower layer. */ /* If qcs.stream.buf is full, release it to the lower layer. */
@ -1969,9 +1975,7 @@ static int qcs_send(struct qcs *qcs, struct list *frms)
} }
qcs->tx.offset += xfer; qcs->tx.offset += xfer;
BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
qcc->tx.offsets += xfer; qcc->tx.offsets += xfer;
BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
/* out buffer cannot be emptied if qcs offsets differ. */ /* out buffer cannot be emptied if qcs offsets differ. */
BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset); BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
@ -2099,7 +2103,7 @@ static int qcc_io_send(struct qcc *qcc)
} }
if (!(qcc->flags & QC_CF_BLK_MFCTL) && if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
!(qcs->flags & QC_SF_BLK_SFCTL)) { !qfctl_rblocked(&qcs->tx.fc)) {
if ((ret = qcs_send(qcs, &frms)) < 0) { if ((ret = qcs_send(qcs, &frms)) < 0) {
/* Temporarily remove QCS from send-list. */ /* Temporarily remove QCS from send-list. */
LIST_DEL_INIT(&qcs->el_send); LIST_DEL_INIT(&qcs->el_send);
@ -2133,9 +2137,9 @@ static int qcc_io_send(struct qcc *qcc)
* new qc_stream_desc should be present in send_list as * new qc_stream_desc should be present in send_list as
* long as transport layer can handle all data. * long as transport layer can handle all data.
*/ */
BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL)); BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc));
if (!(qcs->flags & QC_SF_BLK_SFCTL)) { if (!qfctl_rblocked(&qcs->tx.fc)) {
if ((ret = qcs_send(qcs, &frms)) < 0) { if ((ret = qcs_send(qcs, &frms)) < 0) {
LIST_DEL_INIT(&qcs->el_send); LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcs_failed, &qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send);
@ -2824,6 +2828,12 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
goto end; goto end;
} }
if (qfctl_sblocked(&qcs->tx.fc)) {
TRACE_DEVEL("leaving on flow-control reached",
QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
goto end;
}
ret = qcs_http_snd_buf(qcs, buf, count, &fin); ret = qcs_http_snd_buf(qcs, buf, count, &fin);
if (fin) { if (fin) {
TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@ -2877,6 +2887,12 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
goto end; goto end;
} }
if (qfctl_sblocked(&qcs->tx.fc)) {
TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
/* Alawys disable splicing */ /* Alawys disable splicing */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING; qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;

View File

@ -84,7 +84,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
qcs, (ullong)qcs->id, qcs, (ullong)qcs->id,
qcs_st_to_str(qcs->st)); qcs_st_to_str(qcs->st));
chunk_appendf(&trace_buf, " msd=%llu/%llu/%llu", chunk_appendf(&trace_buf, " msd=%llu/%llu/%llu",
(ullong)qcs->tx.msd, (ullong)qcs->tx.offset, (ullong)qcs->tx.sent_offset); (ullong)qcs->tx.fc.limit, (ullong)qcs->tx.offset, (ullong)qcs->tx.sent_offset);
} }
if (mask & QMUX_EV_QCC_NQCS) { if (mask & QMUX_EV_QCC_NQCS) {