mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-02 02:02:03 +00:00
MEDIUM: mux-quic: implement STOP_SENDING emission
Implement STOP_SENDING. This is divided in two main functions : * qcc_abort_stream_read() which can be used by application protocol to request for a STOP_SENDING. This set the flag QC_SF_READ_ABORTED. * qcs_send_reset() is a static function called after the preceding one. It will send a STOP_SENDING via qcc_send(). QC_SF_READ_ABORTED flag is now properly used : if activated on a stream during qcc_recv(), <qcc.app_ops.decode_qcs> callback is skipped. Also, abort reading on unknown unidirection remote stream is now fully supported with the emission of a STOP_SENDING as specified by RFC 9000. This commit is part of implementing H3 errors at the stream level. This will allows the H3 layer to request the peer to close its endpoint for an error on a stream. This should be backported up to 2.7.
This commit is contained in:
parent
5854fc08cc
commit
663e872e3a
@ -117,9 +117,10 @@ struct qcc {
|
||||
#define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */
|
||||
#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */
|
||||
#define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */
|
||||
#define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */
|
||||
#define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/
|
||||
#define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */
|
||||
#define QC_SF_HREQ_RECV 0x00000100 /* a full HTTP request has been received */
|
||||
#define QC_SF_TO_STOP_SENDING 0x00000200 /* a STOP_SENDING must be sent */
|
||||
|
||||
/* Maximum size of stream Rx buffer. */
|
||||
#define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ)
|
||||
|
@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs);
|
||||
|
||||
void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate);
|
||||
void qcc_reset_stream(struct qcs *qcs, int err);
|
||||
void qcc_abort_stream_read(struct qcs *qcs);
|
||||
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||
char fin, char *data);
|
||||
int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
|
||||
|
2
src/h3.c
2
src/h3.c
@ -220,7 +220,7 @@ static ssize_t h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
|
||||
* Implementations MUST [...] abort reading on unidirectional
|
||||
* streams that have unknown or unsupported types.
|
||||
*/
|
||||
qcs->flags |= QC_SF_READ_ABORTED;
|
||||
qcc_abort_stream_read(qcs);
|
||||
return -1;
|
||||
};
|
||||
|
||||
|
@ -755,10 +755,16 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
|
||||
if (qcs_is_close_remote(qcs))
|
||||
fin = 1;
|
||||
|
||||
ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
|
||||
if (ret < 0) {
|
||||
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||
goto err;
|
||||
if (!(qcs->flags & QC_SF_READ_ABORTED)) {
|
||||
ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
|
||||
if (ret < 0) {
|
||||
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
else {
|
||||
TRACE_DATA("ignore read on stream", QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||
ret = b_data(&b);
|
||||
}
|
||||
|
||||
if (ret) {
|
||||
@ -813,6 +819,24 @@ void qcc_reset_stream(struct qcs *qcs, int err)
|
||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||
}
|
||||
|
||||
/* Prepare for the emission of STOP_SENDING on <qcs>. */
|
||||
void qcc_abort_stream_read(struct qcs *qcs)
|
||||
{
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_NEW, qcc->conn, qcs);
|
||||
|
||||
if ((qcs->flags & QC_SF_TO_STOP_SENDING) || qcs_is_close_remote(qcs))
|
||||
goto end;
|
||||
|
||||
TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs);
|
||||
qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
|
||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||
|
||||
end:
|
||||
TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
|
||||
}
|
||||
|
||||
/* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
|
||||
* Returns 0 on success else non-zero.
|
||||
*/
|
||||
@ -977,11 +1001,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||
qcc_refresh_timeout(qcc);
|
||||
}
|
||||
|
||||
if (qcs->flags & QC_SF_READ_ABORTED) {
|
||||
/* TODO should send a STOP_SENDING */
|
||||
qcs_free(qcs);
|
||||
}
|
||||
|
||||
out:
|
||||
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
|
||||
return 0;
|
||||
@ -1538,6 +1557,58 @@ static int qcs_send_reset(struct qcs *qcs)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Emit a STOP_SENDING on <qcs>.
|
||||
*
|
||||
* Returns 0 if the frame has been successfully sent else non-zero.
|
||||
*/
|
||||
static int qcs_send_stop_sending(struct qcs *qcs)
|
||||
{
|
||||
struct list frms = LIST_HEAD_INIT(frms);
|
||||
struct quic_frame *frm;
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
|
||||
|
||||
/* RFC 9000 3.3. Permitted Frame Types
|
||||
*
|
||||
* A
|
||||
* receiver MAY send a STOP_SENDING frame in any state where it has not
|
||||
* received a RESET_STREAM frame -- that is, states other than "Reset
|
||||
* Recvd" or "Reset Read". However, there is little value in sending a
|
||||
* STOP_SENDING frame in the "Data Recvd" state, as all stream data has
|
||||
* been received. A sender could receive either of these two types of
|
||||
* frames in any state as a result of delayed delivery of packets.¶
|
||||
*/
|
||||
if (qcs_is_close_remote(qcs)) {
|
||||
TRACE_STATE("skip STOP_SENDING on remote already closed", QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
goto done;
|
||||
}
|
||||
|
||||
frm = pool_zalloc(pool_head_quic_frame);
|
||||
if (!frm) {
|
||||
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
|
||||
return 1;
|
||||
}
|
||||
|
||||
LIST_INIT(&frm->reflist);
|
||||
frm->type = QUIC_FT_STOP_SENDING;
|
||||
frm->stop_sending.id = qcs->id;
|
||||
frm->stop_sending.app_error_code = qcs->err;
|
||||
|
||||
LIST_APPEND(&frms, &frm->list);
|
||||
if (qc_send_frames(qcs->qcc, &frms)) {
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
|
||||
return 1;
|
||||
}
|
||||
|
||||
done:
|
||||
qcs->flags &= ~QC_SF_TO_STOP_SENDING;
|
||||
|
||||
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Used internally by qc_send function. Proceed to send for <qcs>. This will
|
||||
* transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
|
||||
* is then generated and inserted in <frms> list.
|
||||
@ -1650,6 +1721,9 @@ static int qc_send(struct qcc *qcc)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (qcs->flags & QC_SF_TO_STOP_SENDING)
|
||||
qcs_send_stop_sending(qcs);
|
||||
|
||||
if (qcs->flags & QC_SF_TO_RESET) {
|
||||
qcs_send_reset(qcs);
|
||||
node = eb64_next(node);
|
||||
@ -1751,11 +1825,6 @@ static int qc_recv(struct qcc *qcc)
|
||||
|
||||
qcc_decode_qcs(qcc, qcs);
|
||||
node = eb64_next(node);
|
||||
|
||||
if (qcs->flags & QC_SF_READ_ABORTED) {
|
||||
/* TODO should send a STOP_SENDING */
|
||||
qcs_free(qcs);
|
||||
}
|
||||
}
|
||||
|
||||
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
|
||||
|
Loading…
Reference in New Issue
Block a user