diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index f1c96f077..256cdd93a 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -95,6 +95,7 @@ struct qcc { struct eb_root streams_by_id; /* all active streams by their ID */ struct list send_retry_list; /* list of qcs eligible to send retry */ + struct list send_list; /* list of qcs ready to send */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */ @@ -174,6 +175,7 @@ struct qcs { struct qc_stream_desc *stream; struct list el; /* element of qcc.send_retry_list */ + struct list el_send; /* element of qcc.send_list */ struct list el_opening; /* element of qcc.opening_list */ struct wait_event wait_event; diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index a165779fc..1d374b0b0 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs); void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate); void qcc_reset_stream(struct qcs *qcs, int err); +void qcc_send_stream(struct qcs *qcs); void qcc_abort_stream_read(struct qcs *qcs); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data); diff --git a/src/h3.c b/src/h3.c index 80357647b..e84044857 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1032,8 +1032,11 @@ static int h3_control_send(struct qcs *qcs, void *ctx) } ret = b_force_xfer(res, &pos, b_data(&pos)); - if (ret > 0) + if (ret > 0) { + /* Register qcs for sending before other streams. */ + qcc_send_stream(qcs); h3c->flags |= H3_CF_SETTINGS_SENT; + } TRACE_LEAVE(H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); return ret; diff --git a/src/mux_quic.c b/src/mux_quic.c index 120eeb1f1..6a324acf5 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -61,6 +61,7 @@ static void qcs_free(struct qcs *qcs) /* Safe to use even if already removed from the list. */ LIST_DEL_INIT(&qcs->el_opening); + LIST_DEL_INIT(&qcs->el_send); /* Release stream endpoint descriptor. */ BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN)); @@ -112,6 +113,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) * These fields must be initialed before. */ LIST_INIT(&qcs->el_opening); + LIST_INIT(&qcs->el_send); qcs->start = TICK_ETERNITY; /* store transport layer stream descriptor in qcc tree */ @@ -819,6 +821,22 @@ void qcc_reset_stream(struct qcs *qcs, int err) tasklet_wakeup(qcc->wait_event.tasklet); } +/* Register stream for emission of STREAM, STOP_SENDING or RESET_STREAM. */ +void qcc_send_stream(struct qcs *qcs) +{ + struct qcc *qcc = qcs->qcc; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); + + /* Cannot send if already closed. */ + BUG_ON(qcs_is_close_local(qcs)); + + if (!LIST_INLIST(&qcs->el_send)) + LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send); + + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); +} + /* Prepare for the emission of STOP_SENDING on . */ void qcc_abort_stream_read(struct qcs *qcs) { @@ -1067,6 +1085,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) if (qcs->flags & QC_SF_BLK_SFCTL) { qcs->flags &= ~QC_SF_BLK_SFCTL; + /* TODO optim: only wakeup IO-CB if stream has data to sent. */ tasklet_wakeup(qcc->wait_event.tasklet); } } @@ -1469,12 +1488,17 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) } } - if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf) && - qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { - /* Close stream locally. */ - qcs_close_local(qcs); - /* Reset flag to not emit multiple FIN STREAM frames. */ - qcs->flags &= ~QC_SF_FIN_STREAM; + if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) { + /* Remove stream from send_list if all was sent. */ + LIST_DEL_INIT(&qcs->el_send); + TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); + + if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { + /* Close stream locally. */ + qcs_close_local(qcs); + /* Reset flag to not emit multiple FIN STREAM frames. */ + qcs->flags &= ~QC_SF_FIN_STREAM; + } } out: @@ -1627,6 +1651,9 @@ static int _qc_send_qcs(struct qcs *qcs, struct list *frms) int xfer = 0; char fin = 0; + /* Cannot send STREAM on remote unidirectional streams. */ + BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id)); + /* Allocate buffer if necessary. */ if (!out) { if (qcc->flags & QC_CF_CONN_FULL) @@ -1708,12 +1735,13 @@ static int qc_send(struct qcc *qcc) qcc->flags |= QC_CF_APP_FINAL; } - /* loop through all streams, construct STREAM frames if data available. - * TODO optimize the loop to favor streams which are not too heavy. + /* Loop through all streams for STOP_SENDING/RESET_STREAM sending. Each + * frame is send individually to guarantee emission. + * + * TODO Optimize sending by multiplexing several frames in one datagram. */ node = eb64_first(&qcc->streams_by_id); while (node) { - int ret; uint64_t id; qcs = eb64_entry(node, struct qcs, by_id); @@ -1733,28 +1761,25 @@ static int qc_send(struct qcc *qcc) continue; } - if (qcs_is_close_local(qcs)) { - node = eb64_next(node); - continue; - } - - if (qcs->flags & QC_SF_BLK_SFCTL) { - node = eb64_next(node); - continue; - } - - /* Check if there is something to send. */ - if (!b_data(&qcs->tx.buf) && !qcs_stream_fin(qcs) && - !qc_stream_buf_get(qcs->stream)) { - node = eb64_next(node); - continue; - } - - ret = _qc_send_qcs(qcs, &frms); - total += ret; node = eb64_next(node); } + /* Send STREAM data for registered streams. */ + list_for_each_entry(qcs, &qcc->send_list, el_send) { + /* Stream must not be present in send_list if it has nothing to send. */ + BUG_ON(!b_data(&qcs->tx.buf) && + qcs->tx.sent_offset == qcs->tx.offset && + !qcs_stream_fin(qcs)); + + if (qcs_is_close_local(qcs)) { + LIST_DEL_INIT(&qcs->el_send); + continue; + } + + if (!(qcs->flags & QC_SF_BLK_SFCTL)) + total += _qc_send_qcs(qcs, &frms); + } + if (qc_send_frames(qcc, &frms)) { /* data rejected by transport layer, do not retry. */ goto out; @@ -1780,6 +1805,7 @@ static int qc_send(struct qcc *qcc) /* Deallocate frames that the transport layer has rejected. */ if (!LIST_ISEMPTY(&frms)) { struct quic_frame *frm, *frm2; + list_for_each_entry_safe(frm, frm2, &frms, list) { LIST_DELETE(&frm->list); pool_free(pool_head_quic_frame, frm); @@ -2130,6 +2156,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, goto fail_no_tasklet; } + LIST_INIT(&qcc->send_list); LIST_INIT(&qcc->send_retry_list); qcc->wait_event.tasklet->process = qc_io_cb; @@ -2300,6 +2327,7 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf, qcs->flags |= QC_SF_FIN_STREAM; if (ret || fin) { + qcc_send_stream(qcs); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) tasklet_wakeup(qcs->qcc->wait_event.tasklet); }