MEDIUM: mux-quic: report errors on conn-streams

Complete the error reporting. For each attached streams, if CO_FL_ERROR
is set, mark them with CS_FL_ERR_PENDING|CS_FL_ERROR. This will notify
the upper layer to trigger streams detach and release of the MUX.

This reporting is implemented in a new function qc_wake_some_streams(),
called by qc_wake(). This ensures that a lower-layer error is quickly
reported to the individual streams.
This commit is contained in:
Amaury Denoyelle 2022-04-06 15:46:30 +02:00
parent d97fc804f9
commit fe035eca3a
2 changed files with 41 additions and 1 deletions

View File

@ -112,6 +112,7 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b
return NULL;
cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
qcs->cs = cs;
cs->ctx = qcs;
stream_new(qcs->qcc->conn->owner, cs, buf);

View File

@ -1068,7 +1068,8 @@ static void qc_detach(struct conn_stream *cs)
--qcc->nb_cs;
if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) &&
!(qcc->conn->flags & CO_FL_ERROR)) {
TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
qcs->flags |= QC_SF_DETACH;
return;
@ -1203,6 +1204,42 @@ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev
return 0;
}
/* Loop through all qcs from <qcc>. If CO_FL_ERROR is set on the connection,
* report CS_FL_ERR_PENDING|CS_FL_ERROR on the attached conn-streams and wake
* them.
*/
static int qc_wake_some_streams(struct qcc *qcc)
{
struct qc_stream_desc *stream;
struct qcs *qcs;
struct eb64_node *node;
for (node = eb64_first(&qcc->streams_by_id); node;
node = eb64_next(node)) {
stream = eb64_entry(node, struct qc_stream_desc, by_id);
qcs = stream->ctx;
if (!qcs->cs)
continue;
if (qcc->conn->flags & CO_FL_ERROR) {
qcs->cs->flags |= CS_FL_ERR_PENDING;
if (qcs->cs->flags & CS_FL_EOS)
qcs->cs->flags |= CS_FL_ERROR;
if (qcs->subs) {
qcs_notify_recv(qcs);
qcs_notify_send(qcs);
}
else if (qcs->cs->data_cb->wake) {
qcs->cs->data_cb->wake(qcs->cs);
}
}
}
return 0;
}
static int qc_wake(struct connection *conn)
{
struct qcc *qcc = conn->ctx;
@ -1220,6 +1257,8 @@ static int qc_wake(struct connection *conn)
qc_send(qcc);
qc_wake_some_streams(qcc);
if (qcc_is_dead(qcc))
goto release;