mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-18 19:50:54 +00:00
MINOR: quic: Make use of buffer structs to handle STREAM frames
The STREAM data to send coming from the upper layer must be stored until having being acked by the peer. To do so, we store them in buffer structs, one by stream (see qcs.tx.buf). Each time a STREAM is built by quic_push_frame(), its offset must match the offset of the first byte added to the buffer (modulo the size of the buffer) by the frame. As they are not always acknowledged in order, they may be stored in eb_trees ordered by their offset to be sure to sequentially delete the STREAM data from their buffer, in the order they have been added to it.
This commit is contained in:
parent
c7860007cc
commit
785d3bdedc
@ -185,7 +185,6 @@ struct qcs {
|
||||
struct session *sess;
|
||||
struct qcc *qcc;
|
||||
struct eb64_node by_id; /* place in qcc's streams_by_id */
|
||||
struct eb_root frms;
|
||||
uint64_t id; /* stream ID */
|
||||
uint32_t flags; /* QC_SF_* */
|
||||
struct {
|
||||
@ -194,12 +193,15 @@ struct qcs {
|
||||
uint64_t offset; /* the current offset of received data */
|
||||
uint64_t bytes; /* number of bytes received */
|
||||
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
|
||||
struct eb_root frms; /* received frames ordered by their offsets */
|
||||
} rx;
|
||||
struct {
|
||||
enum qcs_tx_st st; /* TX state */
|
||||
uint64_t max_data; /* maximum number of bytes which may be sent */
|
||||
uint64_t offset; /* the current offset of data to send */
|
||||
uint64_t bytes; /* number of bytes sent */
|
||||
uint64_t ack_offset; /* last acked ordered byte offset */
|
||||
struct eb_root acked_frms; /* acked frames ordered by their offsets */
|
||||
struct buffer buf; /* transmit buffer, always valid (buf_empty or real buffer) */
|
||||
struct buffer mbuf[QCC_MBUF_CNT];
|
||||
uint64_t left; /* data currently stored in mbuf waiting for send */
|
||||
|
@ -31,6 +31,8 @@
|
||||
|
||||
#include <haproxy/list.h>
|
||||
|
||||
#include <import/eb64tree.h>
|
||||
|
||||
/* QUIC frame types. */
|
||||
enum quic_frame_type {
|
||||
QUIC_FT_PADDING = 0x00,
|
||||
@ -141,7 +143,9 @@ struct quic_new_token {
|
||||
|
||||
struct quic_stream {
|
||||
uint64_t id;
|
||||
uint64_t offset;
|
||||
struct qcs *qcs;
|
||||
struct buffer *buf;
|
||||
struct eb64_node offset;
|
||||
uint64_t len;
|
||||
const unsigned char *data;
|
||||
};
|
||||
|
@ -72,7 +72,7 @@ static inline size_t qc_frm_len(struct quic_frame *frm)
|
||||
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: {
|
||||
struct quic_stream *f = &frm->stream;
|
||||
len += 1 + quic_int_getsize(f->id) +
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset) : 0) +
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset.key) : 0) +
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) ? quic_int_getsize(f->len) : 0) + f->len;
|
||||
break;
|
||||
}
|
||||
|
@ -970,19 +970,20 @@ struct qcs *bidi_qcs_new(struct qcc *qcc, uint64_t id)
|
||||
qcs->qcc = qcc;
|
||||
qcs->cs = NULL;
|
||||
qcs->id = qcs->by_id.key = id;
|
||||
qcs->frms = EB_ROOT_UNIQUE;
|
||||
qcs->flags = QC_SF_NONE;
|
||||
|
||||
qcs->rx.buf = BUF_NULL;
|
||||
qcs->rx.st = QC_RX_SS_IDLE;
|
||||
qcs->rx.bytes = qcs->rx.offset = 0;
|
||||
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
|
||||
|
||||
qcs->rx.buf = BUF_NULL;
|
||||
qcs->rx.frms = EB_ROOT_UNIQUE;
|
||||
|
||||
qcs->tx.st = QC_TX_SS_IDLE;
|
||||
qcs->tx.bytes = qcs->tx.offset = 0;
|
||||
qcs->tx.bytes = qcs->tx.offset = qcs->tx.ack_offset = 0;
|
||||
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
||||
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
|
||||
qcs->tx.buf = BUF_NULL;
|
||||
qcs->tx.buf = BUF_NULL;
|
||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
||||
qcs->tx.left = 0;
|
||||
|
||||
@ -1041,13 +1042,13 @@ struct qcs *luqs_new(struct qcc *qcc)
|
||||
qcs->qcc = qcc;
|
||||
qcs->cs = NULL;
|
||||
qcs->id = qcs->by_id.key = next_id;
|
||||
qcs->frms = EB_ROOT_UNIQUE;
|
||||
qcs->flags = QC_SF_NONE;
|
||||
|
||||
qcs->tx.st = QC_TX_SS_IDLE;
|
||||
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
|
||||
qcs->tx.offset = qcs->tx.bytes = 0;
|
||||
qcs->tx.buf = BUF_NULL;
|
||||
qcs->tx.st = QC_TX_SS_IDLE;
|
||||
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
|
||||
qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0;
|
||||
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
||||
qcs->tx.buf = BUF_NULL;
|
||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
||||
qcs->tx.left = 0;
|
||||
|
||||
@ -1083,13 +1084,13 @@ struct qcs *ruqs_new(struct qcc *qcc, uint64_t id)
|
||||
|
||||
qcs->qcc = qcc;
|
||||
qcs->id = qcs->by_id.key = id;
|
||||
qcs->frms = EB_ROOT_UNIQUE;
|
||||
qcs->flags = QC_SF_NONE;
|
||||
|
||||
qcs->rx.st = QC_RX_SS_IDLE;
|
||||
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
|
||||
qcs->rx.offset = qcs->rx.bytes = 0;
|
||||
qcs->rx.buf = BUF_NULL;
|
||||
qcs->rx.frms = EB_ROOT_UNIQUE;
|
||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
||||
qcs->tx.left = 0;
|
||||
|
||||
@ -1396,12 +1397,12 @@ leave:
|
||||
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
|
||||
{
|
||||
struct quic_frame *frm;
|
||||
struct buffer buf = BUF_NULL;
|
||||
struct buffer *buf = &qcs->tx.buf;
|
||||
struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
|
||||
int total = 0;
|
||||
|
||||
qc_get_buf(qcs->qcc, &buf);
|
||||
total = b_xfer(&buf, payload, b_data(payload));
|
||||
|
||||
qc_get_buf(qcs->qcc, buf);
|
||||
total = b_force_xfer(buf, payload, QUIC_MIN(b_data(payload), b_room(buf)));
|
||||
frm = pool_zalloc(pool_head_quic_frame);
|
||||
if (!frm)
|
||||
goto err;
|
||||
@ -1411,16 +1412,16 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
|
||||
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
||||
if (offset) {
|
||||
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
|
||||
frm->stream.offset = offset;
|
||||
frm->stream.offset.key = offset;
|
||||
}
|
||||
frm->stream.qcs = qcs;
|
||||
frm->stream.buf = buf;
|
||||
frm->stream.id = qcs->by_id.key;
|
||||
if (total) {
|
||||
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
|
||||
frm->stream.len = total;
|
||||
frm->stream.data = (unsigned char *)b_head(&buf);
|
||||
}
|
||||
|
||||
struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
|
||||
MT_LIST_APPEND(&qel->pktns->tx.frms, &frm->mt_list);
|
||||
fprintf(stderr, "%s: total=%d fin=%d offset=%lu\n", __func__, total, fin, offset);
|
||||
return total;
|
||||
|
@ -375,15 +375,29 @@ static int quic_build_stream_frame(unsigned char **buf, const unsigned char *end
|
||||
struct quic_frame *frm, struct quic_conn *conn)
|
||||
{
|
||||
struct quic_stream *stream = &frm->stream;
|
||||
size_t offset, block1, block2;
|
||||
struct buffer b;
|
||||
|
||||
if (!quic_enc_int(buf, end, stream->id) ||
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset)) ||
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset.key)) ||
|
||||
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) &&
|
||||
(!quic_enc_int(buf, end, stream->len) || end - *buf < stream->len)))
|
||||
return 0;
|
||||
|
||||
memcpy(*buf, stream->data, stream->len);
|
||||
*buf += stream->len;
|
||||
/* Buffer copy */
|
||||
b = *stream->buf;
|
||||
offset = (frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ?
|
||||
stream->offset.key & (b_size(stream->buf) - 1): 0;
|
||||
block1 = b_wrap(&b) - (b_orig(&b) + offset);
|
||||
if (block1 > stream->len)
|
||||
block1 = stream->len;
|
||||
block2 = stream->len - block1;
|
||||
memcpy(*buf, b_orig(&b) + offset, block1);
|
||||
*buf += block1;
|
||||
if (block2) {
|
||||
memcpy(*buf, b_orig(&b), block2);
|
||||
*buf += block2;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -401,9 +415,9 @@ static int quic_parse_stream_frame(struct quic_frame *frm, struct quic_conn *qc,
|
||||
|
||||
/* Offset parsing */
|
||||
if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)) {
|
||||
stream->offset = 0;
|
||||
stream->offset.key = 0;
|
||||
}
|
||||
else if (!quic_dec_int(&stream->offset, buf, end))
|
||||
else if (!quic_dec_int((uint64_t *)&stream->offset.key, buf, end))
|
||||
return 0;
|
||||
|
||||
/* Length parsing */
|
||||
|
@ -571,7 +571,7 @@ static void quic_trace(enum trace_level level, uint64_t mask, const struct trace
|
||||
!!(s->id & QUIC_STREAM_FRAME_ID_DIR_BIT),
|
||||
!!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT),
|
||||
(unsigned long long)s->id,
|
||||
(unsigned long long)s->offset,
|
||||
(unsigned long long)s->offset.key,
|
||||
(unsigned long long)s->len);
|
||||
}
|
||||
}
|
||||
@ -1149,13 +1149,56 @@ static int qc_pkt_decrypt(struct quic_rx_packet *pkt, struct quic_tls_ctx *tls_c
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Remove from <qcs> stream the acknowledged frames.
|
||||
* Never fails.
|
||||
*/
|
||||
static void qcs_try_to_consume(struct qcs *qcs)
|
||||
{
|
||||
struct eb64_node *frm_node;
|
||||
|
||||
frm_node = eb64_first(&qcs->tx.acked_frms);
|
||||
while (frm_node) {
|
||||
struct quic_stream *strm;
|
||||
|
||||
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
|
||||
if (strm->offset.key != qcs->tx.ack_offset)
|
||||
break;
|
||||
|
||||
b_del(strm->buf, strm->len);
|
||||
qcs->tx.ack_offset += strm->len;
|
||||
frm_node = eb64_next(frm_node);
|
||||
eb64_delete(&strm->offset);
|
||||
}
|
||||
}
|
||||
|
||||
/* Treat <frm> frame whose packet it is attached to has just been acknowledged. */
|
||||
static inline void qc_treat_acked_tx_frm(struct quic_frame *frm,
|
||||
struct ssl_sock_ctx *ctx)
|
||||
{
|
||||
|
||||
TRACE_PROTO("Removing frame", QUIC_EV_CONN_PRSAFRM, ctx->conn, frm);
|
||||
LIST_DELETE(&frm->list);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
switch (frm->type) {
|
||||
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
|
||||
{
|
||||
struct qcs *qcs = frm->stream.qcs;
|
||||
struct quic_stream *strm = &frm->stream;
|
||||
|
||||
if (qcs->tx.ack_offset == strm->offset.key) {
|
||||
b_del(strm->buf, strm->len);
|
||||
qcs->tx.ack_offset += strm->len;
|
||||
LIST_DELETE(&frm->list);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
}
|
||||
else {
|
||||
eb64_insert(&qcs->tx.acked_frms, &strm->offset);
|
||||
}
|
||||
qcs_try_to_consume(qcs);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LIST_DELETE(&frm->list);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
}
|
||||
}
|
||||
|
||||
/* Remove <largest> down to <smallest> node entries from <pkts> tree of TX packet,
|
||||
@ -1582,7 +1625,7 @@ struct quic_rx_strm_frm *new_quic_rx_strm_frm(struct quic_stream *stream_frm,
|
||||
|
||||
frm = pool_alloc(pool_head_quic_rx_strm_frm);
|
||||
if (frm) {
|
||||
frm->offset_node.key = stream_frm->offset;
|
||||
frm->offset_node.key = stream_frm->offset.key;
|
||||
frm->len = stream_frm->len;
|
||||
frm->data = stream_frm->data;
|
||||
frm->pkt = pkt;
|
||||
@ -1686,7 +1729,7 @@ static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
|
||||
try = strm_frm->len;
|
||||
memcpy(b_tail(buf), strm_frm->data, try);
|
||||
strm_frm->len -= try;
|
||||
strm_frm->offset += try;
|
||||
strm_frm->offset.key += try;
|
||||
b_add(buf, try);
|
||||
ret += try;
|
||||
}
|
||||
@ -1715,7 +1758,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
|
||||
}
|
||||
|
||||
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
|
||||
frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
|
||||
frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
|
||||
/* FIXME: handle the case where this frame overlap others */
|
||||
if (frm_node) {
|
||||
TRACE_PROTO("Already existing stream data",
|
||||
@ -1723,7 +1766,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (strm_frm->offset == strm->rx.offset) {
|
||||
if (strm_frm->offset.key == strm->rx.offset) {
|
||||
int ret;
|
||||
|
||||
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
|
||||
@ -1749,7 +1792,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
|
||||
return 0;
|
||||
}
|
||||
|
||||
eb64_insert(&strm->frms, &frm->offset_node);
|
||||
eb64_insert(&strm->rx.frms, &frm->offset_node);
|
||||
quic_rx_packet_refinc(pkt);
|
||||
|
||||
out:
|
||||
@ -1778,7 +1821,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
}
|
||||
|
||||
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
|
||||
frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
|
||||
frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
|
||||
/* FIXME: handle the case where this frame overlap others */
|
||||
if (frm_node) {
|
||||
TRACE_PROTO("Already existing stream data",
|
||||
@ -1787,7 +1830,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
}
|
||||
|
||||
strm_frm_len = strm_frm->len;
|
||||
if (strm_frm->offset == strm->rx.offset) {
|
||||
if (strm_frm->offset.key == strm->rx.offset) {
|
||||
int ret;
|
||||
|
||||
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
|
||||
@ -1806,7 +1849,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
if (ret)
|
||||
ruqs_notify_recv(strm);
|
||||
|
||||
strm_frm->offset += ret;
|
||||
strm_frm->offset.key += ret;
|
||||
}
|
||||
/* Take this frame into an account for the stream flow control */
|
||||
strm->rx.offset += strm_frm_len;
|
||||
@ -1824,7 +1867,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
return 0;
|
||||
}
|
||||
|
||||
eb64_insert(&strm->frms, &frm->offset_node);
|
||||
eb64_insert(&strm->rx.frms, &frm->offset_node);
|
||||
quic_rx_packet_refinc(pkt);
|
||||
|
||||
out:
|
||||
@ -3719,11 +3762,11 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt,
|
||||
* excepting the variable ones. Note that +1 is for the type of this frame.
|
||||
*/
|
||||
hlen = 1 + quic_int_getsize(cf->stream.id) +
|
||||
((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0);
|
||||
((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0);
|
||||
/* Compute the data length of this STREAM frame. */
|
||||
avail_room = room - hlen - *len;
|
||||
if ((ssize_t)avail_room <= 0)
|
||||
continue;
|
||||
break;
|
||||
|
||||
if (cf->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) {
|
||||
dlen = max_available_room(avail_room, &dlen_sz);
|
||||
@ -3761,6 +3804,8 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt,
|
||||
}
|
||||
|
||||
new_cf->type = cf->type;
|
||||
new_cf->stream.qcs = cf->stream.qcs;
|
||||
new_cf->stream.buf = cf->stream.buf;
|
||||
new_cf->stream.id = cf->stream.id;
|
||||
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
|
||||
new_cf->stream.offset = cf->stream.offset;
|
||||
@ -3773,7 +3818,7 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt,
|
||||
cf->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
|
||||
/* Consume <dlen> bytes of the current frame. */
|
||||
cf->stream.len -= dlen;
|
||||
cf->stream.offset += dlen;
|
||||
cf->stream.offset.key += dlen;
|
||||
cf->stream.data += dlen;
|
||||
}
|
||||
break;
|
||||
|
Loading…
Reference in New Issue
Block a user