diff --git a/src/mux_quic.c b/src/mux_quic.c index dc0230600..5262488b9 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -2129,13 +2129,39 @@ static void qc_shutdown(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END, qcc->conn); } +/* Loop through all qcs from . Report error on stream endpoint if + * connection on error and wake them. + */ +static int qc_wake_some_streams(struct qcc *qcc) +{ + struct qcs *qcs; + struct eb64_node *node; + + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); + + for (node = eb64_first(&qcc->streams_by_id); node; + node = eb64_next(node)) { + qcs = eb64_entry(node, struct qcs, by_id); + + if (!qcs_sc(qcs)) + continue; + + if (qcc->conn->flags & CO_FL_ERROR || qcc->flags & QC_CF_ERRL) { + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn, qcs); + se_fl_set_error(qcs->sd); + qcs_alert(qcs); + } + } + + return 0; +} + /* Conduct operations which should be made for connection after * input/output. Most notably, closed streams are purged which may leave the * connection has ready to be released. * * Returns 1 if must be released else 0. */ - static int qc_process(struct qcc *qcc) { qc_purge_streams(qcc); @@ -2179,6 +2205,10 @@ static int qc_process(struct qcc *qcc) qc_shutdown(qcc); } + /* Report error if set on stream endpoint layer. */ + if (qcc->flags & QC_CF_ERRL) + qc_wake_some_streams(qcc); + out: if (qcc_is_dead(qcc)) return 1; @@ -2593,7 +2623,7 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags) { struct qcs *qcs = __sc_mux_strm(sc); - size_t ret; + size_t ret = 0; char fin; TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); @@ -2601,6 +2631,13 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf, /* stream layer has been detached so no transfer must occur after. */ BUG_ON_HOT(qcs->flags & QC_SF_DETACH); + /* Report error if set on stream endpoint layer. */ + if (qcs->qcc->flags & QC_CF_ERRL) { + se_fl_set(qcs->sd, SE_FL_ERROR); + TRACE_DEVEL("connection in error", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + goto end; + } + if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) { ret = qcs_http_reset_buf(qcs, buf, count); goto end; @@ -2653,31 +2690,6 @@ static int qc_unsubscribe(struct stconn *sc, int event_type, struct wait_event * return 0; } -/* Loop through all qcs from . If CO_FL_ERROR is set on the connection, - * report SE_FL_ERR_PENDING|SE_FL_ERROR on the attached stream connectors and - * wake them. - */ -static int qc_wake_some_streams(struct qcc *qcc) -{ - struct qcs *qcs; - struct eb64_node *node; - - for (node = eb64_first(&qcc->streams_by_id); node; - node = eb64_next(node)) { - qcs = eb64_entry(node, struct qcs, by_id); - - if (!qcs_sc(qcs)) - continue; - - if (qcc->conn->flags & CO_FL_ERROR) { - se_fl_set_error(qcs->sd); - qcs_alert(qcs); - } - } - - return 0; -} - static int qc_wake(struct connection *conn) { struct qcc *qcc = conn->ctx;