From 663e872e3a5c8726eb7df41acf54f15037aaa8c1 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Fri, 9 Dec 2022 14:58:28 +0100 Subject: [PATCH] MEDIUM: mux-quic: implement STOP_SENDING emission Implement STOP_SENDING. This is divided in two main functions : * qcc_abort_stream_read() which can be used by application protocol to request for a STOP_SENDING. This set the flag QC_SF_READ_ABORTED. * qcs_send_reset() is a static function called after the preceding one. It will send a STOP_SENDING via qcc_send(). QC_SF_READ_ABORTED flag is now properly used : if activated on a stream during qcc_recv(), callback is skipped. Also, abort reading on unknown unidirection remote stream is now fully supported with the emission of a STOP_SENDING as specified by RFC 9000. This commit is part of implementing H3 errors at the stream level. This will allows the H3 layer to request the peer to close its endpoint for an error on a stream. This should be backported up to 2.7. --- include/haproxy/mux_quic-t.h | 3 +- include/haproxy/mux_quic.h | 1 + src/h3.c | 2 +- src/mux_quic.c | 97 ++++++++++++++++++++++++++++++------ 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index d716e08d5..f1c96f077 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -117,9 +117,10 @@ struct qcc { #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ #define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ -#define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */ +#define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/ #define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */ #define QC_SF_HREQ_RECV 0x00000100 /* a full HTTP request has been received */ +#define QC_SF_TO_STOP_SENDING 0x00000200 /* a STOP_SENDING must be sent */ /* Maximum size of stream Rx buffer. */ #define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ) diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 0a88f6b4b..a165779fc 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs); void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate); void qcc_reset_stream(struct qcs *qcs, int err); +void qcc_abort_stream_read(struct qcs *qcs); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data); int qcc_recv_max_data(struct qcc *qcc, uint64_t max); diff --git a/src/h3.c b/src/h3.c index 8912fa5d9..518f413e5 100644 --- a/src/h3.c +++ b/src/h3.c @@ -220,7 +220,7 @@ static ssize_t h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, * Implementations MUST [...] abort reading on unidirectional * streams that have unknown or unsupported types. */ - qcs->flags |= QC_SF_READ_ABORTED; + qcc_abort_stream_read(qcs); return -1; }; diff --git a/src/mux_quic.c b/src/mux_quic.c index c780ebcad..fbc4a935a 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -755,10 +755,16 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) if (qcs_is_close_remote(qcs)) fin = 1; - ret = qcc->app_ops->decode_qcs(qcs, &b, fin); - if (ret < 0) { - TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); - goto err; + if (!(qcs->flags & QC_SF_READ_ABORTED)) { + ret = qcc->app_ops->decode_qcs(qcs, &b, fin); + if (ret < 0) { + TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); + goto err; + } + } + else { + TRACE_DATA("ignore read on stream", QMUX_EV_QCS_RECV, qcc->conn, qcs); + ret = b_data(&b); } if (ret) { @@ -813,6 +819,24 @@ void qcc_reset_stream(struct qcs *qcs, int err) tasklet_wakeup(qcc->wait_event.tasklet); } +/* Prepare for the emission of STOP_SENDING on . */ +void qcc_abort_stream_read(struct qcs *qcs) +{ + struct qcc *qcc = qcs->qcc; + + TRACE_ENTER(QMUX_EV_QCC_NEW, qcc->conn, qcs); + + if ((qcs->flags & QC_SF_TO_STOP_SENDING) || qcs_is_close_remote(qcs)) + goto end; + + TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs); + qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED); + tasklet_wakeup(qcc->wait_event.tasklet); + + end: + TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs); +} + /* Install the applicative layer of a QUIC connection on mux . * Returns 0 on success else non-zero. */ @@ -977,11 +1001,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, qcc_refresh_timeout(qcc); } - if (qcs->flags & QC_SF_READ_ABORTED) { - /* TODO should send a STOP_SENDING */ - qcs_free(qcs); - } - out: TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); return 0; @@ -1538,6 +1557,58 @@ static int qcs_send_reset(struct qcs *qcs) return 0; } +/* Emit a STOP_SENDING on . + * + * Returns 0 if the frame has been successfully sent else non-zero. + */ +static int qcs_send_stop_sending(struct qcs *qcs) +{ + struct list frms = LIST_HEAD_INIT(frms); + struct quic_frame *frm; + struct qcc *qcc = qcs->qcc; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + + /* RFC 9000 3.3. Permitted Frame Types + * + * A + * receiver MAY send a STOP_SENDING frame in any state where it has not + * received a RESET_STREAM frame -- that is, states other than "Reset + * Recvd" or "Reset Read". However, there is little value in sending a + * STOP_SENDING frame in the "Data Recvd" state, as all stream data has + * been received. A sender could receive either of these two types of + * frames in any state as a result of delayed delivery of packets.ΒΆ + */ + if (qcs_is_close_remote(qcs)) { + TRACE_STATE("skip STOP_SENDING on remote already closed", QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto done; + } + + frm = pool_zalloc(pool_head_quic_frame); + if (!frm) { + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + return 1; + } + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_STOP_SENDING; + frm->stop_sending.id = qcs->id; + frm->stop_sending.app_error_code = qcs->err; + + LIST_APPEND(&frms, &frm->list); + if (qc_send_frames(qcs->qcc, &frms)) { + pool_free(pool_head_quic_frame, frm); + TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + return 1; + } + + done: + qcs->flags &= ~QC_SF_TO_STOP_SENDING; + + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + return 0; +} + /* Used internally by qc_send function. Proceed to send for . This will * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame * is then generated and inserted in list. @@ -1650,6 +1721,9 @@ static int qc_send(struct qcc *qcc) continue; } + if (qcs->flags & QC_SF_TO_STOP_SENDING) + qcs_send_stop_sending(qcs); + if (qcs->flags & QC_SF_TO_RESET) { qcs_send_reset(qcs); node = eb64_next(node); @@ -1751,11 +1825,6 @@ static int qc_recv(struct qcc *qcc) qcc_decode_qcs(qcc, qcs); node = eb64_next(node); - - if (qcs->flags & QC_SF_READ_ABORTED) { - /* TODO should send a STOP_SENDING */ - qcs_free(qcs); - } } TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);