diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index acc855bf73..ab2b952af9 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -103,7 +103,7 @@ struct qcs { uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; - struct eb64_node by_id; /* place in qcc's streams_by_id */ + uint64_t id; struct qc_stream_desc *stream; struct wait_event wait_event; diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 1adbfcad5c..f0c0a430ef 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -6,6 +6,8 @@ #error "Must define USE_OPENSSL" #endif +#include + #include #include #include @@ -86,6 +88,22 @@ static inline int qcc_install_app_ops(struct qcc *qcc, return 0; } +/* Retrieve a qc_stream_desc from the MUX with . This function is + * useful for the transport layer. + * + * Returns the stream instance or NULL if not found. + */ +static inline struct qc_stream_desc *qcc_get_stream(struct qcc *qcc, uint64_t id) +{ + struct eb64_node *node; + + node = eb64_lookup(&qcc->streams_by_id, id); + if (!node) + return NULL; + + return eb64_entry(node, struct qc_stream_desc, by_id); +} + #endif /* USE_QUIC */ #endif /* _HAPROXY_MUX_QUIC_H */ diff --git a/include/haproxy/xprt_quic-t.h b/include/haproxy/xprt_quic-t.h index f4b160a10e..6247abc0f7 100644 --- a/include/haproxy/xprt_quic-t.h +++ b/include/haproxy/xprt_quic-t.h @@ -746,7 +746,7 @@ struct quic_conn { struct listener *li; /* only valid for frontend connections */ struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */ - struct eb_root streams_by_id; /* stream-descriptors tree */ + struct eb_root streams_by_id; /* storage for released qc_stream_desc */ /* MUX */ struct qcc *qcc; diff --git a/include/haproxy/xprt_quic.h b/include/haproxy/xprt_quic.h index cc6cd49c64..567cbefe90 100644 --- a/include/haproxy/xprt_quic.h +++ b/include/haproxy/xprt_quic.h @@ -1243,8 +1243,8 @@ int quic_lstnr_dgram_dispatch(unsigned char *buf, size_t len, void *owner, struct quic_dgram *new_dgram, struct list *dgrams); int qc_send_app_pkts(struct quic_conn *qc, struct list *frms); -struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, void *ctx); -void qc_stream_desc_release(struct qc_stream_desc *stream); +struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx); +void qc_stream_desc_release(struct qc_stream_desc *stream, struct quic_conn *qc); #endif /* USE_QUIC */ #endif /* _HAPROXY_XPRT_QUIC_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index f7ef248ba4..8e91d7b4d9 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -105,7 +105,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) goto out; /* allocate transport layer stream descriptor */ - stream = qc_stream_desc_new(qcc->conn->qc, id, qcs); + stream = qc_stream_desc_new(id, qcs); if (!stream) { pool_free(pool_head_qcs, qcs); qcs = NULL; @@ -117,8 +117,9 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->cs = NULL; qcs->flags = QC_SF_NONE; - qcs->by_id.key = id; - eb64_insert(&qcc->streams_by_id, &qcs->by_id); + qcs->id = id; + /* store transport layer stream descriptor in qcc tree */ + eb64_insert(&qcc->streams_by_id, &stream->by_id); qcc->strms[type].nb_streams++; /* If stream is local, use peer remote-limit, or else the opposite. */ @@ -153,12 +154,12 @@ void qcs_free(struct qcs *qcs) b_free(&qcs->rx.buf); b_free(&qcs->tx.buf); - BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams); - --qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams; + BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams); + --qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams; - qc_stream_desc_release(qcs->stream); - - eb64_delete(&qcs->by_id); + /* stream desc must be removed from MUX tree before release it */ + eb64_delete(&qcs->stream->by_id); + qc_stream_desc_release(qcs->stream, qcs->qcc->conn->qc); pool_free(pool_head_qcs, qcs); } @@ -218,6 +219,7 @@ void qcs_notify_send(struct qcs *qcs) */ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) { + struct qc_stream_desc *stream; unsigned int strm_type; int64_t sub_id; struct eb64_node *strm_node; @@ -233,7 +235,8 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) /* unknown stream id */ goto out; } - qcs = eb64_entry(strm_node, struct qcs, by_id); + stream = eb64_entry(strm_node, struct qc_stream_desc, by_id); + qcs = stream->ctx; } else { /* Remote streams. */ @@ -282,8 +285,10 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) } else { strm_node = eb64_lookup(strms, id); - if (strm_node) - qcs = eb64_entry(strm_node, struct qcs, by_id); + if (strm_node) { + stream = eb64_entry(strm_node, struct qc_stream_desc, by_id); + qcs = stream->ctx; + } } } @@ -389,12 +394,14 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max) */ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) { + struct qc_stream_desc *stream; struct qcs *qcs; struct eb64_node *node; node = eb64_lookup(&qcc->streams_by_id, id); if (node) { - qcs = eb64_entry(&node->node, struct qcs, by_id); + stream = eb64_entry(node, struct qc_stream_desc, by_id); + qcs = stream->ctx; if (max > qcs->tx.msd) { qcs->tx.msd = max; @@ -436,7 +443,7 @@ static int qc_is_max_streams_needed(struct qcc *qcc) static void qcs_destroy(struct qcs *qcs) { struct connection *conn = qcs->qcc->conn; - const uint64_t id = qcs->by_id.key; + const uint64_t id = qcs->id; TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs); @@ -501,9 +508,9 @@ static void qc_release(struct qcc *qcc) /* liberate remaining qcs instances */ node = eb64_first(&qcc->streams_by_id); while (node) { - struct qcs *qcs = eb64_entry(node, struct qcs, by_id); + struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); node = eb64_next(node); - qcs_free(qcs); + qcs_free(stream->ctx); } pool_free(pool_head_qcc, qcc); @@ -595,7 +602,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out, frm->type = QUIC_FT_STREAM_8; frm->stream.stream = qcs->stream; - frm->stream.id = qcs->by_id.key; + frm->stream.id = qcs->id; frm->stream.buf = out; frm->stream.data = (unsigned char *)b_peek(out, head); @@ -760,7 +767,8 @@ static int qc_send(struct qcc *qcc) */ node = eb64_first(&qcc->streams_by_id); while (node) { - struct qcs *qcs = container_of(node, struct qcs, by_id); + struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); + struct qcs *qcs = stream->ctx; struct buffer *buf = &qcs->tx.buf; struct buffer *out = &qcs->stream->buf; @@ -769,7 +777,7 @@ static int qc_send(struct qcc *qcc) * mechanism for sending. This should be unified in the future, * in this case the next check will be removed. */ - if (quic_stream_is_uni(qcs->by_id.key)) { + if (quic_stream_is_uni(qcs->id)) { node = eb64_next(node); continue; } @@ -824,7 +832,8 @@ static int qc_release_detached_streams(struct qcc *qcc) node = eb64_first(&qcc->streams_by_id); while (node) { - struct qcs *qcs = container_of(node, struct qcs, by_id); + struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); + struct qcs *qcs = stream->ctx; node = eb64_next(node); if (qcs->flags & QC_SF_DETACH) { @@ -1223,7 +1232,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask, chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc); if (qcs) - chunk_appendf(&trace_buf, " qcs=%p(%llu)", qcs, qcs->by_id.key); + chunk_appendf(&trace_buf, " qcs=%p(%lu)", qcs, qcs->id); if (mask & QMUX_EV_QCC_NQCS) { const uint64_t *id = a3; diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 5f31d0edd5..e13773f5b1 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -1484,13 +1484,18 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, /* do not use strm_frm->stream as the qc_stream_desc instance * might be freed at this stage. Use the id to do a proper - * lookup. + * lookup. First search in the MUX then in the released stream + * list. * * TODO if lookup operation impact on the perf is noticeable, * implement a refcount on qc_stream_desc instances. */ - node = eb64_lookup(&qc->streams_by_id, strm_frm->id); - stream = eb64_entry(node, struct qc_stream_desc, by_id); + if (qc->mux_state == QC_MUX_READY) + stream = qcc_get_stream(qc->qcc, strm_frm->id); + if (!stream) { + node = eb64_lookup(&qc->streams_by_id, strm_frm->id); + stream = eb64_entry(node, struct qc_stream_desc, by_id); + } if (!stream) { TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm); @@ -2155,7 +2160,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, frm = eb64_entry(&frm_node->node, struct quic_rx_strm_frm, offset_node); - ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len, + ret = qcc_recv(qc->qcc, qcs->id, frm->len, frm->offset_node.key, frm->fin, (char *)frm->data, &qcs); @@ -5733,12 +5738,12 @@ int quic_lstnr_dgram_dispatch(unsigned char *buf, size_t len, void *owner, return 0; } -/* Allocate a new stream descriptor with id . The stream will be stored - * inside the connection. +/* Allocate a new stream descriptor with id . The caller is responsible to + * store the stream in the appropriate tree. * * Returns the newly allocated instance on success or else NULL. */ -struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, void *ctx) +struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx) { struct qc_stream_desc *stream; @@ -5747,7 +5752,7 @@ struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, voi return NULL; stream->by_id.key = id; - eb64_insert(&qc->streams_by_id, &stream->by_id); + stream->by_id.node.leaf_p = NULL; stream->buf = BUF_NULL; stream->acked_frms = EB_ROOT; @@ -5759,15 +5764,22 @@ struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, voi } /* Mark the stream descriptor as released by the upper layer. It will - * be freed as soon as all its buffered data are acknowledged. + * be freed as soon as all its buffered data are acknowledged. In the meantime, + * the stream is stored in the tree : thus it must have been removed from + * any other tree before calling this function. */ -void qc_stream_desc_release(struct qc_stream_desc *stream) +void qc_stream_desc_release(struct qc_stream_desc *stream, + struct quic_conn *qc) { + BUG_ON(stream->by_id.node.leaf_p); + stream->release = 1; stream->ctx = NULL; if (!b_data(&stream->buf)) qc_stream_desc_free(stream); + else + eb64_insert(&qc->streams_by_id, &stream->by_id); } /* Function to automatically activate QUIC traces on stdout.