mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-29 01:22:53 +00:00
MEDIUM: h3: properly manage tx buffers for large data
Properly handle tx buffers management in h3 data sending. If there is not enough contiguous space, the buffer is first realigned. If this is not enough, the stream is flagged with QC_SF_BLK_MROOM waiting for the buffer to be emptied. If a frame on a stream is successfully pushed for sending, the stream is called if it was flagged with QC_SF_BLK_MROOM.
This commit is contained in:
parent
d3d97c6ae7
commit
a543eb1f6f
44
src/h3.c
44
src/h3.c
@ -534,8 +534,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
||||
struct buffer *res;
|
||||
size_t total = 0;
|
||||
struct htx *htx;
|
||||
int bsize, fsize;
|
||||
int frame_length_size; /* size in bytes of frame length varint field */
|
||||
int bsize, fsize, hsize;
|
||||
struct htx_blk *blk;
|
||||
enum htx_blk_type type;
|
||||
|
||||
@ -557,19 +556,34 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
||||
if (fsize > count)
|
||||
fsize = count;
|
||||
|
||||
frame_length_size = quic_int_getsize(fsize);
|
||||
/* h3 DATA headers : 1-byte frame type + varint frame length */
|
||||
hsize = 1 + QUIC_VARINT_MAX_SIZE;
|
||||
|
||||
b_reset(&outbuf);
|
||||
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
|
||||
while (1) {
|
||||
b_reset(&outbuf);
|
||||
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
|
||||
if (b_size(&outbuf) > hsize || !b_space_wraps(res))
|
||||
break;
|
||||
b_slow_realign(res, trash.area, b_data(res));
|
||||
}
|
||||
|
||||
if (1 + fsize + frame_length_size > b_room(&outbuf))
|
||||
ABORT_NOW();
|
||||
/* not enough room for headers and at least one data byte, block the
|
||||
* stream
|
||||
*/
|
||||
if (b_size(&outbuf) <= hsize) {
|
||||
qcs->flags |= QC_SF_BLK_MROOM;
|
||||
goto end;
|
||||
}
|
||||
|
||||
b_putchr(&outbuf, 0x00); /* h3 frame type = DATA */
|
||||
b_quic_enc_int(&outbuf, fsize);
|
||||
if (b_size(&outbuf) < hsize + fsize)
|
||||
fsize = b_size(&outbuf) - hsize;
|
||||
BUG_ON(fsize <= 0);
|
||||
|
||||
b_putchr(&outbuf, 0x00); /* h3 frame type = DATA */
|
||||
b_quic_enc_int(&outbuf, fsize); /* h3 frame length */
|
||||
|
||||
total += fsize;
|
||||
b_putblk(&outbuf, htx_get_blk_ptr(htx, blk), fsize);
|
||||
total += fsize;
|
||||
count -= fsize;
|
||||
|
||||
if (fsize == bsize)
|
||||
@ -577,6 +591,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
||||
else
|
||||
htx_cut_data_blk(htx, blk, fsize);
|
||||
|
||||
/* commit the buffer */
|
||||
b_add(res, b_data(&outbuf));
|
||||
goto new_frame;
|
||||
|
||||
@ -597,7 +612,7 @@ size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int
|
||||
|
||||
htx = htx_from_buf(buf);
|
||||
|
||||
while (count && !htx_is_empty(htx)) {
|
||||
while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) {
|
||||
idx = htx_get_head(htx);
|
||||
blk = htx_get_blk(htx, idx);
|
||||
btype = htx_get_blk_type(blk);
|
||||
@ -643,10 +658,13 @@ size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int
|
||||
|
||||
if ((htx->flags & HTX_FL_EOM) && htx_is_empty(htx))
|
||||
qcs->flags |= QC_SF_FIN_STREAM;
|
||||
// TODO should I call the mux directly here ?
|
||||
qc_snd_buf(cs, buf, total, flags);
|
||||
|
||||
out:
|
||||
if (total) {
|
||||
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
|
||||
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
|
@ -1350,7 +1350,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
|
||||
static int qc_send(struct qcc *qcc)
|
||||
{
|
||||
struct eb64_node *node;
|
||||
int ret, done;
|
||||
int ret, done, xprt_wake = 0;
|
||||
|
||||
TRACE_ENTER(QC_EV_QCC_SEND, qcc->conn);
|
||||
ret = done = 0;
|
||||
@ -1380,18 +1380,26 @@ static int qc_send(struct qcc *qcc)
|
||||
if (ret < 0)
|
||||
ABORT_NOW();
|
||||
|
||||
if (ret > 0) {
|
||||
xprt_wake = 1;
|
||||
if (qcs->flags & QC_SF_BLK_MROOM) {
|
||||
qcs->flags &= ~QC_SF_BLK_MROOM;
|
||||
qcs_notify_send(qcs);
|
||||
}
|
||||
}
|
||||
|
||||
qcs->tx.offset += ret;
|
||||
|
||||
if (b_data(buf)) {
|
||||
qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
|
||||
SUB_RETRY_SEND, &qcc->wait_event);
|
||||
break;
|
||||
}
|
||||
}
|
||||
node = eb64_next(node);
|
||||
}
|
||||
if (ret > 0)
|
||||
tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet);
|
||||
|
||||
if (xprt_wake)
|
||||
tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet);
|
||||
|
||||
TRACE_LEAVE(QC_EV_QCC_SEND, qcc->conn);
|
||||
return 0;
|
||||
@ -1834,25 +1842,6 @@ static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to send data from buffer <buf> for no more than
|
||||
* <count> bytes. Returns the number of bytes effectively sent. Some status
|
||||
* flags may be updated on the conn_stream.
|
||||
*/
|
||||
size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
|
||||
TRACE_ENTER(QC_EV_QCS_SEND|QC_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||
|
||||
if (count) {
|
||||
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
|
||||
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
|
||||
}
|
||||
|
||||
TRACE_LEAVE(QC_EV_QCS_SEND|QC_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||
return count;
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to send data from buffer <buf> for no more than
|
||||
* <count> bytes. Returns the number of bytes effectively sent. Some status
|
||||
* flags may be updated on the mux.
|
||||
|
Loading…
Reference in New Issue
Block a user