From 0e3010b1bbb48394e1d4fb5a2e3722ff625b715a Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Mon, 28 Feb 2022 11:37:48 +0100 Subject: [PATCH] MEDIUM: quic: rearchitecture Rx path for bidirectional STREAM frames Reorganize the Rx path for STREAM frames on bidirectional streams. A new function qcc_recv is implemented on the MUX. It will handle the STREAM frames copy and offset calculation from transport to MUX. Another function named qcc_decode_qcs from the MUX can be called by transport each time new STREAM data has been copied. The architecture is now cleaner with the MUX layer in charge of parsing the STREAM frames offsets. This is required to be able to implement the flow-control on the MUX layer. Note that as a convenience, a STREAM frame is not partially copied to the MUX buffer. This simplify the implementation for the moment but it may change in the future to optimize the STREAM frames handling. For the moment, only bidirectional streams benefit from this change. In the future, it may be extended to unidirectional streams to unify the STREAM frames processing. --- include/haproxy/mux_quic.h | 4 + src/mux_quic.c | 85 +++++++++++++++++ src/xprt_quic.c | 182 +++++++++---------------------------- 3 files changed, 133 insertions(+), 138 deletions(-) diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 4452076f08..0b957e9fcd 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -19,6 +19,10 @@ int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); +int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, + char fin, char *data, struct qcs **out_qcs); +int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs); + /* Bit shift to get the stream sub ID for internal use which is obtained * shifting the stream IDs by this value, knowing that the * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID diff --git a/src/mux_quic.c b/src/mux_quic.c index 4be7d83d86..c6f12cb85f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -192,6 +192,91 @@ struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id) return NULL; } +/* Handle a new STREAM frame . The frame content will be copied in + * the buffer of the stream instance. The stream instance will be stored in + * . In case of success, the caller can immediatly call qcc_decode_qcs + * to process the frame content. + * + * Returns 0 on success. On errors, two codes are present. + * - 1 is returned if the frame cannot be decoded and must be discarded. + * - 2 is returned if the stream cannot decode at the moment the frame. The + * frame should be buffered to be handled later. + */ +int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, + char fin, char *data, struct qcs **out_qcs) +{ + struct qcs *qcs; + struct eb64_node *strm_node; + size_t total, diff; + + strm_node = qcc_get_qcs(qcc, id); + if (!strm_node) { + fprintf(stderr, "%s: stream not found\n", __func__); + return 1; + } + + qcs = eb64_entry(&strm_node->node, struct qcs, by_id); + *out_qcs = qcs; + + if (offset > qcs->rx.offset) + return 2; + + if (offset + len <= qcs->rx.offset) { + fprintf(stderr, "%s: already received STREAM data\n", __func__); + return 1; + } + + /* Last frame already handled for this stream. */ + BUG_ON(qcs->flags & QC_SF_FIN_RECV); + + if (!qc_get_buf(qcs, &qcs->rx.buf)) { + /* TODO should mark qcs as full */ + return 2; + } + + fprintf(stderr, "%s: new STREAM data\n", __func__); + diff = qcs->rx.offset - offset; + + /* TODO do not partially copy a frame if not enough size left. Maybe + * this can be optimized. + */ + if (len > b_room(&qcs->rx.buf)) { + /* TODO handle STREAM frames larger than RX buffer. */ + BUG_ON(len > b_size(&qcs->rx.buf)); + return 2; + } + + len -= diff; + data += diff; + + total = b_putblk(&qcs->rx.buf, data, len); + /* TODO handle partial copy of a STREAM frame. */ + BUG_ON(len != total); + + qcs->rx.offset += total; + + if (fin) + qcs->flags |= QC_SF_FIN_RECV; + + out: + return 0; +} + +/* Decode the content of STREAM frames already received on the stream instance + * . + * + * Returns 0 on success else non-zero. + */ +int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) +{ + if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV, qcc->ctx) < 0) { + fprintf(stderr, "%s: decoding error\n", __func__); + return 1; + } + + return 0; +} + /* detaches the QUIC stream from its QCC and releases it to the QCS pool. */ static void qcs_destroy(struct qcs *qcs) { diff --git a/src/xprt_quic.c b/src/xprt_quic.c index d69bbf54f8..6dccb13ba9 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -1984,92 +1984,6 @@ static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm) return ret; } -/* Copy as most as possible STREAM data from into buffer. - * - * Note that is not updated as it is implied that the frame may be - * present in a tree and offset node is used as the key. The caller should - * update offset/lenght of the frame after the function call. - * - * Return the total count of copied bytes. - */ -static size_t qc_rx_strm_frm_cpy(struct buffer *buf, - struct quic_rx_strm_frm *strm_frm) -{ - size_t flen = strm_frm->len; - size_t ret = 0; - size_t try; - - ret = 0; - while (flen && (try = b_contig_space(buf))) { - if (try > flen) - try = flen; - - memcpy(b_tail(buf), strm_frm->data + ret, try); - b_add(buf, try); - ret += try; - flen -= try; - } - - return ret; -} - -/* Process as much as possible RX STREAM frames received for */ -static size_t qc_treat_rx_strm_frms(struct qcs *qcs) -{ - int total; - struct eb64_node *frm_node; - - total = 0; - frm_node = eb64_first(&qcs->rx.frms); - while (frm_node) { - int ret; - struct quic_rx_strm_frm *frm; - size_t diff; - - frm = eb64_entry(&frm_node->node, struct quic_rx_strm_frm, offset_node); - if (frm->offset_node.key + frm->len < qcs->rx.offset) { - /* fully already received STREAM offset */ - goto next; - } - - BUG_ON(qcs->flags & QC_SF_FIN_RECV); - if (frm->offset_node.key > qcs->rx.offset) - break; - - diff = qcs->rx.offset - frm->offset_node.key; - frm->data += diff; - frm->len -= diff; - - ret = qc_rx_strm_frm_cpy(&qcs->rx.buf, frm); - qcs->rx.offset += ret; - total += ret; - - BUG_ON(frm->len < ret); - if (frm->len - ret > 0) { - /* Remove the frame from the tree before updating the - * offset field. - */ - eb64_delete(&frm->offset_node); - frm->offset_node.key += (diff + ret); - frm->data += ret; - frm->len -= ret; - eb64_insert(&qcs->rx.frms, &frm->offset_node); - break; - } - - if (frm->fin) - qcs->flags |= QC_SF_FIN_RECV; - - next: - frm_node = eb64_next(frm_node); - quic_rx_packet_refdec(frm->pkt); - eb64_delete(&frm->offset_node); - pool_free(pool_head_quic_rx_strm_frm, frm); - } - - return total; -} - /* Handle bidirectional STREAM frame. Depending on its ID, several * streams may be open. The data are copied to the stream RX buffer if possible. * If not, the STREAM frame is stored to be treated again later. @@ -2080,74 +1994,66 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, struct quic_stream *strm_frm, struct quic_conn *qc) { - int total; - struct qcs *strm; - struct eb64_node *strm_node; struct quic_rx_strm_frm *frm; + struct eb64_node *frm_node; + struct qcs *qcs = NULL; + int ret; - strm_node = qcc_get_qcs(qc->qcc, strm_frm->id); - if (!strm_node) { - TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc); + ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len, + strm_frm->offset.key, strm_frm->fin, + (char *)strm_frm->data, &qcs); + + /* invalid or already received frame */ + if (ret == 1) return 0; - } - strm = eb64_entry(&strm_node->node, struct qcs, by_id); - if (strm_frm->offset.key < strm->rx.offset) { - size_t diff; - - if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) { - TRACE_PROTO("Already received STREAM data", + if (ret == 2) { + /* frame cannot be parsed at the moment and should be + * buffered. + */ + frm = new_quic_rx_strm_frm(strm_frm, pkt); + if (!frm) { + TRACE_PROTO("Could not alloc RX STREAM frame", QUIC_EV_CONN_PSTRM, qc); - goto out; + return 0; } - TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc); - diff = strm->rx.offset - strm_frm->offset.key; - strm_frm->offset.key = strm->rx.offset; - strm_frm->len -= diff; - strm_frm->data += diff; + eb64_insert(&qcs->rx.frms, &frm->offset_node); + quic_rx_packet_refinc(pkt); + + return 1; } - BUG_ON(strm->flags & QC_SF_FIN_RECV); + /* Frame correctly received by the mux. + * If there is buffered frame for next offset, it may be possible to + * receive them now. + */ + frm_node = eb64_first(&qcs->rx.frms); + while (frm_node) { + frm = eb64_entry(&frm_node->node, + struct quic_rx_strm_frm, offset_node); - total = 0; - if (strm_frm->offset.key == strm->rx.offset) { - int ret; + ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len, + frm->offset_node.key, frm->fin, + (char *)frm->data, &qcs); - if (!qc_get_buf(strm, &strm->rx.buf)) - goto store_frm; + /* interrupt the parsing if the frame cannot be handled for the + * moment only by the MUX. + */ + if (ret == 2) + break; - ret = qc_strm_cpy(&strm->rx.buf, strm_frm); - total += ret; - strm->rx.offset += ret; + /* Remove a newly received frame or an invalid one. */ + frm_node = eb64_next(frm_node); + eb64_delete(&frm->offset_node); + quic_rx_packet_refdec(frm->pkt); + pool_free(pool_head_quic_rx_strm_frm, frm); } - /* FIN is set only if all data were copied. */ - if (strm_frm->fin && !strm_frm->len) - strm->flags |= QC_SF_FIN_RECV; - - total += qc_treat_rx_strm_frms(strm); - - if (total && qc->qcc->app_ops->decode_qcs(strm, strm->flags & QC_SF_FIN_RECV, qc->qcc->ctx) < 0) { - TRACE_PROTO("Decoding error", QUIC_EV_CONN_PSTRM, qc); + /* Decode the received data. */ + if (qcc_decode_qcs(qc->qcc, qcs)) return 0; - } - if (!strm_frm->len) - goto out; - - store_frm: - frm = new_quic_rx_strm_frm(strm_frm, pkt); - if (!frm) { - TRACE_PROTO("Could not alloc RX STREAM frame", - QUIC_EV_CONN_PSTRM, qc); - return 0; - } - - eb64_insert(&strm->rx.frms, &frm->offset_node); - quic_rx_packet_refinc(pkt); - - out: return 1; }