mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-08 02:01:33 +00:00
MEDIUM: mux-quic: remove qcs tree node
The new qc_stream_desc type has a tree node for storage. Thus, we can remove the node in the qcs structure. When initializing a new stream, it is stored into the qcc streams_by_id tree. When the MUX releases it, it will freed as soon as its buffer is emptied. Before this, the quic-conn is responsible to store it inside its own streams_by_id tree.
This commit is contained in:
parent
7272cd76fc
commit
d8e680cbaf
@ -103,7 +103,7 @@ struct qcs {
|
|||||||
uint64_t msd; /* fctl bytes limit to respect on emission */
|
uint64_t msd; /* fctl bytes limit to respect on emission */
|
||||||
} tx;
|
} tx;
|
||||||
|
|
||||||
struct eb64_node by_id; /* place in qcc's streams_by_id */
|
uint64_t id;
|
||||||
struct qc_stream_desc *stream;
|
struct qc_stream_desc *stream;
|
||||||
|
|
||||||
struct wait_event wait_event;
|
struct wait_event wait_event;
|
||||||
|
@ -6,6 +6,8 @@
|
|||||||
#error "Must define USE_OPENSSL"
|
#error "Must define USE_OPENSSL"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <import/eb64tree.h>
|
||||||
|
|
||||||
#include <haproxy/api.h>
|
#include <haproxy/api.h>
|
||||||
#include <haproxy/connection.h>
|
#include <haproxy/connection.h>
|
||||||
#include <haproxy/mux_quic-t.h>
|
#include <haproxy/mux_quic-t.h>
|
||||||
@ -86,6 +88,22 @@ static inline int qcc_install_app_ops(struct qcc *qcc,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Retrieve a qc_stream_desc from the MUX <qcc> with <id>. This function is
|
||||||
|
* useful for the transport layer.
|
||||||
|
*
|
||||||
|
* Returns the stream instance or NULL if not found.
|
||||||
|
*/
|
||||||
|
static inline struct qc_stream_desc *qcc_get_stream(struct qcc *qcc, uint64_t id)
|
||||||
|
{
|
||||||
|
struct eb64_node *node;
|
||||||
|
|
||||||
|
node = eb64_lookup(&qcc->streams_by_id, id);
|
||||||
|
if (!node)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
return eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* USE_QUIC */
|
#endif /* USE_QUIC */
|
||||||
|
|
||||||
#endif /* _HAPROXY_MUX_QUIC_H */
|
#endif /* _HAPROXY_MUX_QUIC_H */
|
||||||
|
@ -746,7 +746,7 @@ struct quic_conn {
|
|||||||
struct listener *li; /* only valid for frontend connections */
|
struct listener *li; /* only valid for frontend connections */
|
||||||
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
||||||
|
|
||||||
struct eb_root streams_by_id; /* stream-descriptors tree */
|
struct eb_root streams_by_id; /* storage for released qc_stream_desc */
|
||||||
|
|
||||||
/* MUX */
|
/* MUX */
|
||||||
struct qcc *qcc;
|
struct qcc *qcc;
|
||||||
|
@ -1243,8 +1243,8 @@ int quic_lstnr_dgram_dispatch(unsigned char *buf, size_t len, void *owner,
|
|||||||
struct quic_dgram *new_dgram, struct list *dgrams);
|
struct quic_dgram *new_dgram, struct list *dgrams);
|
||||||
int qc_send_app_pkts(struct quic_conn *qc, struct list *frms);
|
int qc_send_app_pkts(struct quic_conn *qc, struct list *frms);
|
||||||
|
|
||||||
struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, void *ctx);
|
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx);
|
||||||
void qc_stream_desc_release(struct qc_stream_desc *stream);
|
void qc_stream_desc_release(struct qc_stream_desc *stream, struct quic_conn *qc);
|
||||||
|
|
||||||
#endif /* USE_QUIC */
|
#endif /* USE_QUIC */
|
||||||
#endif /* _HAPROXY_XPRT_QUIC_H */
|
#endif /* _HAPROXY_XPRT_QUIC_H */
|
||||||
|
@ -105,7 +105,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
/* allocate transport layer stream descriptor */
|
/* allocate transport layer stream descriptor */
|
||||||
stream = qc_stream_desc_new(qcc->conn->qc, id, qcs);
|
stream = qc_stream_desc_new(id, qcs);
|
||||||
if (!stream) {
|
if (!stream) {
|
||||||
pool_free(pool_head_qcs, qcs);
|
pool_free(pool_head_qcs, qcs);
|
||||||
qcs = NULL;
|
qcs = NULL;
|
||||||
@ -117,8 +117,9 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
qcs->cs = NULL;
|
qcs->cs = NULL;
|
||||||
qcs->flags = QC_SF_NONE;
|
qcs->flags = QC_SF_NONE;
|
||||||
|
|
||||||
qcs->by_id.key = id;
|
qcs->id = id;
|
||||||
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
|
/* store transport layer stream descriptor in qcc tree */
|
||||||
|
eb64_insert(&qcc->streams_by_id, &stream->by_id);
|
||||||
qcc->strms[type].nb_streams++;
|
qcc->strms[type].nb_streams++;
|
||||||
|
|
||||||
/* If stream is local, use peer remote-limit, or else the opposite. */
|
/* If stream is local, use peer remote-limit, or else the opposite. */
|
||||||
@ -153,12 +154,12 @@ void qcs_free(struct qcs *qcs)
|
|||||||
b_free(&qcs->rx.buf);
|
b_free(&qcs->rx.buf);
|
||||||
b_free(&qcs->tx.buf);
|
b_free(&qcs->tx.buf);
|
||||||
|
|
||||||
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams);
|
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
|
||||||
--qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;
|
--qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams;
|
||||||
|
|
||||||
qc_stream_desc_release(qcs->stream);
|
/* stream desc must be removed from MUX tree before release it */
|
||||||
|
eb64_delete(&qcs->stream->by_id);
|
||||||
eb64_delete(&qcs->by_id);
|
qc_stream_desc_release(qcs->stream, qcs->qcc->conn->qc);
|
||||||
pool_free(pool_head_qcs, qcs);
|
pool_free(pool_head_qcs, qcs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,6 +219,7 @@ void qcs_notify_send(struct qcs *qcs)
|
|||||||
*/
|
*/
|
||||||
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
||||||
{
|
{
|
||||||
|
struct qc_stream_desc *stream;
|
||||||
unsigned int strm_type;
|
unsigned int strm_type;
|
||||||
int64_t sub_id;
|
int64_t sub_id;
|
||||||
struct eb64_node *strm_node;
|
struct eb64_node *strm_node;
|
||||||
@ -233,7 +235,8 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
|||||||
/* unknown stream id */
|
/* unknown stream id */
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
qcs = eb64_entry(strm_node, struct qcs, by_id);
|
stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
|
||||||
|
qcs = stream->ctx;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/* Remote streams. */
|
/* Remote streams. */
|
||||||
@ -282,8 +285,10 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
strm_node = eb64_lookup(strms, id);
|
strm_node = eb64_lookup(strms, id);
|
||||||
if (strm_node)
|
if (strm_node) {
|
||||||
qcs = eb64_entry(strm_node, struct qcs, by_id);
|
stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
|
||||||
|
qcs = stream->ctx;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,12 +394,14 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
|
|||||||
*/
|
*/
|
||||||
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
|
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
|
||||||
{
|
{
|
||||||
|
struct qc_stream_desc *stream;
|
||||||
struct qcs *qcs;
|
struct qcs *qcs;
|
||||||
struct eb64_node *node;
|
struct eb64_node *node;
|
||||||
|
|
||||||
node = eb64_lookup(&qcc->streams_by_id, id);
|
node = eb64_lookup(&qcc->streams_by_id, id);
|
||||||
if (node) {
|
if (node) {
|
||||||
qcs = eb64_entry(&node->node, struct qcs, by_id);
|
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
qcs = stream->ctx;
|
||||||
if (max > qcs->tx.msd) {
|
if (max > qcs->tx.msd) {
|
||||||
qcs->tx.msd = max;
|
qcs->tx.msd = max;
|
||||||
|
|
||||||
@ -436,7 +443,7 @@ static int qc_is_max_streams_needed(struct qcc *qcc)
|
|||||||
static void qcs_destroy(struct qcs *qcs)
|
static void qcs_destroy(struct qcs *qcs)
|
||||||
{
|
{
|
||||||
struct connection *conn = qcs->qcc->conn;
|
struct connection *conn = qcs->qcc->conn;
|
||||||
const uint64_t id = qcs->by_id.key;
|
const uint64_t id = qcs->id;
|
||||||
|
|
||||||
TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs);
|
TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs);
|
||||||
|
|
||||||
@ -501,9 +508,9 @@ static void qc_release(struct qcc *qcc)
|
|||||||
/* liberate remaining qcs instances */
|
/* liberate remaining qcs instances */
|
||||||
node = eb64_first(&qcc->streams_by_id);
|
node = eb64_first(&qcc->streams_by_id);
|
||||||
while (node) {
|
while (node) {
|
||||||
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
|
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
qcs_free(qcs);
|
qcs_free(stream->ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
pool_free(pool_head_qcc, qcc);
|
pool_free(pool_head_qcc, qcc);
|
||||||
@ -595,7 +602,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
|
|||||||
|
|
||||||
frm->type = QUIC_FT_STREAM_8;
|
frm->type = QUIC_FT_STREAM_8;
|
||||||
frm->stream.stream = qcs->stream;
|
frm->stream.stream = qcs->stream;
|
||||||
frm->stream.id = qcs->by_id.key;
|
frm->stream.id = qcs->id;
|
||||||
frm->stream.buf = out;
|
frm->stream.buf = out;
|
||||||
frm->stream.data = (unsigned char *)b_peek(out, head);
|
frm->stream.data = (unsigned char *)b_peek(out, head);
|
||||||
|
|
||||||
@ -760,7 +767,8 @@ static int qc_send(struct qcc *qcc)
|
|||||||
*/
|
*/
|
||||||
node = eb64_first(&qcc->streams_by_id);
|
node = eb64_first(&qcc->streams_by_id);
|
||||||
while (node) {
|
while (node) {
|
||||||
struct qcs *qcs = container_of(node, struct qcs, by_id);
|
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
struct qcs *qcs = stream->ctx;
|
||||||
struct buffer *buf = &qcs->tx.buf;
|
struct buffer *buf = &qcs->tx.buf;
|
||||||
struct buffer *out = &qcs->stream->buf;
|
struct buffer *out = &qcs->stream->buf;
|
||||||
|
|
||||||
@ -769,7 +777,7 @@ static int qc_send(struct qcc *qcc)
|
|||||||
* mechanism for sending. This should be unified in the future,
|
* mechanism for sending. This should be unified in the future,
|
||||||
* in this case the next check will be removed.
|
* in this case the next check will be removed.
|
||||||
*/
|
*/
|
||||||
if (quic_stream_is_uni(qcs->by_id.key)) {
|
if (quic_stream_is_uni(qcs->id)) {
|
||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -824,7 +832,8 @@ static int qc_release_detached_streams(struct qcc *qcc)
|
|||||||
|
|
||||||
node = eb64_first(&qcc->streams_by_id);
|
node = eb64_first(&qcc->streams_by_id);
|
||||||
while (node) {
|
while (node) {
|
||||||
struct qcs *qcs = container_of(node, struct qcs, by_id);
|
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
struct qcs *qcs = stream->ctx;
|
||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
|
|
||||||
if (qcs->flags & QC_SF_DETACH) {
|
if (qcs->flags & QC_SF_DETACH) {
|
||||||
@ -1223,7 +1232,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
|
|||||||
chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc);
|
chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc);
|
||||||
|
|
||||||
if (qcs)
|
if (qcs)
|
||||||
chunk_appendf(&trace_buf, " qcs=%p(%llu)", qcs, qcs->by_id.key);
|
chunk_appendf(&trace_buf, " qcs=%p(%lu)", qcs, qcs->id);
|
||||||
|
|
||||||
if (mask & QMUX_EV_QCC_NQCS) {
|
if (mask & QMUX_EV_QCC_NQCS) {
|
||||||
const uint64_t *id = a3;
|
const uint64_t *id = a3;
|
||||||
|
@ -1484,13 +1484,18 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
|
|||||||
|
|
||||||
/* do not use strm_frm->stream as the qc_stream_desc instance
|
/* do not use strm_frm->stream as the qc_stream_desc instance
|
||||||
* might be freed at this stage. Use the id to do a proper
|
* might be freed at this stage. Use the id to do a proper
|
||||||
* lookup.
|
* lookup. First search in the MUX then in the released stream
|
||||||
|
* list.
|
||||||
*
|
*
|
||||||
* TODO if lookup operation impact on the perf is noticeable,
|
* TODO if lookup operation impact on the perf is noticeable,
|
||||||
* implement a refcount on qc_stream_desc instances.
|
* implement a refcount on qc_stream_desc instances.
|
||||||
*/
|
*/
|
||||||
node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
|
if (qc->mux_state == QC_MUX_READY)
|
||||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
stream = qcc_get_stream(qc->qcc, strm_frm->id);
|
||||||
|
if (!stream) {
|
||||||
|
node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
|
||||||
|
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
}
|
||||||
|
|
||||||
if (!stream) {
|
if (!stream) {
|
||||||
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
|
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
|
||||||
@ -2155,7 +2160,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
|
|||||||
frm = eb64_entry(&frm_node->node,
|
frm = eb64_entry(&frm_node->node,
|
||||||
struct quic_rx_strm_frm, offset_node);
|
struct quic_rx_strm_frm, offset_node);
|
||||||
|
|
||||||
ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len,
|
ret = qcc_recv(qc->qcc, qcs->id, frm->len,
|
||||||
frm->offset_node.key, frm->fin,
|
frm->offset_node.key, frm->fin,
|
||||||
(char *)frm->data, &qcs);
|
(char *)frm->data, &qcs);
|
||||||
|
|
||||||
@ -5733,12 +5738,12 @@ int quic_lstnr_dgram_dispatch(unsigned char *buf, size_t len, void *owner,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Allocate a new stream descriptor with id <id>. The stream will be stored
|
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
|
||||||
* inside the <qc> connection.
|
* store the stream in the appropriate tree.
|
||||||
*
|
*
|
||||||
* Returns the newly allocated instance on success or else NULL.
|
* Returns the newly allocated instance on success or else NULL.
|
||||||
*/
|
*/
|
||||||
struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, void *ctx)
|
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx)
|
||||||
{
|
{
|
||||||
struct qc_stream_desc *stream;
|
struct qc_stream_desc *stream;
|
||||||
|
|
||||||
@ -5747,7 +5752,7 @@ struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, voi
|
|||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
stream->by_id.key = id;
|
stream->by_id.key = id;
|
||||||
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
stream->by_id.node.leaf_p = NULL;
|
||||||
|
|
||||||
stream->buf = BUF_NULL;
|
stream->buf = BUF_NULL;
|
||||||
stream->acked_frms = EB_ROOT;
|
stream->acked_frms = EB_ROOT;
|
||||||
@ -5759,15 +5764,22 @@ struct qc_stream_desc *qc_stream_desc_new(struct quic_conn *qc, uint64_t id, voi
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Mark the stream descriptor <stream> as released by the upper layer. It will
|
/* Mark the stream descriptor <stream> as released by the upper layer. It will
|
||||||
* be freed as soon as all its buffered data are acknowledged.
|
* be freed as soon as all its buffered data are acknowledged. In the meantime,
|
||||||
|
* the stream is stored in the <qc> tree : thus it must have been removed from
|
||||||
|
* any other tree before calling this function.
|
||||||
*/
|
*/
|
||||||
void qc_stream_desc_release(struct qc_stream_desc *stream)
|
void qc_stream_desc_release(struct qc_stream_desc *stream,
|
||||||
|
struct quic_conn *qc)
|
||||||
{
|
{
|
||||||
|
BUG_ON(stream->by_id.node.leaf_p);
|
||||||
|
|
||||||
stream->release = 1;
|
stream->release = 1;
|
||||||
stream->ctx = NULL;
|
stream->ctx = NULL;
|
||||||
|
|
||||||
if (!b_data(&stream->buf))
|
if (!b_data(&stream->buf))
|
||||||
qc_stream_desc_free(stream);
|
qc_stream_desc_free(stream);
|
||||||
|
else
|
||||||
|
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Function to automatically activate QUIC traces on stdout.
|
/* Function to automatically activate QUIC traces on stdout.
|
||||||
|
Loading…
Reference in New Issue
Block a user