MEDIUM: quic: move transport fields from qcs to qc_conn_stream

Move the xprt-buf and ack related fields from qcs to the qc_stream_desc
structure. In exchange, qcs has a pointer to the low-level stream. For
each new qcs, a qc_stream_desc is automatically allocated.

This simplifies the transport layer by removing qcs/mux manipulation
during ACK frame parsing. An additional check is done to not notify the
MUX on sending if the stream is already released: this case may now
happen on retransmission.

To complete this change, the quic_stream frame now references the
quic_stream instance instead of a qcs.
This commit is contained in:
Amaury Denoyelle 2022-03-29 15:15:54 +02:00
parent 5c3859c509
commit 7272cd76fc
4 changed files with 86 additions and 61 deletions

View File

@ -10,6 +10,7 @@
#include <haproxy/buf-t.h>
#include <haproxy/connection-t.h>
#include <haproxy/xprt_quic-t.h>
/* Stream types */
enum qcs_type {
@ -98,14 +99,12 @@ struct qcs {
struct {
uint64_t offset; /* last offset of data ready to be sent */
uint64_t sent_offset; /* last offset sent by transport layer */
struct eb_root acked_frms; /* acked frames ordered by their offsets */
uint64_t ack_offset; /* last acked ordered byte offset */
struct buffer buf; /* transmit buffer before sending via xprt */
struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
uint64_t msd; /* fctl bytes limit to respect on emission */
} tx;
struct eb64_node by_id; /* place in qcc's streams_by_id */
struct qc_stream_desc *stream;
struct wait_event wait_event;
struct wait_event *subs;

View File

@ -34,6 +34,7 @@
#include <import/ebtree-t.h>
#include <haproxy/mux_quic-t.h>
#include <haproxy/xprt_quic-t.h>
/* QUIC frame types. */
enum quic_frame_type {
@ -147,7 +148,7 @@ struct quic_new_token {
struct quic_stream {
uint64_t id;
struct qcs *qcs;
struct qc_stream_desc *stream;
/* used only on TX when constructing frames.
* Data cleared when processing ACK related to this STREAM frame.

View File

@ -96,6 +96,7 @@ INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
struct qc_stream_desc *stream;
TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
@ -103,6 +104,15 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
if (!qcs)
goto out;
/* allocate transport layer stream descriptor */
stream = qc_stream_desc_new(qcc->conn->qc, id, qcs);
if (!stream) {
pool_free(pool_head_qcs, qcs);
qcs = NULL;
goto out;
}
qcs->stream = stream;
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->flags = QC_SF_NONE;
@ -122,11 +132,8 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
qcs->rx.frms = EB_ROOT_UNIQUE;
qcs->tx.buf = BUF_NULL;
qcs->tx.xprt_buf = BUF_NULL;
qcs->tx.offset = 0;
qcs->tx.sent_offset = 0;
qcs->tx.ack_offset = 0;
qcs->tx.acked_frms = EB_ROOT;
qcs->wait_event.tasklet = NULL;
qcs->wait_event.events = 0;
@ -145,11 +152,12 @@ void qcs_free(struct qcs *qcs)
{
b_free(&qcs->rx.buf);
b_free(&qcs->tx.buf);
b_free(&qcs->tx.xprt_buf);
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams);
--qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;
qc_stream_desc_release(qcs->stream);
eb64_delete(&qcs->by_id);
pool_free(pool_head_qcs, qcs);
}
@ -260,6 +268,7 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
for (i = largest_id + 1; i <= sub_id; i++) {
uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
tmp_qcs = qcs_new(qcc, id, type);
if (!tmp_qcs) {
/* allocation failure */
@ -558,10 +567,10 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
* |xxxxxxxxxxxxxxxxx|
*/
BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
head = qcs->tx.sent_offset - qcs->tx.ack_offset;
head = qcs->tx.sent_offset - qcs->stream->ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
@ -585,7 +594,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
total = b_force_xfer(out, payload, to_xfer);
frm->type = QUIC_FT_STREAM_8;
frm->stream.qcs = (struct qcs *)qcs;
frm->stream.stream = qcs->stream;
frm->stream.id = qcs->by_id.key;
frm->stream.buf = out;
frm->stream.data = (unsigned char *)b_peek(out, head);
@ -753,7 +762,7 @@ static int qc_send(struct qcc *qcc)
while (node) {
struct qcs *qcs = container_of(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
struct buffer *out = &qcs->tx.xprt_buf;
struct buffer *out = &qcs->stream->buf;
/* TODO
* for the moment, unidirectional streams have their own
@ -819,7 +828,8 @@ static int qc_release_detached_streams(struct qcc *qcc)
node = eb64_next(node);
if (qcs->flags & QC_SF_DETACH) {
if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) {
if (!b_data(&qcs->tx.buf) &&
qcs->tx.offset == qcs->tx.sent_offset) {
qcs_destroy(qcs);
release = 1;
}
@ -1033,7 +1043,7 @@ static void qc_detach(struct conn_stream *cs)
* managment between xprt and mux is reorganized.
*/
if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) {
if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
qcs->flags |= QC_SF_DETACH;
return;

View File

@ -447,12 +447,12 @@ static void quic_trace(enum trace_level level, uint64_t mask, const struct trace
if (mask & QUIC_EV_CONN_ACKSTRM) {
const struct quic_stream *s = a2;
const struct qcs *qcs = a3;
const struct qc_stream_desc *stream = a3;
if (s)
chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)s->offset.key, (ull)s->len);
if (qcs)
chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)qcs->tx.ack_offset);
if (stream)
chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset);
}
if (mask & QUIC_EV_CONN_RTTUPDT) {
@ -1421,38 +1421,35 @@ static int qc_stream_desc_free(struct qc_stream_desc *stream)
return 0;
}
/* Remove from <qcs> stream the acknowledged frames.
/* Remove from <stream> the acknowledged frames.
*
* Returns 1 if at least one frame was removed else 0.
*/
static int qcs_try_to_consume(struct qcs *qcs)
static int quic_stream_try_to_consume(struct quic_conn *qc,
struct qc_stream_desc *stream)
{
int ret;
struct eb64_node *frm_node;
ret = 0;
frm_node = eb64_first(&qcs->tx.acked_frms);
frm_node = eb64_first(&stream->acked_frms);
while (frm_node) {
struct quic_stream *strm;
struct quic_frame *frm;
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
if (strm->offset.key > qcs->tx.ack_offset)
if (strm->offset.key > stream->ack_offset)
break;
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
qcs->qcc->conn->qc, strm, qcs);
if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
qc, strm, stream);
if (strm->offset.key + strm->len > stream->ack_offset) {
const size_t diff = strm->offset.key + strm->len -
qcs->tx.ack_offset;
qcs->tx.ack_offset += diff;
stream->ack_offset;
stream->ack_offset += diff;
b_del(strm->buf, diff);
ret = 1;
if (!b_data(strm->buf)) {
b_free(strm->buf);
offer_buffers(NULL, 1);
}
}
frm_node = eb64_next(frm_node);
@ -1464,6 +1461,9 @@ static int qcs_try_to_consume(struct qcs *qcs)
pool_free(pool_head_quic_frame, frm);
}
if (!b_data(&stream->buf))
qc_stream_desc_free(stream);
return ret;
}
@ -1478,23 +1478,22 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
switch (frm->type) {
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
{
struct quic_stream *strm = &frm->stream;
struct quic_stream *strm_frm = &frm->stream;
struct eb64_node *node = NULL;
struct qcs *qcs = NULL;
struct qc_stream_desc *stream = NULL;
/* do not use strm->qcs as the qcs instance might be freed at
* this stage. Use the id to do a proper lookup.
/* do not use strm_frm->stream as the qc_stream_desc instance
* might be freed at this stage. Use the id to do a proper
* lookup.
*
* TODO if lookup operation impact on the perf is noticeable,
* implement a refcount on qcs instances.
* implement a refcount on qc_stream_desc instances.
*/
if (qc->mux_state == QC_MUX_READY) {
node = eb64_lookup(&qc->qcc->streams_by_id, strm->id);
qcs = eb64_entry(node, struct qcs, by_id);
}
node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
stream = eb64_entry(node, struct qc_stream_desc, by_id);
if (!qcs) {
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm);
if (!stream) {
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
@ -1503,32 +1502,34 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
return;
}
TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm, qcs);
if (strm->offset.key <= qcs->tx.ack_offset) {
if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
const size_t diff = strm->offset.key + strm->len -
qcs->tx.ack_offset;
qcs->tx.ack_offset += diff;
b_del(strm->buf, diff);
TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
if (strm_frm->offset.key <= stream->ack_offset) {
if (strm_frm->offset.key + strm_frm->len > stream->ack_offset) {
const size_t diff = strm_frm->offset.key + strm_frm->len -
stream->ack_offset;
stream->ack_offset += diff;
b_del(strm_frm->buf, diff);
stream_acked = 1;
if (!b_data(strm->buf)) {
b_free(strm->buf);
offer_buffers(NULL, 1);
if (!b_data(strm_frm->buf)) {
if (qc_stream_desc_free(stream)) {
/* early return */
return;
}
}
}
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
qcs->qcc->conn->qc, strm, qcs);
qc, strm_frm, stream);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
}
else {
eb64_insert(&qcs->tx.acked_frms, &strm->offset);
eb64_insert(&stream->acked_frms, &strm_frm->offset);
}
stream_acked |= qcs_try_to_consume(qcs);
stream_acked |= quic_stream_try_to_consume(qc, stream);
}
break;
default:
@ -5089,9 +5090,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
LIST_DELETE(&cf->list);
LIST_APPEND(outlist, &cf->list);
qcc_streams_sent_done(cf->stream.qcs,
cf->stream.len,
cf->stream.offset.key);
/* The MUX stream might be released at this
* stage. This can most notably happen on
* retransmission.
*/
if (qc->mux_state == QC_MUX_READY &&
!cf->stream.stream->release) {
qcc_streams_sent_done(cf->stream.stream->ctx,
cf->stream.len,
cf->stream.offset.key);
}
}
else {
struct quic_frame *new_cf;
@ -5104,7 +5112,7 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
}
new_cf->type = cf->type;
new_cf->stream.qcs = cf->stream.qcs;
new_cf->stream.stream = cf->stream.stream;
new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
@ -5124,9 +5132,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
cf->stream.offset.key += dlen;
cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen);
qcc_streams_sent_done(new_cf->stream.qcs,
new_cf->stream.len,
new_cf->stream.offset.key);
/* The MUX stream might be released at this
* stage. This can most notably happen on
* retransmission.
*/
if (qc->mux_state == QC_MUX_READY &&
!cf->stream.stream->release) {
qcc_streams_sent_done(new_cf->stream.stream->ctx,
new_cf->stream.len,
new_cf->stream.offset.key);
}
}
/* TODO the MUX is notified about the frame sending via