mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-20 04:30:46 +00:00
MINOR: quic-stream: use distinct tree nodes for quic stream and qcs
Simplify the model qcs/qc_stream_desc. Each types has now its own tree node, stored respectively in qcc and quic-conn trees. It is still necessary to mark the stream as detached by the MUX once all data is transfered to the lower layer. This might improve slightly the performance on ACK management as now only the lookup in quic-conn is necessary. On the other hand, memory size of qcs structure is increased.
This commit is contained in:
parent
0cc02a345b
commit
e4301da5ed
@ -106,6 +106,7 @@ struct qcs {
|
||||
uint64_t msd; /* fctl bytes limit to respect on emission */
|
||||
} tx;
|
||||
|
||||
struct eb64_node by_id;
|
||||
uint64_t id;
|
||||
struct qc_stream_desc *stream;
|
||||
|
||||
|
@ -10,14 +10,14 @@
|
||||
/* QUIC STREAM descriptor.
|
||||
*
|
||||
* This structure is the low-level counterpart of the QUIC STREAM at the MUX
|
||||
* layer. It provides a node for tree-storage and buffering for Tx.
|
||||
* layer. It is stored in the quic-conn and provides facility for Tx buffering.
|
||||
*
|
||||
* Once the MUX has finished to transfer data on a STREAM, it must release its
|
||||
* QUIC STREAM descriptor. The descriptor will be kept by the quic_conn until
|
||||
* all acknowledgement has been received.
|
||||
*/
|
||||
struct qc_stream_desc {
|
||||
struct eb64_node by_id; /* id of the stream used for <streams_by_id> tree */
|
||||
struct eb64_node by_id; /* node for quic_conn tree */
|
||||
|
||||
struct buffer buf; /* buffer for STREAM data on Tx, emptied on acknowledge */
|
||||
uint64_t ack_offset; /* last acknowledged offset */
|
||||
|
@ -7,9 +7,9 @@
|
||||
|
||||
struct quic_conn;
|
||||
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx);
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream,
|
||||
struct quic_conn *qc);
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
|
||||
struct quic_conn *qc);
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream);
|
||||
int qc_stream_desc_free(struct qc_stream_desc *stream);
|
||||
|
||||
#endif /* USE_QUIC */
|
||||
|
@ -749,7 +749,7 @@ struct quic_conn {
|
||||
struct listener *li; /* only valid for frontend connections */
|
||||
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
||||
|
||||
struct eb_root streams_by_id; /* storage for released qc_stream_desc */
|
||||
struct eb_root streams_by_id; /* qc_stream_desc tree */
|
||||
|
||||
/* MUX */
|
||||
struct qcc *qcc;
|
||||
|
@ -112,8 +112,12 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
||||
if (!qcs)
|
||||
goto out;
|
||||
|
||||
/* allocate transport layer stream descriptor */
|
||||
stream = qc_stream_desc_new(id, qcs);
|
||||
/* allocate transport layer stream descriptor
|
||||
*
|
||||
* TODO qc_stream_desc is only useful for Tx buffering. It should not
|
||||
* be required for unidirectional remote streams.
|
||||
*/
|
||||
stream = qc_stream_desc_new(id, qcs, qcc->conn->handle.qc);
|
||||
if (!stream) {
|
||||
pool_free(pool_head_qcs, qcs);
|
||||
qcs = NULL;
|
||||
@ -134,9 +138,9 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
||||
qcs->endp->ctx = qcc->conn;
|
||||
qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
|
||||
|
||||
qcs->id = id;
|
||||
qcs->id = qcs->by_id.key = id;
|
||||
/* store transport layer stream descriptor in qcc tree */
|
||||
eb64_insert(&qcc->streams_by_id, &stream->by_id);
|
||||
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
|
||||
|
||||
qcc->strms[type].nb_streams++;
|
||||
|
||||
@ -175,11 +179,12 @@ void qcs_free(struct qcs *qcs)
|
||||
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
|
||||
--qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams;
|
||||
|
||||
/* stream desc must be removed from MUX tree before release it */
|
||||
eb64_delete(&qcs->stream->by_id);
|
||||
qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc);
|
||||
qc_stream_desc_release(qcs->stream);
|
||||
|
||||
BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN));
|
||||
cs_endpoint_free(qcs->endp);
|
||||
|
||||
eb64_delete(&qcs->by_id);
|
||||
pool_free(pool_head_qcs, qcs);
|
||||
}
|
||||
|
||||
@ -239,24 +244,22 @@ void qcs_notify_send(struct qcs *qcs)
|
||||
*/
|
||||
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
||||
{
|
||||
struct qc_stream_desc *stream;
|
||||
unsigned int strm_type;
|
||||
int64_t sub_id;
|
||||
struct eb64_node *strm_node;
|
||||
struct eb64_node *node;
|
||||
struct qcs *qcs = NULL;
|
||||
|
||||
strm_type = id & QCS_ID_TYPE_MASK;
|
||||
sub_id = id >> QCS_ID_TYPE_SHIFT;
|
||||
strm_node = NULL;
|
||||
node = NULL;
|
||||
if (quic_stream_is_local(qcc, id)) {
|
||||
/* Local streams: this stream must be already opened. */
|
||||
strm_node = eb64_lookup(&qcc->streams_by_id, id);
|
||||
if (!strm_node) {
|
||||
node = eb64_lookup(&qcc->streams_by_id, id);
|
||||
if (!node) {
|
||||
/* unknown stream id */
|
||||
goto out;
|
||||
}
|
||||
stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
|
||||
qcs = stream->ctx;
|
||||
qcs = eb64_entry(node, struct qcs, by_id);
|
||||
}
|
||||
else {
|
||||
/* Remote streams. */
|
||||
@ -304,11 +307,9 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
||||
qcs = tmp_qcs;
|
||||
}
|
||||
else {
|
||||
strm_node = eb64_lookup(strms, id);
|
||||
if (strm_node) {
|
||||
stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
|
||||
qcs = stream->ctx;
|
||||
}
|
||||
node = eb64_lookup(strms, id);
|
||||
if (node)
|
||||
qcs = eb64_entry(node, struct qcs, by_id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -414,14 +415,12 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
|
||||
*/
|
||||
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
|
||||
{
|
||||
struct qc_stream_desc *stream;
|
||||
struct qcs *qcs;
|
||||
struct eb64_node *node;
|
||||
|
||||
node = eb64_lookup(&qcc->streams_by_id, id);
|
||||
if (node) {
|
||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
qcs = stream->ctx;
|
||||
qcs = eb64_entry(node, struct qcs, by_id);
|
||||
if (max > qcs->tx.msd) {
|
||||
qcs->tx.msd = max;
|
||||
|
||||
@ -522,9 +521,9 @@ static void qc_release(struct qcc *qcc)
|
||||
/* liberate remaining qcs instances */
|
||||
node = eb64_first(&qcc->streams_by_id);
|
||||
while (node) {
|
||||
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
|
||||
node = eb64_next(node);
|
||||
qcs_free(stream->ctx);
|
||||
qcs_free(qcs);
|
||||
}
|
||||
|
||||
pool_free(pool_head_qcc, qcc);
|
||||
@ -846,8 +845,7 @@ static int qc_send(struct qcc *qcc)
|
||||
*/
|
||||
node = eb64_first(&qcc->streams_by_id);
|
||||
while (node) {
|
||||
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
struct qcs *qcs = stream->ctx;
|
||||
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
|
||||
struct buffer *buf = &qcs->tx.buf;
|
||||
struct buffer *out = &qcs->stream->buf;
|
||||
|
||||
@ -921,8 +919,7 @@ static int qc_release_detached_streams(struct qcc *qcc)
|
||||
|
||||
node = eb64_first(&qcc->streams_by_id);
|
||||
while (node) {
|
||||
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
struct qcs *qcs = stream->ctx;
|
||||
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
|
||||
node = eb64_next(node);
|
||||
|
||||
if (qcs->flags & QC_SF_DETACH) {
|
||||
@ -1263,14 +1260,12 @@ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev
|
||||
*/
|
||||
static int qc_wake_some_streams(struct qcc *qcc)
|
||||
{
|
||||
struct qc_stream_desc *stream;
|
||||
struct qcs *qcs;
|
||||
struct eb64_node *node;
|
||||
|
||||
for (node = eb64_first(&qcc->streams_by_id); node;
|
||||
node = eb64_next(node)) {
|
||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
qcs = stream->ctx;
|
||||
qcs = eb64_entry(node, struct qcs, by_id);
|
||||
|
||||
if (!qcs->cs)
|
||||
continue;
|
||||
|
@ -17,7 +17,8 @@ DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc",
|
||||
*
|
||||
* Returns the newly allocated instance on success or else NULL.
|
||||
*/
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx)
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
|
||||
struct quic_conn *qc)
|
||||
{
|
||||
struct qc_stream_desc *stream;
|
||||
|
||||
@ -26,7 +27,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx)
|
||||
return NULL;
|
||||
|
||||
stream->by_id.key = id;
|
||||
stream->by_id.node.leaf_p = NULL;
|
||||
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
||||
|
||||
stream->buf = BUF_NULL;
|
||||
stream->acked_frms = EB_ROOT;
|
||||
@ -37,23 +38,19 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx)
|
||||
return stream;
|
||||
}
|
||||
|
||||
/* Mark the stream descriptor <stream> as released by the upper layer. It will
|
||||
* be freed as soon as all its buffered data are acknowledged. In the meantime,
|
||||
* the stream is stored in the <qc> tree : thus it must have been removed from
|
||||
* any other tree before calling this function.
|
||||
/* Mark the stream descriptor <stream> as released. It will be freed as soon as
|
||||
* all its buffered data are acknowledged.
|
||||
*/
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream,
|
||||
struct quic_conn *qc)
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream)
|
||||
{
|
||||
BUG_ON(stream->by_id.node.leaf_p);
|
||||
/* A stream can be released only one time. */
|
||||
BUG_ON(stream->release);
|
||||
|
||||
stream->release = 1;
|
||||
stream->ctx = NULL;
|
||||
|
||||
if (!b_data(&stream->buf))
|
||||
qc_stream_desc_free(stream);
|
||||
else
|
||||
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
||||
}
|
||||
|
||||
/* Free the stream descriptor <stream> buffer. This function should be used
|
||||
|
@ -1494,20 +1494,13 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
|
||||
|
||||
/* do not use strm_frm->stream as the qc_stream_desc instance
|
||||
* might be freed at this stage. Use the id to do a proper
|
||||
* lookup. First search in the MUX then in the released stream
|
||||
* list.
|
||||
* lookup.
|
||||
*
|
||||
* TODO if lookup operation impact on the perf is noticeable,
|
||||
* implement a refcount on qc_stream_desc instances.
|
||||
*/
|
||||
if (qc->mux_state == QC_MUX_READY)
|
||||
stream = qcc_get_stream(qc->qcc, strm_frm->id);
|
||||
if (!stream) {
|
||||
node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
|
||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
}
|
||||
|
||||
if (!stream) {
|
||||
node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
|
||||
if (!node) {
|
||||
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
|
||||
LIST_DELETE(&frm->list);
|
||||
quic_tx_packet_refdec(frm->pkt);
|
||||
@ -1516,6 +1509,7 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
|
||||
/* early return */
|
||||
return;
|
||||
}
|
||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
|
||||
TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
|
||||
if (strm_frm->offset.key <= stream->ack_offset) {
|
||||
|
Loading…
Reference in New Issue
Block a user