MEDIUM: mux-quic: properly handle conn Tx buf exhaustion

This commit is a direct follow-up on the major rearchitecture of send
buffering. This patch implements the proper handling of connection pool
buffer temporary exhaustion.

The first step is to be able to differentiate a fatal allocation error
from a temporary pool exhaustion. This is done via a new output argument
on qcc_get_stream_txbuf(). For a fatal error, application protocol layer
will schedule the immediate connection closing. For a pool exhaustion,
QCC is flagged with QC_CF_CONN_FULL and stream sending process is
interrupted. QCS instance is also registered in a new list
<qcc.buf_wait_list>.

A new connection buffer can become available when all ACKs are received
for an older buffer. This process is taken in charge by quic-conn layer.
It uses qcc_notify_buf() function to clear QC_CF_CONN_FULL and to wake
up every streams registered on buf_wait_list to resume sending process.
This commit is contained in:
Amaury Denoyelle 2024-01-17 15:15:55 +01:00
parent cd22200d23
commit 4513787d0d
6 changed files with 147 additions and 33 deletions

View File

@ -32,7 +32,7 @@ enum qcs_type {
#define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */
#define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
/* unused 0x00000004 */
/* unused 0x00000008 */
#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
#define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */
#define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */
@ -84,6 +84,7 @@ struct qcc {
struct list send_retry_list; /* list of qcs eligible to send retry */
struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */
struct list fctl_list; /* list of sending qcs blocked on conn flow control */
struct list buf_wait_list; /* list of qcs blocked on stream desc buf */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
@ -167,6 +168,7 @@ struct qcs {
struct list el_send; /* element of qcc.send_list */
struct list el_opening; /* element of qcc.opening_list */
struct list el_fctl; /* element of qcc.fctl_list */
struct list el_buf; /* element of qcc.buf_wait_list */
struct wait_event wait_event;
struct wait_event *subs;

View File

@ -21,9 +21,10 @@ int qcs_is_close_remote(struct qcs *qcs);
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
int qcc_notify_buf(struct qcc *qcc);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs);
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
int qcc_release_stream_txbuf(struct qcs *qcs);
int qcc_stream_can_send(const struct qcs *qcs);
void qcc_reset_stream(struct qcs *qcs, int err);

View File

@ -1428,6 +1428,7 @@ static ssize_t h3_rcv_buf(struct qcs *qcs, struct buffer *b, int fin)
*/
static int h3_control_send(struct qcs *qcs, void *ctx)
{
int err;
int ret;
struct h3c *h3c = ctx;
unsigned char data[(2 + 3) * 2 * QUIC_VARINT_MAX_SIZE]; /* enough for 3 settings */
@ -1468,7 +1469,8 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
goto err;
}
if (!(res = qcc_get_stream_txbuf(qcs))) {
if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
/* Consider alloc failure fatal for control stream even on conn buf limit. */
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err;
}
@ -1496,6 +1498,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
{
int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer outbuf;
@ -1550,10 +1553,15 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
list[hdr].n = ist("");
if (!(res = qcc_get_stream_txbuf(qcs))) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
if (err) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
}
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}
/* At least 5 bytes to store frame type + length as a varint max size */
@ -1626,6 +1634,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
break;
}
end:
TRACE_LEAVE(H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
return ret;
@ -1647,6 +1656,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
*/
static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
{
int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer headers_buf = BUF_NULL;
@ -1708,10 +1718,15 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
list[hdr].n = ist("");
start:
if (!(res = qcc_get_stream_txbuf(qcs))) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
if (err) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
}
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}
/* At least 9 bytes to store frame type + length as a varint max size */
@ -1815,6 +1830,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
struct buffer *buf, size_t count)
{
int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer outbuf;
@ -1840,10 +1856,16 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
if (type != HTX_BLK_DATA)
goto end;
if (!(res = qcc_get_stream_txbuf(qcs))) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
if (err) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
h3c->err = H3_INTERNAL_ERROR;
goto err;
}
/* Connection buf limit reached, stconn will subscribe on SEND. */
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}
/* If HTX contains only one DATA block, try to exchange it with MUX
@ -2040,6 +2062,7 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count)
static size_t h3_nego_ff(struct qcs *qcs, size_t count)
{
int err;
struct buffer *res;
int hsize;
size_t sz, ret = 0;
@ -2047,8 +2070,13 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count)
TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs);
start:
if (!(res = qcc_get_stream_txbuf(qcs))) {
qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
if (err) {
qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
goto end;
}
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
@ -2224,6 +2252,7 @@ static void h3_detach(struct qcs *qcs)
*/
static int h3_send_goaway(struct h3c *h3c)
{
int err;
struct qcs *qcs = h3c->ctrl_strm;
struct buffer pos, *res;
unsigned char data[3 * QUIC_VARINT_MAX_SIZE];
@ -2243,10 +2272,10 @@ static int h3_send_goaway(struct h3c *h3c)
b_quic_enc_int(&pos, frm_len, 0);
b_quic_enc_int(&pos, h3c->id_goaway, 0);
res = qcc_get_stream_txbuf(qcs);
res = qcc_get_stream_txbuf(qcs, &err);
if (!res || b_room(res) < b_data(&pos) ||
qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) {
/* Do not try forcefully to emit GOAWAY if no space left. */
/* Do not try forcefully to emit GOAWAY if no buffer available or not enough space left. */
TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
goto err;
}

View File

@ -95,6 +95,7 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
uint32_t bsize, fsize;
struct buffer *res = NULL;
size_t total = 0;
int err;
htx = htx_from_buf(buf);
@ -109,10 +110,11 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
switch (btype) {
case HTX_BLK_DATA:
res = qcc_get_stream_txbuf(qcs);
res = qcc_get_stream_txbuf(qcs, &err);
if (!res) {
/* TODO */
ABORT_NOW();
if (err)
ABORT_NOW();
goto end;
}
if (unlikely(fsize == count &&
@ -179,16 +181,16 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
{
int ret = 0;
int err, ret = 0;
struct buffer *res;
start:
res = qcc_get_stream_txbuf(qcs);
res = qcc_get_stream_txbuf(qcs, &err);
if (!res) {
if (err)
ABORT_NOW();
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
/* TODO */
ABORT_NOW();
}
if (!b_room(res)) {

View File

@ -60,6 +60,7 @@ static void qcs_free(struct qcs *qcs)
LIST_DEL_INIT(&qcs->el_opening);
LIST_DEL_INIT(&qcs->el_send);
LIST_DEL_INIT(&qcs->el_fctl);
LIST_DEL_INIT(&qcs->el_buf);
/* Release stream endpoint descriptor. */
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@ -109,6 +110,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
LIST_INIT(&qcs->el_opening);
LIST_INIT(&qcs->el_send);
LIST_INIT(&qcs->el_fctl);
LIST_INIT(&qcs->el_buf);
qcs->start = TICK_ETERNITY;
/* store transport layer stream descriptor in qcc tree */
@ -496,6 +498,35 @@ void qcs_notify_send(struct qcs *qcs)
}
}
/* Notify on a new stream-desc buffer available for <qcc> connection.
*
* Returns true if a stream was woken up. If false is returned, this indicates
* to the caller that it's currently unnecessary to notify for the rest of the
* available buffers.
*/
int qcc_notify_buf(struct qcc *qcc)
{
struct qcs *qcs;
int ret = 0;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
if (qcc->flags & QC_CF_CONN_FULL) {
TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
qcc->flags &= ~QC_CF_CONN_FULL;
}
if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
LIST_DEL_INIT(&qcs->el_buf);
qcs_notify_send(qcs);
ret = 1;
}
TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
return ret;
}
/* A fatal error is detected locally for <qcc> connection. It should be closed
* with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that
* the code must be considered as an application level error. This function
@ -923,22 +954,48 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
/* Allocate if needed and retrieve <qcs> stream buffer for data emission.
*
* Returns buffer pointer. May be NULL on allocation failure.
* <err> is an output argument which is useful to differentiate the failure
* cause when the buffer cannot be allocated. It is set to 0 if the connection
* buffer limit is reached. For fatal errors, its value is non-zero.
*
* Returns buffer pointer. May be NULL on allocation failure, in which case
* <err> will refer to the cause.
*/
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
{
struct qcc *qcc = qcs->qcc;
int buf_avail;
struct buffer *out = qc_stream_buf_get(qcs->stream);
/* Stream must not try to reallocate a buffer if currently waiting for one. */
BUG_ON(LIST_INLIST(&qcs->el_buf));
*err = 0;
if (!out) {
if (qcc->flags & QC_CF_CONN_FULL) {
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
goto out;
}
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
&buf_avail);
if (!out)
if (!out) {
if (buf_avail) {
TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
*err = 1;
goto out;
}
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
qcc->flags |= QC_CF_CONN_FULL;
goto out;
}
if (!b_alloc(out)) {
TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
*err = 1;
goto out;
}
}
@ -988,7 +1045,7 @@ int qcc_release_stream_txbuf(struct qcs *qcs)
/* Returns true if stream layer can proceed to emission via <qcs>. */
int qcc_stream_can_send(const struct qcs *qcs)
{
return !(qcs->flags & QC_SF_BLK_MROOM);
return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf);
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
@ -1014,6 +1071,10 @@ void qcc_reset_stream(struct qcs *qcs, int err)
if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
return;
/* TODO if QCS waiting for buffer, it could be removed from
* <qcc.buf_wait_list> if sending is closed now.
*/
TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
qcs->flags |= QC_SF_TO_RESET;
qcs->err = err;
@ -2575,6 +2636,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
LIST_INIT(&qcc->send_list);
LIST_INIT(&qcc->fctl_list);
LIST_INIT(&qcc->buf_wait_list);
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->context = qcc;
@ -2790,6 +2852,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
/* Stream must not be woken up if already waiting for conn buffer. */
BUG_ON(LIST_INLIST(&qcs->el_buf));
/* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
@ -2849,6 +2914,9 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
/* Stream must not be woken up if already waiting for conn buffer. */
BUG_ON(LIST_INLIST(&qcs->el_buf));
/* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));

View File

@ -6,7 +6,7 @@
#include <haproxy/buf.h>
#include <haproxy/dynbuf.h>
#include <haproxy/list.h>
#include <haproxy/mux_quic-t.h>
#include <haproxy/mux_quic.h>
#include <haproxy/pool.h>
#include <haproxy/quic_conn.h>
#include <haproxy/task.h>
@ -37,7 +37,13 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
/* notify MUX about available buffers. */
--qc->stream_buf_count;
if (qc->mux_state == QC_MUX_READY) {
/* TODO notify MUX for available buffer. */
/* notify MUX about available buffers.
*
* TODO several streams may be woken up even if a single buffer
* is available for now.
*/
while (qcc_notify_buf(qc->qcc))
;
}
}
@ -199,7 +205,13 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
qc->stream_buf_count -= free_count;
if (qc->mux_state == QC_MUX_READY) {
/* TODO notify MUX for available buffer. */
/* notify MUX about available buffers.
*
* TODO several streams may be woken up even if a single buffer
* is available for now.
*/
while (qcc_notify_buf(qc->qcc))
;
}
}