MINOR: mux-quic: report local error on stream endpoint asap

If an error a detected at the MUX layer, all remaining stream endpoints
should be closed asap with error set. This is now done by checking for
QC_CF_ERRL flag on qc_wake_some_streams() and qc_send_buf(). To complete
this, qc_wake_some_streams() is called by qc_process() if needed.

This should help to quickly release streams as soon as a new error is
detected locally by the MUX or APP layer. This allows to in turn free
the MUX instance itself. Previously, error would not have been
automatically reported until the transport layer closure would occur
on CONNECTION_CLOSE emission.

This should be backported up to 2.7.
This commit is contained in:
Amaury Denoyelle 2023-05-03 18:16:40 +02:00
parent 51f116d65e
commit 35542ce7bf

View File

@ -2129,13 +2129,39 @@ static void qc_shutdown(struct qcc *qcc)
TRACE_LEAVE(QMUX_EV_QCC_END, qcc->conn);
}
/* Loop through all qcs from <qcc>. Report error on stream endpoint if
* connection on error and wake them.
*/
static int qc_wake_some_streams(struct qcc *qcc)
{
struct qcs *qcs;
struct eb64_node *node;
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
for (node = eb64_first(&qcc->streams_by_id); node;
node = eb64_next(node)) {
qcs = eb64_entry(node, struct qcs, by_id);
if (!qcs_sc(qcs))
continue;
if (qcc->conn->flags & CO_FL_ERROR || qcc->flags & QC_CF_ERRL) {
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn, qcs);
se_fl_set_error(qcs->sd);
qcs_alert(qcs);
}
}
return 0;
}
/* Conduct operations which should be made for <qcc> connection after
* input/output. Most notably, closed streams are purged which may leave the
* connection has ready to be released.
*
* Returns 1 if <qcc> must be released else 0.
*/
static int qc_process(struct qcc *qcc)
{
qc_purge_streams(qcc);
@ -2179,6 +2205,10 @@ static int qc_process(struct qcc *qcc)
qc_shutdown(qcc);
}
/* Report error if set on stream endpoint layer. */
if (qcc->flags & QC_CF_ERRL)
qc_wake_some_streams(qcc);
out:
if (qcc_is_dead(qcc))
return 1;
@ -2593,7 +2623,7 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf,
size_t count, int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
size_t ret;
size_t ret = 0;
char fin;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@ -2601,6 +2631,13 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf,
/* stream layer has been detached so no transfer must occur after. */
BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
/* Report error if set on stream endpoint layer. */
if (qcs->qcc->flags & QC_CF_ERRL) {
se_fl_set(qcs->sd, SE_FL_ERROR);
TRACE_DEVEL("connection in error", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
goto end;
}
if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) {
ret = qcs_http_reset_buf(qcs, buf, count);
goto end;
@ -2653,31 +2690,6 @@ static int qc_unsubscribe(struct stconn *sc, int event_type, struct wait_event *
return 0;
}
/* Loop through all qcs from <qcc>. If CO_FL_ERROR is set on the connection,
* report SE_FL_ERR_PENDING|SE_FL_ERROR on the attached stream connectors and
* wake them.
*/
static int qc_wake_some_streams(struct qcc *qcc)
{
struct qcs *qcs;
struct eb64_node *node;
for (node = eb64_first(&qcc->streams_by_id); node;
node = eb64_next(node)) {
qcs = eb64_entry(node, struct qcs, by_id);
if (!qcs_sc(qcs))
continue;
if (qcc->conn->flags & CO_FL_ERROR) {
se_fl_set_error(qcs->sd);
qcs_alert(qcs);
}
}
return 0;
}
static int qc_wake(struct connection *conn)
{
struct qcc *qcc = conn->ctx;