MEDIUM: mux-quic: limit conn flow control on snd_buf

This commit is a direct follow-up on the previous one. This time, it
deals with connection level flow control. Process is similar to stream
level : soft offset is incremented during snd_buf and real offset during
STREAM frame emission.

On MAX_DATA reception, both stream layer and QMUX is woken up if
necessary. One extra feature for conn level is the introduction of a new
QCC list to reference QCS instances. It will store instances for which
snd_buf callback has been interrupted on QCC soft offset reached. Every
stream instances is woken up on MAX_DATA reception if soft_offset is
unblocked.
This commit is contained in:
Amaury Denoyelle 2023-10-18 17:48:11 +02:00
parent c44692356d
commit d4bf6f0526
4 changed files with 79 additions and 22 deletions

View File

@ -31,7 +31,7 @@ enum qcs_type {
#define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */
#define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
#define QC_CF_BLK_MFCTL 0x00000004 /* sending blocked due to connection flow-control */
/* unused 0x00000004 */
#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
#define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */
#define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */
@ -71,6 +71,8 @@ struct qcc {
} rfctl;
struct {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t offsets; /* sum of all offsets prepared */
uint64_t sent_offsets; /* sum of all offset sent */
} tx;
@ -84,6 +86,7 @@ struct qcc {
struct list send_retry_list; /* list of qcs eligible to send retry */
struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */
struct list fctl_list; /* list of sending qcs blocked on conn flow control */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
@ -170,6 +173,7 @@ struct qcs {
struct list el; /* element of qcc.send_retry_list */
struct list el_send; /* element of qcc.send_list */
struct list el_opening; /* element of qcc.opening_list */
struct list el_fctl; /* element of qcc.fctl_list */
struct wait_event wait_event;
struct wait_event *subs;

View File

@ -1463,7 +1463,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0);
}
if (qfctl_sblocked(&qcs->tx.fc)) {
if (qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&qcs->qcc->tx.fc)) {
TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err;
}
@ -2236,7 +2236,7 @@ static int h3_send_goaway(struct h3c *h3c)
res = qcc_get_stream_txbuf(qcs);
if (!res || b_room(res) < b_data(&pos) ||
qfctl_sblocked(&qcs->tx.fc)) {
qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) {
/* Do not try forcefully to emit GOAWAY if no space left. */
TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
goto err;

View File

@ -59,6 +59,7 @@ static void qcs_free(struct qcs *qcs)
/* Safe to use even if already removed from the list. */
LIST_DEL_INIT(&qcs->el_opening);
LIST_DEL_INIT(&qcs->el_send);
LIST_DEL_INIT(&qcs->el_fctl);
/* Release stream endpoint descriptor. */
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@ -108,6 +109,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
*/
LIST_INIT(&qcs->el_opening);
LIST_INIT(&qcs->el_send);
LIST_INIT(&qcs->el_fctl);
qcs->start = TICK_ETERNITY;
/* store transport layer stream descriptor in qcc tree */
@ -933,6 +935,20 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
return b_alloc(&qcs->tx.buf);
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
* are blocked on connection flow control.
*/
static void qcc_notify_fctl(struct qcc *qcc)
{
struct qcs *qcs;
while (!LIST_ISEMPTY(&qcc->fctl_list)) {
qcs = LIST_ELEM(qcc->fctl_list.n, struct qcs *, el_fctl);
LIST_DEL_INIT(&qcs->el_fctl);
qcs_notify_send(qcs);
}
}
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
void qcc_reset_stream(struct qcs *qcs, int err)
{
@ -954,6 +970,15 @@ void qcc_reset_stream(struct qcs *qcs, int err)
qcs->tx.offset = qcs->tx.sent_offset;
}
/* Substract to conn flow control data amount prepared on stream not yet sent. */
if (qcs->tx.fc.off_soft > qcs->tx.fc.off_real) {
const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real);
if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
qcc_notify_fctl(qcc);
}
/* Report send error to stream-endpoint layer. */
if (qcs_sc(qcs)) {
se_fl_set_error(qcs->sd);
@ -987,8 +1012,10 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count)
LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
}
if (count)
if (count) {
qfctl_sinc(&qcc->tx.fc, count);
qfctl_sinc(&qcs->tx.fc, count);
}
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
@ -1210,17 +1237,19 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
*/
int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
{
int unblock_soft = 0, unblock_real = 0;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn);
if (qcc->rfctl.md < max) {
qcc->rfctl.md = max;
if (qfctl_set_max(&qcc->tx.fc, max, &unblock_soft, &unblock_real)) {
TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
if (qcc->flags & QC_CF_BLK_MFCTL) {
qcc->flags &= ~QC_CF_BLK_MFCTL;
if (unblock_real)
tasklet_wakeup(qcc->wait_event.tasklet);
}
if (unblock_soft)
qcc_notify_fctl(qcc);
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
@ -1570,11 +1599,11 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
to_xfer = qcs->tx.fc.limit - qcs->tx.offset;
}
BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
BUG_ON_HOT(qcc->tx.offsets > qcc->tx.fc.limit);
/* do not overcome flow control limit on connection */
if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) {
if (qcc->tx.offsets + to_xfer > qcc->tx.fc.limit) {
TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
to_xfer = qcc->rfctl.md - qcc->tx.offsets;
to_xfer = qcc->tx.fc.limit - qcc->tx.offsets;
}
if (!left && !to_xfer)
@ -1631,7 +1660,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) ||
(total && qcs->tx.sent_offset >= qcs->tx.offset));
BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md);
BUG_ON(qcc->tx.sent_offsets + total > qcc->tx.fc.limit);
TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs);
frm = qc_frm_alloc(QUIC_FT_STREAM_8);
@ -1732,17 +1761,19 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
diff = offset + data - qcs->tx.sent_offset;
if (diff) {
struct quic_fctl *fc_conn = &qcc->tx.fc;
struct quic_fctl *fc_strm = &qcs->tx.fc;
/* Ensure real offset never exceeds soft value. */
BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
/* increase offset sum on connection */
qcc->tx.sent_offsets += diff;
BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
if (qcc->tx.sent_offsets == qcc->rfctl.md) {
qcc->flags |= QC_CF_BLK_MFCTL;
TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn);
BUG_ON_HOT(qcc->tx.sent_offsets > fc_conn->limit);
if (qfctl_rinc(fc_conn, diff)) {
TRACE_STATE("connection flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn);
}
/* increase offset on stream */
@ -2102,7 +2133,7 @@ static int qcc_io_send(struct qcc *qcc)
continue;
}
if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
if (!qfctl_rblocked(&qcc->tx.fc) &&
!qfctl_rblocked(&qcs->tx.fc)) {
if ((ret = qcs_send(qcs, &frms)) < 0) {
/* Temporarily remove QCS from send-list. */
@ -2128,7 +2159,7 @@ static int qcc_io_send(struct qcc *qcc)
/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) {
while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
/* Reloop over <qcc.send_list>. Useful for streams which have
* fulfilled their qc_stream_desc buf and have now release it.
*/
@ -2167,7 +2198,7 @@ static int qcc_io_send(struct qcc *qcc)
LIST_APPEND(&qcc->send_list, &qcs->el_send);
}
if (!(qcc->flags & QC_CF_BLK_MFCTL))
if (!qfctl_rblocked(&qcc->tx.fc))
tasklet_wakeup(qcc->wait_event.tasklet);
}
@ -2575,7 +2606,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
rparams = &conn->handle.qc->tx.params;
qcc->rfctl.md = rparams->initial_max_data;
qfctl_init(&qcc->tx.fc, rparams->initial_max_data);
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
@ -2600,6 +2631,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
}
LIST_INIT(&qcc->send_list);
LIST_INIT(&qcc->fctl_list);
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->context = qcc;
@ -2828,6 +2860,17 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
goto end;
}
if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
TRACE_DEVEL("leaving on connection flow control",
QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
if (!LIST_INLIST(&qcs->el_fctl)) {
TRACE_DEVEL("append to fctl-list",
QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
}
goto end;
}
if (qfctl_sblocked(&qcs->tx.fc)) {
TRACE_DEVEL("leaving on flow-control reached",
QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@ -2887,6 +2930,16 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
goto end;
}
if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
TRACE_DEVEL("leaving on connection flow control", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
if (!LIST_INLIST(&qcs->el_fctl)) {
TRACE_DEVEL("append to fctl-list", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
}
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
if (qfctl_sblocked(&qcs->tx.fc)) {
TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;

View File

@ -77,7 +77,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
chunk_appendf(&trace_buf, " qc=%p", qcc->conn->handle.qc);
chunk_appendf(&trace_buf, " md=%llu/%llu/%llu",
(ullong)qcc->rfctl.md, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets);
(ullong)qcc->tx.fc.limit, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets);
if (qcs) {
chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",