mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-02-20 20:57:00 +00:00
MINOR: mux-quic: account stream txbuf in QCC
A limit per connection is put on the number of buffers allocated by QUIC MUX for emission accross all its streams. This ensures memory consumption remains under control. This limit is simply explained as a count of buffers which can be concurrently allocated for each connection. As such, quic_conn structure was used to account currently allocated buffers. However, a quic_conn nevers allocates new stream buffers. This is only done at QUIC MUX layer. As such, this commit moves buffer accounting inside QCC structure. This simplifies the API, most notably qc_stream_buf_alloc() usage. Note that this commit inverts the accounting. Previously, it was initially set to 0 and increment for each allocated buffer. Now, it is set to the maximum value and decrement for each buf usage. This is considered as clearer to use.
This commit is contained in:
parent
635fbaaa4a
commit
f4d1bd0b76
@ -67,6 +67,7 @@ struct qcc {
|
||||
|
||||
struct {
|
||||
struct quic_fctl fc; /* stream flow control applied on sending */
|
||||
int avail_bufs; /* count of available buffers for this connection */
|
||||
} tx;
|
||||
|
||||
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
|
||||
|
@ -22,7 +22,7 @@ int qcs_is_close_remote(struct qcs *qcs);
|
||||
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
||||
void qcs_notify_recv(struct qcs *qcs);
|
||||
void qcs_notify_send(struct qcs *qcs);
|
||||
int qcc_notify_buf(struct qcc *qcc);
|
||||
void qcc_notify_buf(struct qcc *qcc, int free_count);
|
||||
|
||||
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
|
||||
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
|
||||
|
@ -398,7 +398,6 @@ struct quic_conn {
|
||||
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
||||
|
||||
struct eb_root streams_by_id; /* qc_stream_desc tree */
|
||||
int stream_buf_count; /* total count of allocated stream buffers for this connection */
|
||||
|
||||
/* MUX */
|
||||
struct qcc *qcc;
|
||||
|
@ -16,7 +16,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
|
||||
|
||||
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||
uint64_t offset, int *avail);
|
||||
uint64_t offset);
|
||||
void qc_stream_buf_release(struct qc_stream_desc *stream);
|
||||
|
||||
#endif /* USE_QUIC */
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <haproxy/chunk.h>
|
||||
#include <haproxy/connection.h>
|
||||
#include <haproxy/dynbuf.h>
|
||||
#include <haproxy/global-t.h>
|
||||
#include <haproxy/h3.h>
|
||||
#include <haproxy/list.h>
|
||||
#include <haproxy/ncbuf.h>
|
||||
@ -523,34 +524,36 @@ void qcs_notify_send(struct qcs *qcs)
|
||||
}
|
||||
}
|
||||
|
||||
/* Notify on a new stream-desc buffer available for <qcc> connection.
|
||||
*
|
||||
* Returns true if a stream was woken up. If false is returned, this indicates
|
||||
* to the caller that it's currently unnecessary to notify for the rest of the
|
||||
* available buffers.
|
||||
/* Report that <free_count> stream-desc buffer have been released for <qcc>
|
||||
* connection.
|
||||
*/
|
||||
int qcc_notify_buf(struct qcc *qcc)
|
||||
void qcc_notify_buf(struct qcc *qcc, int free_count)
|
||||
{
|
||||
struct qcs *qcs;
|
||||
int ret = 0;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
|
||||
|
||||
BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf);
|
||||
qcc->tx.avail_bufs += free_count;
|
||||
|
||||
if (qcc->flags & QC_CF_CONN_FULL) {
|
||||
TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
|
||||
qcc->flags &= ~QC_CF_CONN_FULL;
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
|
||||
/* TODO a simple optimization would be to only wake up <free_count> QCS
|
||||
* instances. But it may not work if a woken QCS is in error and does
|
||||
* not try to allocate a buffer, leaving the unwoken QCS indefinitely
|
||||
* in the buflist.
|
||||
*/
|
||||
while (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
|
||||
qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
|
||||
LIST_DEL_INIT(&qcs->el_buf);
|
||||
tot_time_stop(&qcs->timer.buf);
|
||||
qcs_notify_send(qcs);
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* A fatal error is detected locally for <qcc> connection. It should be closed
|
||||
@ -1007,7 +1010,6 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
|
||||
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
|
||||
{
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
int buf_avail;
|
||||
struct buffer *out = qc_stream_buf_get(qcs->stream);
|
||||
|
||||
/* Stream must not try to reallocate a buffer if currently waiting for one. */
|
||||
@ -1022,21 +1024,22 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
|
||||
goto out;
|
||||
}
|
||||
|
||||
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
|
||||
&buf_avail);
|
||||
if (!out) {
|
||||
if (buf_avail) {
|
||||
TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
*err = 1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!qcc->tx.avail_bufs) {
|
||||
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
|
||||
tot_time_start(&qcs->timer.buf);
|
||||
qcc->flags |= QC_CF_CONN_FULL;
|
||||
goto out;
|
||||
}
|
||||
|
||||
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real);
|
||||
if (!out) {
|
||||
TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
*err = 1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
--qcc->tx.avail_bufs;
|
||||
}
|
||||
|
||||
out:
|
||||
@ -2703,6 +2706,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
|
||||
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
|
||||
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
|
||||
|
||||
qcc->tx.avail_bufs = global.tune.quic_streams_buf;
|
||||
|
||||
if (conn_is_back(conn)) {
|
||||
qcc->next_bidi_l = 0x00;
|
||||
qcc->largest_bidi_r = 0x01;
|
||||
|
@ -1181,7 +1181,6 @@ struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4,
|
||||
quic_cc_path_init(qc->path, ipv4, server ? l->bind_conf->max_cwnd : 0,
|
||||
cc_algo ? cc_algo : default_quic_cc_algo, qc);
|
||||
|
||||
qc->stream_buf_count = 0;
|
||||
memcpy(&qc->local_addr, local_addr, sizeof(qc->local_addr));
|
||||
memcpy(&qc->peer_addr, peer_addr, sizeof qc->peer_addr);
|
||||
|
||||
|
@ -41,15 +41,9 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
|
||||
*stream_buf = NULL;
|
||||
|
||||
/* notify MUX about available buffers. */
|
||||
--qc->stream_buf_count;
|
||||
if (qc->mux_state == QC_MUX_READY) {
|
||||
/* notify MUX about available buffers.
|
||||
*
|
||||
* TODO several streams may be woken up even if a single buffer
|
||||
* is available for now.
|
||||
*/
|
||||
while (qcc_notify_buf(qc->qcc))
|
||||
;
|
||||
/* notify MUX about available buffers. */
|
||||
qcc_notify_buf(qc->qcc, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -222,15 +216,9 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
|
||||
if (free_count) {
|
||||
offer_buffers(NULL, free_count);
|
||||
|
||||
qc->stream_buf_count -= free_count;
|
||||
if (qc->mux_state == QC_MUX_READY) {
|
||||
/* notify MUX about available buffers.
|
||||
*
|
||||
* TODO several streams may be woken up even if a single buffer
|
||||
* is available for now.
|
||||
*/
|
||||
while (qcc_notify_buf(qc->qcc))
|
||||
;
|
||||
/* notify MUX about available buffers. */
|
||||
qcc_notify_buf(qc->qcc, free_count);
|
||||
}
|
||||
}
|
||||
|
||||
@ -265,45 +253,30 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
|
||||
return &stream->buf->buf;
|
||||
}
|
||||
|
||||
/* Returns the count of available buffer left for <qc>. */
|
||||
static int qc_stream_buf_avail(struct quic_conn *qc)
|
||||
{
|
||||
BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf);
|
||||
return global.tune.quic_streams_buf - qc->stream_buf_count;
|
||||
}
|
||||
|
||||
/* Allocate a new current buffer for <stream>. The buffer limit count for the
|
||||
* connection is checked first. This function is not allowed if current buffer
|
||||
* is not NULL prior to this call. The new buffer represents stream payload at
|
||||
* offset <offset>.
|
||||
/* Allocate a new current buffer for <stream>. This function is not allowed if
|
||||
* current buffer is not NULL prior to this call. The new buffer represents
|
||||
* stream payload at offset <offset>.
|
||||
*
|
||||
* Returns the buffer or NULL on error. Caller may check <avail> to ensure if
|
||||
* the connection buffer limit was reached or a fatal error was encountered.
|
||||
* Returns the buffer or NULL on error.
|
||||
*/
|
||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||
uint64_t offset, int *avail)
|
||||
uint64_t offset)
|
||||
{
|
||||
struct quic_conn *qc = stream->qc;
|
||||
|
||||
/* current buffer must be released first before allocate a new one. */
|
||||
BUG_ON(stream->buf);
|
||||
|
||||
*avail = qc_stream_buf_avail(qc);
|
||||
if (!*avail)
|
||||
return NULL;
|
||||
|
||||
stream->buf_offset = offset;
|
||||
stream->buf = pool_alloc(pool_head_quic_stream_buf);
|
||||
if (!stream->buf)
|
||||
return NULL;
|
||||
|
||||
stream->buf->buf = BUF_NULL;
|
||||
if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) {
|
||||
pool_free(pool_head_quic_stream_buf, stream->buf);
|
||||
stream->buf = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
++qc->stream_buf_count;
|
||||
LIST_APPEND(&stream->buf_list, &stream->buf->list);
|
||||
|
||||
return &stream->buf->buf;
|
||||
|
Loading…
Reference in New Issue
Block a user