diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 4452076f08..0b957e9fcd 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -19,6 +19,10 @@ int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); +int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, + char fin, char *data, struct qcs **out_qcs); +int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs); + /* Bit shift to get the stream sub ID for internal use which is obtained * shifting the stream IDs by this value, knowing that the * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID diff --git a/src/mux_quic.c b/src/mux_quic.c index 4be7d83d86..c6f12cb85f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -192,6 +192,91 @@ struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id) return NULL; } +/* Handle a new STREAM frame . The frame content will be copied in + * the buffer of the stream instance. The stream instance will be stored in + * . In case of success, the caller can immediatly call qcc_decode_qcs + * to process the frame content. + * + * Returns 0 on success. On errors, two codes are present. + * - 1 is returned if the frame cannot be decoded and must be discarded. + * - 2 is returned if the stream cannot decode at the moment the frame. The + * frame should be buffered to be handled later. + */ +int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, + char fin, char *data, struct qcs **out_qcs) +{ + struct qcs *qcs; + struct eb64_node *strm_node; + size_t total, diff; + + strm_node = qcc_get_qcs(qcc, id); + if (!strm_node) { + fprintf(stderr, "%s: stream not found\n", __func__); + return 1; + } + + qcs = eb64_entry(&strm_node->node, struct qcs, by_id); + *out_qcs = qcs; + + if (offset > qcs->rx.offset) + return 2; + + if (offset + len <= qcs->rx.offset) { + fprintf(stderr, "%s: already received STREAM data\n", __func__); + return 1; + } + + /* Last frame already handled for this stream. */ + BUG_ON(qcs->flags & QC_SF_FIN_RECV); + + if (!qc_get_buf(qcs, &qcs->rx.buf)) { + /* TODO should mark qcs as full */ + return 2; + } + + fprintf(stderr, "%s: new STREAM data\n", __func__); + diff = qcs->rx.offset - offset; + + /* TODO do not partially copy a frame if not enough size left. Maybe + * this can be optimized. + */ + if (len > b_room(&qcs->rx.buf)) { + /* TODO handle STREAM frames larger than RX buffer. */ + BUG_ON(len > b_size(&qcs->rx.buf)); + return 2; + } + + len -= diff; + data += diff; + + total = b_putblk(&qcs->rx.buf, data, len); + /* TODO handle partial copy of a STREAM frame. */ + BUG_ON(len != total); + + qcs->rx.offset += total; + + if (fin) + qcs->flags |= QC_SF_FIN_RECV; + + out: + return 0; +} + +/* Decode the content of STREAM frames already received on the stream instance + * . + * + * Returns 0 on success else non-zero. + */ +int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) +{ + if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV, qcc->ctx) < 0) { + fprintf(stderr, "%s: decoding error\n", __func__); + return 1; + } + + return 0; +} + /* detaches the QUIC stream from its QCC and releases it to the QCS pool. */ static void qcs_destroy(struct qcs *qcs) { diff --git a/src/xprt_quic.c b/src/xprt_quic.c index d69bbf54f8..6dccb13ba9 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -1984,92 +1984,6 @@ static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm) return ret; } -/* Copy as most as possible STREAM data from into buffer. - * - * Note that is not updated as it is implied that the frame may be - * present in a tree and offset node is used as the key. The caller should - * update offset/lenght of the frame after the function call. - * - * Return the total count of copied bytes. - */ -static size_t qc_rx_strm_frm_cpy(struct buffer *buf, - struct quic_rx_strm_frm *strm_frm) -{ - size_t flen = strm_frm->len; - size_t ret = 0; - size_t try; - - ret = 0; - while (flen && (try = b_contig_space(buf))) { - if (try > flen) - try = flen; - - memcpy(b_tail(buf), strm_frm->data + ret, try); - b_add(buf, try); - ret += try; - flen -= try; - } - - return ret; -} - -/* Process as much as possible RX STREAM frames received for */ -static size_t qc_treat_rx_strm_frms(struct qcs *qcs) -{ - int total; - struct eb64_node *frm_node; - - total = 0; - frm_node = eb64_first(&qcs->rx.frms); - while (frm_node) { - int ret; - struct quic_rx_strm_frm *frm; - size_t diff; - - frm = eb64_entry(&frm_node->node, struct quic_rx_strm_frm, offset_node); - if (frm->offset_node.key + frm->len < qcs->rx.offset) { - /* fully already received STREAM offset */ - goto next; - } - - BUG_ON(qcs->flags & QC_SF_FIN_RECV); - if (frm->offset_node.key > qcs->rx.offset) - break; - - diff = qcs->rx.offset - frm->offset_node.key; - frm->data += diff; - frm->len -= diff; - - ret = qc_rx_strm_frm_cpy(&qcs->rx.buf, frm); - qcs->rx.offset += ret; - total += ret; - - BUG_ON(frm->len < ret); - if (frm->len - ret > 0) { - /* Remove the frame from the tree before updating the - * offset field. - */ - eb64_delete(&frm->offset_node); - frm->offset_node.key += (diff + ret); - frm->data += ret; - frm->len -= ret; - eb64_insert(&qcs->rx.frms, &frm->offset_node); - break; - } - - if (frm->fin) - qcs->flags |= QC_SF_FIN_RECV; - - next: - frm_node = eb64_next(frm_node); - quic_rx_packet_refdec(frm->pkt); - eb64_delete(&frm->offset_node); - pool_free(pool_head_quic_rx_strm_frm, frm); - } - - return total; -} - /* Handle bidirectional STREAM frame. Depending on its ID, several * streams may be open. The data are copied to the stream RX buffer if possible. * If not, the STREAM frame is stored to be treated again later. @@ -2080,74 +1994,66 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, struct quic_stream *strm_frm, struct quic_conn *qc) { - int total; - struct qcs *strm; - struct eb64_node *strm_node; struct quic_rx_strm_frm *frm; + struct eb64_node *frm_node; + struct qcs *qcs = NULL; + int ret; - strm_node = qcc_get_qcs(qc->qcc, strm_frm->id); - if (!strm_node) { - TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc); + ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len, + strm_frm->offset.key, strm_frm->fin, + (char *)strm_frm->data, &qcs); + + /* invalid or already received frame */ + if (ret == 1) return 0; - } - strm = eb64_entry(&strm_node->node, struct qcs, by_id); - if (strm_frm->offset.key < strm->rx.offset) { - size_t diff; - - if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) { - TRACE_PROTO("Already received STREAM data", + if (ret == 2) { + /* frame cannot be parsed at the moment and should be + * buffered. + */ + frm = new_quic_rx_strm_frm(strm_frm, pkt); + if (!frm) { + TRACE_PROTO("Could not alloc RX STREAM frame", QUIC_EV_CONN_PSTRM, qc); - goto out; + return 0; } - TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc); - diff = strm->rx.offset - strm_frm->offset.key; - strm_frm->offset.key = strm->rx.offset; - strm_frm->len -= diff; - strm_frm->data += diff; + eb64_insert(&qcs->rx.frms, &frm->offset_node); + quic_rx_packet_refinc(pkt); + + return 1; } - BUG_ON(strm->flags & QC_SF_FIN_RECV); + /* Frame correctly received by the mux. + * If there is buffered frame for next offset, it may be possible to + * receive them now. + */ + frm_node = eb64_first(&qcs->rx.frms); + while (frm_node) { + frm = eb64_entry(&frm_node->node, + struct quic_rx_strm_frm, offset_node); - total = 0; - if (strm_frm->offset.key == strm->rx.offset) { - int ret; + ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len, + frm->offset_node.key, frm->fin, + (char *)frm->data, &qcs); - if (!qc_get_buf(strm, &strm->rx.buf)) - goto store_frm; + /* interrupt the parsing if the frame cannot be handled for the + * moment only by the MUX. + */ + if (ret == 2) + break; - ret = qc_strm_cpy(&strm->rx.buf, strm_frm); - total += ret; - strm->rx.offset += ret; + /* Remove a newly received frame or an invalid one. */ + frm_node = eb64_next(frm_node); + eb64_delete(&frm->offset_node); + quic_rx_packet_refdec(frm->pkt); + pool_free(pool_head_quic_rx_strm_frm, frm); } - /* FIN is set only if all data were copied. */ - if (strm_frm->fin && !strm_frm->len) - strm->flags |= QC_SF_FIN_RECV; - - total += qc_treat_rx_strm_frms(strm); - - if (total && qc->qcc->app_ops->decode_qcs(strm, strm->flags & QC_SF_FIN_RECV, qc->qcc->ctx) < 0) { - TRACE_PROTO("Decoding error", QUIC_EV_CONN_PSTRM, qc); + /* Decode the received data. */ + if (qcc_decode_qcs(qc->qcc, qcs)) return 0; - } - if (!strm_frm->len) - goto out; - - store_frm: - frm = new_quic_rx_strm_frm(strm_frm, pkt); - if (!frm) { - TRACE_PROTO("Could not alloc RX STREAM frame", - QUIC_EV_CONN_PSTRM, qc); - return 0; - } - - eb64_insert(&strm->rx.frms, &frm->offset_node); - quic_rx_packet_refinc(pkt); - - out: return 1; }