MAJOR: mux-h2: make streams use the connection's buffers

For now it seems to work as before, and even when artificially inflating
the number of allocatable buffers per stream. The number of allocated
slots is always the same as the max number of streams, which guarantees
that each stream will find one buffer. We only grant one buffer per
stream at this point, since the goal was to replace the existing single
rxbuf.

A new demux blocking flag, H2_CF_DEM_RXBUF, was added to indicate
a failure to get an rxbuf slot from the connection. It was lightly
tested (by forcing bl_init() to a lower number of buffers). It is not
yet certain whether it's more useful to have a new flag or to reuse
the existing H2_CF_DEM_SFULL which indicates the rxbuf is full,
but at least the new flag more accurately translates the condition,
which may make a difference in the future. However, given that when
RXBUF is set, most of the time it results in a failure to find more
room to demux and it sets SFULL, for now we have to always clear
SFULL when clearing RXBUF as well. This means that most of the time
we'll see 3 combinations:
  - none: everything's OK
  - SFULL: the unique rx buffer is full
  - RXBUF || (RXBUF|SFULL): cannot allocate more entries

Note that we need to be super careful in h2_frt_transfer_data() because
the htx_free_data_space() function doesn't guarantee that the room is
usable, so htx_add_data() may still fail despite apparent room. For
this reason, h2_frt_transfer_data() maintains a "full" flag to indicate
that a transfer attempt failed and that a new buffer is required.
This commit is contained in:
Willy Tarreau 2024-10-02 17:46:32 +02:00
parent 6279cbc9e9
commit 4eb3ff1d3b
2 changed files with 140 additions and 45 deletions

View File

@ -41,12 +41,12 @@
*/
#define H2_CF_DEM_DALLOC 0x00000004 // demux blocked on lack of connection's demux buffer
#define H2_CF_DEM_DFULL 0x00000008 // demux blocked on connection's demux buffer full
/* unused: 0x10 */
#define H2_CF_DEM_RXBUF 0x00000010 // demux blocked on missing rxbuf slots
#define H2_CF_DEM_MROOM 0x00000020 // demux blocked on lack of room in mux buffer
#define H2_CF_DEM_SALLOC 0x00000040 // demux blocked on lack of stream's request buffer
#define H2_CF_DEM_SFULL 0x00000080 // demux blocked on stream request buffer full
#define H2_CF_DEM_TOOMANY 0x00000100 // demux blocked waiting for some stream connectors to leave
#define H2_CF_DEM_BLOCK_ANY 0x000001E0 // aggregate of the demux flags above except DALLOC/DFULL
#define H2_CF_DEM_BLOCK_ANY 0x000001F0 // aggregate of the demux flags above except DALLOC/DFULL
// (SHORT_READ is also excluded)
#define H2_CF_DEM_SHORT_READ 0x00000200 // demux blocked on incomplete frame

View File

@ -120,7 +120,6 @@ struct h2s {
uint rx_head, rx_tail; /* head and tail of rx buffer in the conn's shared rx buf */
uint rx_count; /* total number of allocated rxbufs */
/* 4 bytes hole here */
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
struct wait_event *subs; /* recv wait_event the stream connector associated is waiting on (via h2_subscribe) */
struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_list */
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to,
@ -695,15 +694,31 @@ static inline int h2c_max_concurrent_streams(const struct h2c *h2c)
return ret;
}
/* Gives access to the stream's oldest rxbuf, i.e. the entry designated by
 * <rx_head> in the connection's shared rx buffers. No check is performed on
 * <rx_head>, so the caller must make sure that such an entry exists. The
 * returned buffer is not necessarily allocated and may hold no data.
 */
static inline struct buffer *_h2s_rxbuf_head(const struct h2s *h2s)
{
	struct buffer *oldest = &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf;

	return oldest;
}
/* Gives access to the stream's newest rxbuf, i.e. the entry designated by
 * <rx_tail> in the connection's shared rx buffers. No check is performed on
 * <rx_tail>, so the caller must make sure that such an entry exists. The
 * returned buffer is not necessarily allocated and may hold no data.
 */
