MEDIUM: mux-quic: simplify sending API

The previous commit was a major rework for QUIC MUX sending process.
Following this, this patch cleans up a few elements that remains but can
be removed as they are duplicated.

Of notable changes, offset fields from QCS and QCC are removed. They are
both equivalent to flow control soft offsets.

A new function qcs_prep_bytes() is implemented. Its purpose is to return
the count of prepared data bytes not yet sent. It also replaces
qcs_need_sending().
This commit is contained in:
Amaury Denoyelle 2024-01-30 11:23:48 +01:00
parent 00a3e5f786
commit 3fe3251593
3 changed files with 47 additions and 63 deletions

View File

@ -72,8 +72,6 @@ struct qcc {
struct {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t offsets; /* sum of all offsets prepared */
} tx;
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
@ -159,8 +157,6 @@ struct qcs {
} rx;
struct {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t offset; /* last offset of data ready to be sent */
} tx;
struct eb64_node by_id;

View File

@ -137,8 +137,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
}
qcs->rx.msd_init = qcs->rx.msd;
qcs->tx.offset = 0;
qcs->wait_event.tasklet = NULL;
qcs->wait_event.events = 0;
qcs->subs = NULL;
@ -949,6 +947,21 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
return out;
}
/* Returns total number of bytes not already sent to quic-conn layer. */
static uint64_t qcs_prep_bytes(const struct qcs *qcs)
{
struct buffer *out = qc_stream_buf_get(qcs->stream);
uint64_t diff, base_off;
if (!out)
return 0;
/* if ack_offset < buf_offset, it points to an older buffer. */
base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
diff = qcs->tx.fc.off_real - base_off;
return b_data(out) - diff;
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
* are blocked on connection flow control.
*/
@ -967,6 +980,7 @@ static void qcc_notify_fctl(struct qcc *qcc)
void qcc_reset_stream(struct qcs *qcs, int err)
{
struct qcc *qcc = qcs->qcc;
const uint64_t diff = qcs_prep_bytes(qcs);
if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
return;
@ -975,23 +989,19 @@ void qcc_reset_stream(struct qcs *qcs, int err)
qcs->flags |= QC_SF_TO_RESET;
qcs->err = err;
/* Remove prepared stream data from connection flow-control calcul. */
if (qcs->tx.offset > qcs->tx.fc.off_real) {
const uint64_t diff = qcs->tx.offset - qcs->tx.fc.off_real;
BUG_ON(qcc->tx.offsets - diff < qcc->tx.fc.off_real);
qcc->tx.offsets -= diff;
/* Reset qcs offset to prevent BUG_ON() on qcs_destroy(). */
qcs->tx.offset = qcs->tx.fc.off_real;
}
/* Substract to conn flow control data amount prepared on stream not yet sent. */
if (qcs->tx.fc.off_soft > qcs->tx.fc.off_real) {
if (diff) {
const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real);
/* Soft offset cannot be inferior to real one. */
BUG_ON(qcc->tx.fc.off_soft - diff < qcc->tx.fc.off_real);
/* Substract to conn flow control data amount prepared on stream not yet sent. */
qcc->tx.fc.off_soft -= diff;
if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
qcc_notify_fctl(qcc);
/* Reset QCS soft off to prevent BUG_ON() on qcs_destroy(). */
qcs->tx.fc.off_soft = qcs->tx.fc.off_real;
}
/* Report send error to stream-endpoint layer. */
@ -1030,9 +1040,6 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count)
if (count) {
qfctl_sinc(&qcc->tx.fc, count);
qfctl_sinc(&qcs->tx.fc, count);
qcs->tx.offset += count;
qcs->qcc->tx.offsets += count;
}
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -1559,7 +1566,7 @@ static void qcs_destroy(struct qcs *qcs)
/* MUST not removed a stream with sending prepared data left. This is
* to ensure consistency on connection flow-control calculation.
*/
BUG_ON(qcs->tx.offset < qcs->tx.fc.off_real);
BUG_ON(qcs->tx.fc.off_soft != qcs->tx.fc.off_real);
if (!(qcc->flags & QC_CF_ERRL)) {
if (quic_stream_is_remote(qcc, id))
@ -1585,19 +1592,16 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
{
struct qcc *qcc = qcs->qcc;
struct quic_frame *frm;
int head, total;
uint64_t base_off;
const uint64_t window_stream = qfctl_rcap(&qcs->tx.fc);
const uint64_t bytes = qcs_prep_bytes(qcs);
uint64_t total;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* if ack_offset < buf_offset, it points to an older buffer. */
base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
BUG_ON(qcs->tx.fc.off_real < base_off);
/* This must only be called if there is data left, or at least a standalone FIN. */
BUG_ON((!out || !b_data(out)) && !fin);
head = qcs->tx.fc.off_real - base_off;
total = out ? b_data(out) - head : 0;
BUG_ON(total < 0);
total = bytes;
/* do not exceed stream flow control limit */
if (total > window_stream) {
@ -1612,7 +1616,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
}
/* Reset FIN if bytes to send is capped by flow control. */
if (out && total < b_data(out) - head)
if (total < bytes)
fin = 0;
if (!total && !fin) {
@ -1620,10 +1624,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
return 0;
}
BUG_ON((!total && qcs->tx.fc.off_real > qcs->tx.offset) ||
(total && qcs->tx.fc.off_real >= qcs->tx.offset));
BUG_ON(qcs->tx.fc.off_real + total > qcs->tx.offset);
BUG_ON(qcc->tx.fc.off_real + total > qcc->tx.fc.limit);
TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs);
frm = qc_frm_alloc(QUIC_FT_STREAM_8);
@ -1639,7 +1639,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
if (total) {
frm->stream.buf = out;
frm->stream.data = (unsigned char *)b_peek(out, head);
frm->stream.data = (unsigned char *)b_peek(out, b_data(out) - bytes);
}
else {
/* Empty STREAM frame. */
@ -1681,13 +1681,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
return -1;
}
/* Return true if <qcs> has data to send in new STREAM frames. */
static forceinline int qcs_need_sending(struct qcs *qcs)
{
return qcs->tx.fc.off_real < qcs->tx.offset ||
qcs->flags & QC_SF_FIN_STREAM;
}
/* This function must be called by the upper layer to inform about the sending
* of a STREAM frame for <qcs> instance. The frame is of <data> length and on
* <offset>.
@ -1699,8 +1692,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Real off MUST always be the greatest offset sent. */
BUG_ON(offset > qcs->tx.fc.off_real);
BUG_ON(offset + data > qcs->tx.offset);
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
@ -1732,11 +1725,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
TRACE_STATE("stream flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
BUG_ON_HOT(qcs->tx.fc.off_real > qcs->tx.offset);
/* If qcs.stream.buf is full, release it to the lower layer. */
if (qcs->tx.offset == qcs->tx.fc.off_real &&
b_full(&qcs->stream->buf->buf)) {
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) && (b_full(&qcs->stream->buf->buf))) {
qc_stream_buf_release(qcs->stream);
}
@ -1746,7 +1736,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
increment_send_rate(diff, 0);
}
if (qcs->tx.offset == qcs->tx.fc.off_real) {
if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -1925,7 +1915,7 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
/* This function must not be called if there is nothing to send. */
BUG_ON(!fin && !qcs_need_sending(qcs));
BUG_ON(!fin && !qcs_prep_bytes(qcs));
/* Skip STREAM frame allocation if already subscribed for send.
* Happens on sendto transient error or network congestion.
@ -2009,8 +1999,8 @@ static int qcc_io_send(struct qcc *qcc)
break;
/* Stream must not be present in send_list if it has nothing to send. */
BUG_ON(!(qcs->flags & (QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
!qcs_need_sending(qcs));
BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
!qcs_prep_bytes(qcs));
/* Each STOP_SENDING/RESET_STREAM frame is sent individually to
* guarantee its emission.
@ -2024,7 +2014,8 @@ static int qcc_io_send(struct qcc *qcc)
/* Remove stream from send_list if it had only STOP_SENDING
* to send.
*/
if (!(qcs->flags & QC_SF_TO_RESET) && !qcs_need_sending(qcs)) {
if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) &&
!qcs_prep_bytes(qcs)) {
LIST_DEL_INIT(&qcs->el_send);
continue;
}
@ -2515,8 +2506,6 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
/* Server parameters, params used for RX flow control. */
lparams = &conn->handle.qc->rx.params;
qcc->tx.offsets = 0;
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.ms_uni = lparams->initial_max_streams_uni;
qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
@ -2763,8 +2752,7 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
size_t count, int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
struct buffer *out = qc_stream_buf_get(qcs->stream);
const size_t old_data = out ? b_data(out) : 0;
const size_t old_data = qcs_prep_bytes(qcs);
size_t ret = 0;
char fin;
@ -2807,7 +2795,7 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
}
if (ret || fin) {
const size_t data = b_data(qc_stream_buf_get(qcs->stream)) - (old_data);
const size_t data = qcs_prep_bytes(qcs) - old_data;
if (data || fin)
qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))

View File

@ -76,15 +76,15 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
if (qcc->conn->handle.qc)
chunk_appendf(&trace_buf, " qc=%p", qcc->conn->handle.qc);
chunk_appendf(&trace_buf, " md=%llu/%llu/%llu",
(ullong)qcc->tx.fc.limit, (ullong)qcc->tx.offsets, (ullong)qcc->tx.fc.off_real);
chunk_appendf(&trace_buf, " md=%llu/%llu",
(ullong)qcc->tx.fc.limit, (ullong)qcc->tx.fc.off_real);
if (qcs) {
chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",
qcs, (ullong)qcs->id,
qcs_st_to_str(qcs->st));
chunk_appendf(&trace_buf, " msd=%llu/%llu/%llu",
(ullong)qcs->tx.fc.limit, (ullong)qcs->tx.offset, (ullong)qcs->tx.fc.off_real);
chunk_appendf(&trace_buf, " msd=%llu/%llu",
(ullong)qcs->tx.fc.limit, (ullong)qcs->tx.fc.off_real);
}
if (mask & QMUX_EV_QCC_NQCS) {