MEDIUM: mux-quic: implement MAX_STREAMS emission for bidir streams

Implement the locally flow-control streams limit for opened
bidirectional streams. Add a counter which is used to count the total
number of closed streams. If this number is big enough, emit a
MAX_STREAMS frame to increase the limit of remotely opened bidirectional
streams.

This is the first commit to implement QUIC flow-control. A series of
patches should follow to complete this.

This is required to be able to handle more than 100 client requests.
This should help to validate the Multiplexing interop test.
This commit is contained in:
Amaury Denoyelle 2022-02-07 16:09:06 +01:00
parent e9c4cc13fc
commit c055e30176
2 changed files with 61 additions and 3 deletions

View File

@ -44,6 +44,9 @@ struct qcc {
/* Flow-control related fields which are enforced on our side. */
struct {
uint64_t max_bidi_streams; /* max sub-ID of bidi stream allowed for the peer */
uint64_t initial_max_bidi_streams; /* max initial sub-ID of bidi stream allowed for the peer */
uint64_t closed_bidi_streams; /* total count of closed bidi stream since last MAX_STREAMS emission */
} lfctl;
/* Flow-control related fields from the endpoint which we must respect. */

View File

@ -130,9 +130,13 @@ struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id)
strms = &qcc->streams_by_id;
qcs_type = qcs_id_type(id);
if (sub_id + 1 > qcc->strms[qcs_type].max_streams) {
/* streams limit reached */
goto out;
/* TODO also checks max-streams for uni streams */
if (quic_stream_is_bidi(id)) {
if (sub_id + 1 > qcc->lfctl.max_bidi_streams) {
/* streams limit reached */
goto out;
}
}
/* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
@ -258,11 +262,26 @@ int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
return 0;
}
static int qc_is_max_streams_needed(struct qcc *qcc)
{
return qcc->lfctl.closed_bidi_streams > qcc->lfctl.initial_max_bidi_streams / 2;
}
/* detaches the QUIC stream from its QCC and releases it to the QCS pool. */
static void qcs_destroy(struct qcs *qcs)
{
const uint64_t id = qcs->by_id.key;
fprintf(stderr, "%s: release stream %llu\n", __func__, qcs->by_id.key);
if (quic_stream_is_remote(qcs->qcc, id)) {
if (quic_stream_is_bidi(id)) {
++qcs->qcc->lfctl.closed_bidi_streams;
if (qc_is_max_streams_needed(qcs->qcc))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
}
eb64_delete(&qcs->by_id);
b_free(&qcs->rx.buf);
@ -491,12 +510,45 @@ static int qc_release_detached_streams(struct qcc *qcc)
return release;
}
/* Send a MAX_STREAM_BIDI frame to update the limit of bidirectional streams
* allowed to be opened by the peer. The caller should have first checked if
* this is required with qc_is_max_streams_needed.
*
* Returns 0 on success else non-zero.
*/
static int qc_send_max_streams(struct qcc *qcc)
{
struct list frms = LIST_HEAD_INIT(frms);
struct quic_frame *frm;
frm = pool_zalloc(pool_head_quic_frame);
BUG_ON(!frm); /* TODO handle this properly */
frm->type = QUIC_FT_MAX_STREAMS_BIDI;
frm->max_streams_bidi.max_streams = qcc->lfctl.max_bidi_streams +
qcc->lfctl.closed_bidi_streams;
fprintf(stderr, "SET MAX_STREAMS %lu\n", frm->max_streams_bidi.max_streams);
LIST_APPEND(&frms, &frm->list);
if (qc_send_frames(qcc, &frms))
return 1;
/* save the new limit if the frame has been send. */
qcc->lfctl.max_bidi_streams += qcc->lfctl.closed_bidi_streams;
qcc->lfctl.closed_bidi_streams = 0;
return 0;
}
static struct task *qc_io_cb(struct task *t, void *ctx, unsigned int status)
{
struct qcc *qcc = ctx;
fprintf(stderr, "%s\n", __func__);
if (qc_is_max_streams_needed(qcc))
qc_send_max_streams(qcc);
qc_send(qcc);
if (qc_release_detached_streams(qcc)) {
@ -595,6 +647,9 @@ static int qc_init(struct connection *conn, struct proxy *prx,
qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
qcc->lfctl.max_bidi_streams = qcc->lfctl.initial_max_bidi_streams = lparams->initial_max_streams_bidi;
qcc->lfctl.closed_bidi_streams = 0;
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;