static inline struct buffer *_h2s_rxbuf_tail(const struct h2s *h2s)
{
	struct buffer *newest = &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf;

	return newest;
}
/* Returns a pointer to the oldest rxbuf of the stream, or NULL if there is
* none. Note that this doesn't indicate that the buffer is allocated nor
* contains any data.
*/
static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s)
{
if (!h2s->rx_head)
return NULL;
return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf;
return h2s->rx_head ? _h2s_rxbuf_head(h2s) : NULL;
}
/* Returns a pointer to the newest rxbuf of the stream, or NULL if there is
@ -712,9 +727,7 @@ static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s)
*/
static inline struct buffer *h2s_rxbuf_tail(const struct h2s *h2s)
{
if (!h2s->rx_tail)
return NULL;
return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf;
return h2s->rx_tail ? _h2s_rxbuf_tail(h2s) : NULL;
}
/* Returns the number of allocated rxbuf slots for the stream */
@ -965,10 +978,13 @@ static int h2_buf_available(void *target)
if ((h2c->flags & H2_CF_DEM_SALLOC) &&
(h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s_sc(h2s) &&
b_alloc(&h2s->rxbuf, DB_SE_RX)) {
h2c->flags &= ~H2_CF_DEM_SALLOC;
h2c_restart_reading(h2c, 1);
return 1;
(h2s_rxbuf_tail(h2s) || h2s_get_rxbuf(h2s))) {
h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL);
if (b_alloc(_h2s_rxbuf_tail(h2s), DB_SE_RX)) {
h2c->flags &= ~H2_CF_DEM_SALLOC;
h2c_restart_reading(h2c, 1);
return 1;
}
}
return 0;
@ -1227,6 +1243,8 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
h2c->st0 = H2_CS_PREFACE;
h2c->conn = conn;
h2c->streams_limit = h2c_max_concurrent_streams(h2c);
bl_init(h2c->shared_rx_bufs, h2c->streams_limit + 1);
h2c->max_id = -1;
h2c->errcode = H2_ERR_NO_ERROR;
h2c->rcvd_c = 0;
@ -1697,7 +1715,8 @@ static inline void h2s_close(struct h2s *h2s)
if (!h2s->id)
h2s->h2c->nb_reserved--;
if (h2s->sd && h2s_sc(h2s)) {
if (!se_fl_test(h2s->sd, SE_FL_EOS) && !b_data(&h2s->rxbuf))
if (!se_fl_test(h2s->sd, SE_FL_EOS) &&
(!h2s_rxbuf_head(h2s) || !b_data(h2s_rxbuf_head(h2s))))
h2s_notify_recv(h2s);
}
HA_ATOMIC_DEC(&h2s->h2c->px_counters->open_streams);
@ -1736,14 +1755,26 @@ static inline void h2s_propagate_term_flags(struct h2c *h2c, struct h2s *h2s)
static void h2s_destroy(struct h2s *h2s)
{
struct connection *conn = h2s->h2c->conn;
int freed = 0;
TRACE_ENTER(H2_EV_H2S_END, conn, h2s);
h2s_close(h2s);
eb32_delete(&h2s->by_id);
if (b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
offer_buffers(NULL, 1);
while (h2s->rx_head) {
b_free(&h2s->h2c->shared_rx_bufs[h2s->rx_head].buf);
h2s->rx_head = bl_put(h2s->h2c->shared_rx_bufs, h2s->rx_head);
freed++;
}
if (freed) {
offer_buffers(NULL, freed);
if (h2s->h2c->flags & H2_CF_DEM_RXBUF) {
/* just released resources the demux is waiting for */
h2s->h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF);
h2c_restart_reading(h2s->h2c, 1);
}
}
if (h2s->subs)
@ -1799,7 +1830,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
h2s->st = H2_SS_IDLE;
h2s->status = 0;
h2s->body_len = 0;
h2s->rxbuf = BUF_NULL;
h2s->rx_tail = 0;
h2s->rx_head = 0;
h2s->rx_count = 0;
@ -3105,10 +3135,16 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s)
if (h2s->st != H2_SS_IDLE) {
/* The stream exists/existed, this must be a trailers frame */
if (h2s->st != H2_SS_CLOSED) {
error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &body_len, NULL);
if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) {
TRACE_USER("Not allowed to get an extra buffer for H2 request trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, );
h2c->flags |= H2_CF_DEM_RXBUF;
goto out;
}
error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &body_len, NULL);
/* unrecoverable error ? */
if (h2c->st0 >= H2_CS_ERROR) {
TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf);
TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s));
sess_log(h2c->conn->owner);
goto out;
}
@ -3128,7 +3164,7 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s)
*/
sess_log(h2c->conn->owner);
h2s_error(h2s, H2_ERR_INTERNAL_ERROR);
TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf);
TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s));
h2c->st0 = H2_CS_FRAME_E;
goto out;
}
@ -3316,10 +3352,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s)
goto fail; // incomplete frame
}
if (h2s->st != H2_SS_CLOSED) {
error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &h2s->body_len, h2s->upgrade_protocol);
}
else {
if (h2s->st == H2_SS_CLOSED) {
/* the connection was already killed by an RST, let's consume
* the data and send another RST.
*/
@ -3329,6 +3362,14 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s)
goto send_rst;
}
if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) {
TRACE_USER("Not allowed to get an extra buffer for H2 response HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, );
h2c->flags |= H2_CF_DEM_RXBUF;
goto fail;
}
error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &h2s->body_len, h2s->upgrade_protocol);
/* unrecoverable error ? */
if (h2c->st0 >= H2_CS_ERROR) {
TRACE_USER("Unrecoverable error decoding H2 HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s);
@ -3385,7 +3426,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s)
h2s->flags &= ~H2_SF_BLK_MBUSY;
}
TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, &h2s->rxbuf);
TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, h2s_rxbuf_tail(h2s));
TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s);
return h2s;
fail:
@ -4026,7 +4067,7 @@ static void h2_process_demux(struct h2c *h2c)
tmp_h2s = h2c_st_by_id(h2c, h2c->dsi);
if (tmp_h2s != h2s && h2s && h2s_sc(h2s) &&
(b_data(&h2s->rxbuf) ||
((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) ||
h2c_read0_pending(h2c) ||
h2s->st == H2_SS_CLOSED ||
(h2s->flags & H2_SF_ES_RCVD) ||
@ -4213,7 +4254,7 @@ static void h2_process_demux(struct h2c *h2c)
h2c->flags &= ~H2_CF_DEM_DFULL;
if (h2s && h2s_sc(h2s) &&
(b_data(&h2s->rxbuf) ||
((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) ||
h2c_read0_pending(h2c) ||
h2s->st == H2_SS_CLOSED ||
(h2s->flags & H2_SF_ES_RCVD) ||
@ -5777,7 +5818,7 @@ static int h2c_dec_hdrs(struct h2c *h2c, struct buffer *rxbuf, uint32_t *flags,
goto done;
}
/* Transfer the payload of a DATA frame to the HTTP/1 side. The HTTP/2 frame
/* Transfer the payload of an H2 DATA frame to the HTX side. The HTTP/2 frame
* parser state is automatically updated. Returns > 0 if it could completely
* send the current frame, 0 if it couldn't complete, in which case
* SE_FL_RCV_MORE must be checked to know if some data remain pending (an empty
@ -5792,14 +5833,28 @@ static int h2_frt_transfer_data(struct h2s *h2s)
int block;
unsigned int flen = 0;
struct htx *htx = NULL;
struct buffer *scbuf;
struct buffer *scbuf = NULL;
unsigned int sent;
int full = 0;
TRACE_ENTER(H2_EV_RX_FRAME|H2_EV_RX_DATA, h2c->conn, h2s);
h2c->flags &= ~H2_CF_DEM_SFULL;
h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF);
scbuf = h2_get_buf(h2c, &h2s->rxbuf);
/* Note: the size estimate is approximative due to HTX fragmentation,
* so here we are optimistic and try to get the work done without
* allocating an rxbuf if possible. If we fail we'll more aggressively
* retry.
*/
if ((!h2s_rxbuf_tail(h2s) || (full || !h2s_may_append_to_rxbuf(h2s))) && !h2s_get_rxbuf(h2s)) {
h2c->flags |= H2_CF_DEM_RXBUF;
TRACE_STATE("waiting for an h2s rxbuf slot", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
goto fail;
}
next_buffer:
/* now we have a non-full rxbuf */
scbuf = h2_get_buf(h2c, h2s_rxbuf_tail(h2s));
if (!scbuf) {
h2c->flags |= H2_CF_DEM_SALLOC;
TRACE_STATE("waiting for an h2s rxbuf", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
@ -5808,6 +5863,7 @@ static int h2_frt_transfer_data(struct h2s *h2s)
htx = htx_from_buf(scbuf);
try_again:
full = 0;
flen = h2c->dfl - h2c->dpl;
if (!flen)
goto end_transfer;
@ -5820,6 +5876,10 @@ static int h2_frt_transfer_data(struct h2s *h2s)
block = htx_free_data_space(htx);
if (!block) {
full = 1;
if (h2s_get_rxbuf(h2s))
goto next_buffer;
h2c->flags |= H2_CF_DEM_SFULL;
TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
goto fail;
@ -5846,6 +5906,11 @@ static int h2_frt_transfer_data(struct h2s *h2s)
}
if (sent < flen) {
if (!sent)
full = 1;
if (h2s_get_rxbuf(h2s))
goto next_buffer;
h2c->flags |= H2_CF_DEM_SFULL;
TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
goto fail;
@ -7205,29 +7270,36 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in
struct h2c *h2c = h2s->h2c;
struct htx *h2s_htx = NULL;
struct htx *buf_htx = NULL;
struct buffer *rxbuf = NULL;
size_t ret = 0;
uint prev_h2c_flags = h2c->flags;
TRACE_ENTER(H2_EV_STRM_RECV, h2c->conn, h2s);
/* transfer possibly pending data to the upper layer */
h2s_htx = htx_from_buf(&h2s->rxbuf);
xfer_next_buf:
rxbuf = h2s_rxbuf_head(h2s);
if (!rxbuf)
goto end; // may be NULL if empty
h2s_htx = htx_from_buf(rxbuf);
if (htx_is_empty(h2s_htx) && !(h2s_htx->flags & HTX_FL_PARSING_ERROR)) {
/* Here htx_to_buf() will set buffer data to 0 because
* the HTX is empty.
*/
htx_to_buf(h2s_htx, &h2s->rxbuf);
htx_to_buf(h2s_htx, rxbuf);
goto end;
}
ret = h2s_htx->data;
ret += h2s_htx->data;
buf_htx = htx_from_buf(buf);
/* <buf> is empty and the message is small enough, swap the
* buffers. */
if (htx_is_empty(buf_htx) && htx_used_space(h2s_htx) <= count) {
htx_to_buf(buf_htx, buf);
htx_to_buf(h2s_htx, &h2s->rxbuf);
b_xfer(buf, &h2s->rxbuf, b_data(&h2s->rxbuf));
htx_to_buf(h2s_htx, rxbuf);
b_xfer(buf, rxbuf, b_data(rxbuf));
goto end;
}
@ -7244,19 +7316,29 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in
buf_htx->extra = (h2s_htx->extra ? (h2s_htx->data + h2s_htx->extra) : 0);
htx_to_buf(buf_htx, buf);
htx_to_buf(h2s_htx, &h2s->rxbuf);
htx_to_buf(h2s_htx, rxbuf);
ret -= h2s_htx->data;
end:
/* release the rxbuf if it's not used anymore */
if (!b_data(&h2s->rxbuf) && b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
if (rxbuf && !b_data(rxbuf) && b_size(rxbuf)) {
BUG_ON_HOT(rxbuf != _h2s_rxbuf_head(h2s));
b_free(_h2s_rxbuf_head(h2s));
offer_buffers(NULL, 1);
}
/* release the unused rxbuf slot */
if (rxbuf && !b_size(rxbuf)) {
h2s_put_rxbuf(h2s);
h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL);
goto xfer_next_buf;
}
/* tell the stream layer whether there are data left or not */
if (b_data(&h2s->rxbuf))
if (h2s_rxbuf_cnt(h2s)) {
se_fl_set(h2s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
BUG_ON_HOT(!buf->data);
}
else {
if (!(h2c->flags & H2_CF_IS_BACK) && (h2s->flags & (H2_SF_BODY_TUNNEL|H2_SF_ES_RCVD))) {
/* If request ES is reported to the upper layer, it means the
@ -7274,9 +7356,11 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in
h2c->flags &= ~H2_CF_DEM_SFULL;
/* wake up processing if we've unblocked something */
if ((prev_h2c_flags & ~h2c->flags) & H2_CF_DEM_SFULL)
if ((prev_h2c_flags & ~h2c->flags) & ((H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF)))
h2c_restart_reading(h2c, 1);
BUG_ON_HOT(!buf->data && se_fl_test(h2s->sd, SE_FL_WANT_ROOM));
TRACE_LEAVE(H2_EV_STRM_RECV, h2c->conn, h2s);
return ret;
}
@ -7683,16 +7767,27 @@ static int h2_resume_ff(struct stconn *sc, unsigned int flags)
*/
static int h2_dump_h2s_info(struct buffer *msg, const struct h2s *h2s, const char *pfx)
{
const struct buffer *head, *tail;
int ret = 0;
if (!h2s)
return ret;
chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf=%u@%p+%u/%u",
head = h2s_rxbuf_head(h2s);
tail = h2s_rxbuf_tail(h2s);
chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf.c=%u .t=%u@%p+%u/%u .h=%u@%p+%u/%u",
h2s->id, h2s_st_to_str(h2s->st), h2s->flags,
(uint)(h2s->next_max_ofs - h2s->curr_rx_ofs),
(unsigned int)b_data(&h2s->rxbuf), b_orig(&h2s->rxbuf),
(unsigned int)b_head_ofs(&h2s->rxbuf), (unsigned int)b_size(&h2s->rxbuf));
h2s_rxbuf_cnt(h2s),
tail ? (uint)b_data(tail) : 0,
tail ? b_orig(tail) : NULL,
tail ? (uint)b_head_ofs(tail) : 0,
tail ? (uint)b_size(tail) : 0,
head ? (uint)b_data(head) : 0,
head ? b_orig(head) : NULL,
head ? (uint)b_head_ofs(head) : 0,
head ? (uint)b_size(head) : 0);
if (pfx)
chunk_appendf(msg, "\n%s", pfx);