mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-03-11 05:48:41 +00:00
MAJOR: mux-quic: rework stream sending priorization
Implement a mechanism to register streams ready to send data in new STREAM frames. Internally, this is implemented with a new list <qcc.send_list> which contains qcs instances. A qcs can be registered safely using the new function qcc_send_stream(). This is done automatically in qc_send_buf() which covers most cases. Also, application layer is free to use it for internal usage streams. This is currently the case for H3 control stream with SETTINGS sending. The main point of this patch is to handle stream sending fairly. This is in stark contrast with previous code where streams with lower ID were always prioritized. This could cause other streams to be indefinitely blocked behind a stream which has a lot of data to transfer. Now, streams are handled in an order scheduled by se_desc layer. This commit is the first one of a serie which will bring other improvments which also relied on the send_list implementation. This must be backported up to 2.7 when deemed sufficiently stable.
This commit is contained in:
parent
31d2057c59
commit
20f2a425ff
@ -95,6 +95,7 @@ struct qcc {
|
||||
struct eb_root streams_by_id; /* all active streams by their ID */
|
||||
|
||||
struct list send_retry_list; /* list of qcs eligible to send retry */
|
||||
struct list send_list; /* list of qcs ready to send */
|
||||
|
||||
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
|
||||
|
||||
@ -174,6 +175,7 @@ struct qcs {
|
||||
struct qc_stream_desc *stream;
|
||||
|
||||
struct list el; /* element of qcc.send_retry_list */
|
||||
struct list el_send; /* element of qcc.send_list */
|
||||
struct list el_opening; /* element of qcc.opening_list */
|
||||
|
||||
struct wait_event wait_event;
|
||||
|
@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs);
|
||||
|
||||
void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate);
|
||||
void qcc_reset_stream(struct qcs *qcs, int err);
|
||||
void qcc_send_stream(struct qcs *qcs);
|
||||
void qcc_abort_stream_read(struct qcs *qcs);
|
||||
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||
char fin, char *data);
|
||||
|
5
src/h3.c
5
src/h3.c
@ -1032,8 +1032,11 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
|
||||
}
|
||||
|
||||
ret = b_force_xfer(res, &pos, b_data(&pos));
|
||||
if (ret > 0)
|
||||
if (ret > 0) {
|
||||
/* Register qcs for sending before other streams. */
|
||||
qcc_send_stream(qcs);
|
||||
h3c->flags |= H3_CF_SETTINGS_SENT;
|
||||
}
|
||||
|
||||
TRACE_LEAVE(H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
|
||||
return ret;
|
||||
|
@ -61,6 +61,7 @@ static void qcs_free(struct qcs *qcs)
|
||||
|
||||
/* Safe to use even if already removed from the list. */
|
||||
LIST_DEL_INIT(&qcs->el_opening);
|
||||
LIST_DEL_INIT(&qcs->el_send);
|
||||
|
||||
/* Release stream endpoint descriptor. */
|
||||
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
|
||||
@ -112,6 +113,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
||||
* These fields must be initialed before.
|
||||
*/
|
||||
LIST_INIT(&qcs->el_opening);
|
||||
LIST_INIT(&qcs->el_send);
|
||||
qcs->start = TICK_ETERNITY;
|
||||
|
||||
/* store transport layer stream descriptor in qcc tree */
|
||||
@ -819,6 +821,22 @@ void qcc_reset_stream(struct qcs *qcs, int err)
|
||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||
}
|
||||
|
||||
/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. */
|
||||
void qcc_send_stream(struct qcs *qcs)
|
||||
{
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
|
||||
/* Cannot send if already closed. */
|
||||
BUG_ON(qcs_is_close_local(qcs));
|
||||
|
||||
if (!LIST_INLIST(&qcs->el_send))
|
||||
LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
|
||||
|
||||
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
}
|
||||
|
||||
/* Prepare for the emission of STOP_SENDING on <qcs>. */
|
||||
void qcc_abort_stream_read(struct qcs *qcs)
|
||||
{
|
||||
@ -1067,6 +1085,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
|
||||
|
||||
if (qcs->flags & QC_SF_BLK_SFCTL) {
|
||||
qcs->flags &= ~QC_SF_BLK_SFCTL;
|
||||
/* TODO optim: only wakeup IO-CB if stream has data to sent. */
|
||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||
}
|
||||
}
|
||||
@ -1469,12 +1488,17 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
||||
}
|
||||
}
|
||||
|
||||
if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf) &&
|
||||
qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
|
||||
/* Close stream locally. */
|
||||
qcs_close_local(qcs);
|
||||
/* Reset flag to not emit multiple FIN STREAM frames. */
|
||||
qcs->flags &= ~QC_SF_FIN_STREAM;
|
||||
if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
|
||||
/* Remove stream from send_list if all was sent. */
|
||||
LIST_DEL_INIT(&qcs->el_send);
|
||||
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
|
||||
if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
|
||||
/* Close stream locally. */
|
||||
qcs_close_local(qcs);
|
||||
/* Reset flag to not emit multiple FIN STREAM frames. */
|
||||
qcs->flags &= ~QC_SF_FIN_STREAM;
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
@ -1627,6 +1651,9 @@ static int _qc_send_qcs(struct qcs *qcs, struct list *frms)
|
||||
int xfer = 0;
|
||||
char fin = 0;
|
||||
|
||||
/* Cannot send STREAM on remote unidirectional streams. */
|
||||
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
|
||||
|
||||
/* Allocate <out> buffer if necessary. */
|
||||
if (!out) {
|
||||
if (qcc->flags & QC_CF_CONN_FULL)
|
||||
@ -1708,12 +1735,13 @@ static int qc_send(struct qcc *qcc)
|
||||
qcc->flags |= QC_CF_APP_FINAL;
|
||||
}
|
||||
|
||||
/* loop through all streams, construct STREAM frames if data available.
|
||||
* TODO optimize the loop to favor streams which are not too heavy.
|
||||
/* Loop through all streams for STOP_SENDING/RESET_STREAM sending. Each
|
||||
* frame is send individually to guarantee emission.
|
||||
*
|
||||
* TODO Optimize sending by multiplexing several frames in one datagram.
|
||||
*/
|
||||
node = eb64_first(&qcc->streams_by_id);
|
||||
while (node) {
|
||||
int ret;
|
||||
uint64_t id;
|
||||
|
||||
qcs = eb64_entry(node, struct qcs, by_id);
|
||||
@ -1733,28 +1761,25 @@ static int qc_send(struct qcc *qcc)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (qcs_is_close_local(qcs)) {
|
||||
node = eb64_next(node);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (qcs->flags & QC_SF_BLK_SFCTL) {
|
||||
node = eb64_next(node);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check if there is something to send. */
|
||||
if (!b_data(&qcs->tx.buf) && !qcs_stream_fin(qcs) &&
|
||||
!qc_stream_buf_get(qcs->stream)) {
|
||||
node = eb64_next(node);
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = _qc_send_qcs(qcs, &frms);
|
||||
total += ret;
|
||||
node = eb64_next(node);
|
||||
}
|
||||
|
||||
/* Send STREAM data for registered streams. */
|
||||
list_for_each_entry(qcs, &qcc->send_list, el_send) {
|
||||
/* Stream must not be present in send_list if it has nothing to send. */
|
||||
BUG_ON(!b_data(&qcs->tx.buf) &&
|
||||
qcs->tx.sent_offset == qcs->tx.offset &&
|
||||
!qcs_stream_fin(qcs));
|
||||
|
||||
if (qcs_is_close_local(qcs)) {
|
||||
LIST_DEL_INIT(&qcs->el_send);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!(qcs->flags & QC_SF_BLK_SFCTL))
|
||||
total += _qc_send_qcs(qcs, &frms);
|
||||
}
|
||||
|
||||
if (qc_send_frames(qcc, &frms)) {
|
||||
/* data rejected by transport layer, do not retry. */
|
||||
goto out;
|
||||
@ -1780,6 +1805,7 @@ static int qc_send(struct qcc *qcc)
|
||||
/* Deallocate frames that the transport layer has rejected. */
|
||||
if (!LIST_ISEMPTY(&frms)) {
|
||||
struct quic_frame *frm, *frm2;
|
||||
|
||||
list_for_each_entry_safe(frm, frm2, &frms, list) {
|
||||
LIST_DELETE(&frm->list);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
@ -2130,6 +2156,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
|
||||
goto fail_no_tasklet;
|
||||
}
|
||||
|
||||
LIST_INIT(&qcc->send_list);
|
||||
LIST_INIT(&qcc->send_retry_list);
|
||||
|
||||
qcc->wait_event.tasklet->process = qc_io_cb;
|
||||
@ -2300,6 +2327,7 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf,
|
||||
qcs->flags |= QC_SF_FIN_STREAM;
|
||||
|
||||
if (ret || fin) {
|
||||
qcc_send_stream(qcs);
|
||||
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
|
||||
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user