From 6ccfa3c40fc665620a0639fbed727b04d05716d5 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Thu, 10 Mar 2022 16:45:53 +0100 Subject: [PATCH] MEDIUM: mux-quic: improve bidir STREAM frames sending The current implementation of STREAM frames emission has some limitation. Most notably when we cannot sent all frames in a single qc_send run. In this case, frames are left in front of the MUX list. It will be re-send individually before other frames, possibly another frame from the same STREAM with new data. An opportunity to merge the frames is lost here. This method is now improved. If a frame cannot be send entirely, it is discarded. On the next qc_send run, we retry to send to this position. A new field qcs.sent_offset is used to remember this. A new frame list is used for each qc_send. The impact of this change is not precisely known. The most notable point is that it is a more logical method of emission. It might also improve performance as we do not keep old STREAM frames which might delay other streams. --- include/haproxy/mux_quic-t.h | 4 +- src/mux_quic.c | 84 +++++++++++++++++++++++++----------- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index bca0444cfe..b2730b7ca5 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -58,7 +58,6 @@ struct qcc { } rx; struct { uint64_t max_data; /* Maximum number of bytes which may be sent */ - struct list frms; /* list of frames ready to be sent */ } tx; struct eb_root streams_by_id; /* all active streams by their ID */ @@ -92,7 +91,8 @@ struct qcs { struct buffer app_buf; /* receive buffer used by conn_stream layer */ } rx; struct { - uint64_t offset; /* the current offset of received data */ + uint64_t offset; /* last offset of data ready to be sent */ + uint64_t sent_offset; /* last offset sent by transport layer */ struct eb_root acked_frms; /* acked frames ordered by their offsets */ uint64_t ack_offset; /* last acked ordered byte offset */ struct buffer buf; /* transmit buffer before sending via xprt */ diff --git a/src/mux_quic.c b/src/mux_quic.c index e12edcc958..cb90bb38b6 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -41,6 +41,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->tx.buf = BUF_NULL; qcs->tx.xprt_buf = BUF_NULL; qcs->tx.offset = 0; + qcs->tx.sent_offset = 0; qcs->tx.ack_offset = 0; qcs->tx.acked_frms = EB_ROOT_UNIQUE; @@ -350,50 +351,72 @@ static void qc_release(struct qcc *qcc) } } -static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset, +static int qcs_push_frame(struct qcs *qcs, struct buffer *out, + struct buffer *payload, int fin, struct list *frm_list) { struct quic_frame *frm; - struct buffer *buf = &qcs->tx.xprt_buf; - int total = 0, to_xfer; - unsigned char *btail; + int head, left, to_xfer; + int total = 0; fprintf(stderr, "%s\n", __func__); - qc_get_buf(qcs, buf); - to_xfer = QUIC_MIN(b_data(payload), b_room(buf)); - if (!to_xfer) + qc_get_buf(qcs, out); + + /* + * QCS out buffer diagram + * head left to_xfer + * -------------> ----------> -----> + * ================================================== + * |...............|xxxxxxxxxxx|<<<<< + * ================================================== + * ^ ack-off ^ sent-off ^ off + * + * STREAM frame + * ^ ^ + * |xxxxxxxxxxxxxxxxx| + */ + + BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset); + BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset); + + head = qcs->tx.sent_offset - qcs->tx.ack_offset; + left = qcs->tx.offset - qcs->tx.sent_offset; + to_xfer = QUIC_MIN(b_data(payload), b_room(out)); + if (!left && !to_xfer) goto out; frm = pool_zalloc(pool_head_quic_frame); if (!frm) goto err; - /* store buffer end before transfering data for frm.stream.data */ - btail = (unsigned char *)b_tail(buf); - total = b_force_xfer(buf, payload, to_xfer); + total = b_force_xfer(out, payload, to_xfer); + + frm->type = QUIC_FT_STREAM_8; + frm->stream.qcs = (struct qcs *)qcs; + frm->stream.id = qcs->by_id.key; + frm->stream.buf = out; + frm->stream.data = (unsigned char *)b_peek(out, head); + /* FIN is positioned only when the buffer has been totally emptied. */ fin = fin && !b_data(payload); - frm->type = QUIC_FT_STREAM_8; if (fin) frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT; - if (offset) { + + if (qcs->tx.sent_offset) { frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT; - frm->stream.offset.key = offset; + frm->stream.offset.key = qcs->tx.sent_offset; } - frm->stream.qcs = (struct qcs *)qcs; - frm->stream.buf = buf; - frm->stream.data = btail; - frm->stream.id = qcs->by_id.key; - if (total) { + + if (left + total) { frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT; - frm->stream.len = total; + frm->stream.len = left + total; } LIST_APPEND(frm_list, &frm->list); out: - fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n", - __func__, total, fin, (ull)qcs->by_id.key, offset); + fprintf(stderr, "%s: sent=%lu total=%d fin=%d id=%llu offset=%lu\n", + __func__, (long unsigned)b_data(out), total, fin, (ull)qcs->by_id.key, qcs->tx.sent_offset); return total; err: @@ -406,11 +429,20 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint */ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) { + uint64_t diff = data; + + BUG_ON(offset > qcs->tx.sent_offset); + /* check if the STREAM frame has already been notified. It can happen * for retransmission. */ if (offset + data <= qcs->tx.sent_offset) return; + + diff = offset + data - qcs->tx.sent_offset; + + /* increase offset on stream */ + qcs->tx.sent_offset += diff; } /* Wrapper for send on transport layer. Send a list of frames for the @@ -480,6 +512,7 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms) static int qc_send(struct qcc *qcc) { + struct list frms = LIST_HEAD_INIT(frms); struct eb64_node *node; int ret = 0; @@ -492,6 +525,7 @@ static int qc_send(struct qcc *qcc) while (node) { struct qcs *qcs = container_of(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; + struct buffer *out = &qcs->tx.xprt_buf; /* TODO * for the moment, unidirectional streams have their own @@ -503,10 +537,9 @@ static int qc_send(struct qcc *qcc) continue; } - if (b_data(buf)) { + if (b_data(buf) || b_data(out)) { char fin = qcs->flags & QC_SF_FIN_STREAM; - ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset, - &qcc->tx.frms); + ret = qcs_push_frame(qcs, out, buf, fin, &frms); BUG_ON(ret < 0); /* TODO handle this properly */ if (ret > 0) { @@ -527,7 +560,7 @@ static int qc_send(struct qcc *qcc) node = eb64_next(node); } - qc_send_frames(qcc, &qcc->tx.frms); + qc_send_frames(qcc, &frms); /* TODO adjust ret if not all frames are sent. */ return ret; @@ -672,7 +705,6 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->rx.max_data = lparams->initial_max_data; qcc->tx.max_data = 0; - LIST_INIT(&qcc->tx.frms); /* Client initiated streams must respect the server flow control. */ qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;