From 4eb3ff1d3baf307f66afba6921e20ff703ef4cc4 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 2 Oct 2024 17:46:32 +0200 Subject: [PATCH] MAJOR: mux-h2: make streams use the connection's buffers For now it seems to work as before, and even when artificially inflating the number of allocatable buffers per stream. The number of allocated slots is always the same as the max number of streams, which guarantees that each stream will find one buffer. we only grant one buffer per stream at this point, since the goal was to replace the existing single rxbuf. A new demux blocking flag, H2_CF_DEM_RXBUF, was added to indicate a failure to get an rxbuf slot from the connection. It was lightly tested (by forcing bl_init() to a lower number of buffers). It is not yet certain whether it's more useful to have a new flag or to reuse the existing H2_CF_DEM_SFULL which indicates the rxbuf is full, but at least the new flag more accurately translates the condition, that may make a difference in the future. However, given that when RXBUF is set, most of the time it results in a failure to find more room to demux and it sets SFULL, for now we have to always clear SFULL when clearing RXBUF as well. This means that most of the time we'll see 3 combinations: - none: everything's OK - SFULL: the unique rx buffer is full - RXBUF || (RXBUF|SFULL): cannot allocate more entries Note that we need to be super careful in h2_frt_transfer_data() because the htx_free_data_space() function doesn't guarantee that the room is usable, so htx_add_data() may still fail despite an apparent room. For this reason, h2_frt_transfer_data() maintains a "full" flag to indicate that a transfer attempt failed and that a new buffer is required. --- include/haproxy/mux_h2-t.h | 4 +- src/mux_h2.c | 181 ++++++++++++++++++++++++++++--------- 2 files changed, 140 insertions(+), 45 deletions(-) diff --git a/include/haproxy/mux_h2-t.h b/include/haproxy/mux_h2-t.h index f6a034bc1..1ed3fe673 100644 --- a/include/haproxy/mux_h2-t.h +++ b/include/haproxy/mux_h2-t.h @@ -41,12 +41,12 @@ */ #define H2_CF_DEM_DALLOC 0x00000004 // demux blocked on lack of connection's demux buffer #define H2_CF_DEM_DFULL 0x00000008 // demux blocked on connection's demux buffer full -/* unused: 0x10 */ +#define H2_CF_DEM_RXBUF 0x00000010 // demux blocked on missing rxbuf slots #define H2_CF_DEM_MROOM 0x00000020 // demux blocked on lack of room in mux buffer #define H2_CF_DEM_SALLOC 0x00000040 // demux blocked on lack of stream's request buffer #define H2_CF_DEM_SFULL 0x00000080 // demux blocked on stream request buffer full #define H2_CF_DEM_TOOMANY 0x00000100 // demux blocked waiting for some stream connectors to leave -#define H2_CF_DEM_BLOCK_ANY 0x000001E0 // aggregate of the demux flags above except DALLOC/DFULL +#define H2_CF_DEM_BLOCK_ANY 0x000001F0 // aggregate of the demux flags above except DALLOC/DFULL // (SHORT_READ is also excluded) #define H2_CF_DEM_SHORT_READ 0x00000200 // demux blocked on incomplete frame diff --git a/src/mux_h2.c b/src/mux_h2.c index 972750b43..ef62b1b95 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -120,7 +120,6 @@ struct h2s { uint rx_head, rx_tail; /* head and tail of rx buffer in the conn's shared rx buf */ uint rx_count; /* total number of allocated rxbufs */ /* 4 bytes hole here */ - struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ struct wait_event *subs; /* recv wait_event the stream connector associated is waiting on (via h2_subscribe) */ struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */ struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to, @@ -695,15 +694,31 @@ static inline int h2c_max_concurrent_streams(const struct h2c *h2c) return ret; } +/* Returns a pointer to the oldest rxbuf of the stream, which must exist. + * Note that this doesn't indicate that the buffer is allocated nor contains + * any data. + */ +static inline struct buffer *_h2s_rxbuf_head(const struct h2s *h2s) +{ + return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf; +} + +/* Returns a pointer to the newest rxbuf of the stream, which must exist. + * Note that this doesn't indicate that the buffer is allocated nor contains + * any data. + */ +static inline struct buffer *_h2s_rxbuf_tail(const struct h2s *h2s) +{ + return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf; +} + /* Returns a pointer to the oldest rxbuf of the stream, or NULL if there is * none. Note that this doesn't indicate that the buffer is allocated nor * contains any data. */ static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s) { - if (!h2s->rx_head) - return NULL; - return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf; + return h2s->rx_head ? _h2s_rxbuf_head(h2s) : NULL; } /* Returns a pointer to the newest rxbuf of the stream, or NULL if there is @@ -712,9 +727,7 @@ static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s) */ static inline struct buffer *h2s_rxbuf_tail(const struct h2s *h2s) { - if (!h2s->rx_tail) - return NULL; - return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf; + return h2s->rx_tail ? _h2s_rxbuf_tail(h2s) : NULL; } /* Returns the number of allocated rxbuf slots for the stream */ @@ -965,10 +978,13 @@ static int h2_buf_available(void *target) if ((h2c->flags & H2_CF_DEM_SALLOC) && (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s_sc(h2s) && - b_alloc(&h2s->rxbuf, DB_SE_RX)) { - h2c->flags &= ~H2_CF_DEM_SALLOC; - h2c_restart_reading(h2c, 1); - return 1; + (h2s_rxbuf_tail(h2s) || h2s_get_rxbuf(h2s))) { + h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL); + if (b_alloc(_h2s_rxbuf_tail(h2s), DB_SE_RX)) { + h2c->flags &= ~H2_CF_DEM_SALLOC; + h2c_restart_reading(h2c, 1); + return 1; + } } return 0; @@ -1227,6 +1243,8 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s h2c->st0 = H2_CS_PREFACE; h2c->conn = conn; h2c->streams_limit = h2c_max_concurrent_streams(h2c); + bl_init(h2c->shared_rx_bufs, h2c->streams_limit + 1); + h2c->max_id = -1; h2c->errcode = H2_ERR_NO_ERROR; h2c->rcvd_c = 0; @@ -1697,7 +1715,8 @@ static inline void h2s_close(struct h2s *h2s) if (!h2s->id) h2s->h2c->nb_reserved--; if (h2s->sd && h2s_sc(h2s)) { - if (!se_fl_test(h2s->sd, SE_FL_EOS) && !b_data(&h2s->rxbuf)) + if (!se_fl_test(h2s->sd, SE_FL_EOS) && + (!h2s_rxbuf_head(h2s) || !b_data(h2s_rxbuf_head(h2s)))) h2s_notify_recv(h2s); } HA_ATOMIC_DEC(&h2s->h2c->px_counters->open_streams); @@ -1736,14 +1755,26 @@ static inline void h2s_propagate_term_flags(struct h2c *h2c, struct h2s *h2s) static void h2s_destroy(struct h2s *h2s) { struct connection *conn = h2s->h2c->conn; + int freed = 0; TRACE_ENTER(H2_EV_H2S_END, conn, h2s); h2s_close(h2s); eb32_delete(&h2s->by_id); - if (b_size(&h2s->rxbuf)) { - b_free(&h2s->rxbuf); - offer_buffers(NULL, 1); + + while (h2s->rx_head) { + b_free(&h2s->h2c->shared_rx_bufs[h2s->rx_head].buf); + h2s->rx_head = bl_put(h2s->h2c->shared_rx_bufs, h2s->rx_head); + freed++; + } + + if (freed) { + offer_buffers(NULL, freed); + if (h2s->h2c->flags & H2_CF_DEM_RXBUF) { + /* just released resources the demux is waiting for */ + h2s->h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF); + h2c_restart_reading(h2s->h2c, 1); + } } if (h2s->subs) @@ -1799,7 +1830,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id) h2s->st = H2_SS_IDLE; h2s->status = 0; h2s->body_len = 0; - h2s->rxbuf = BUF_NULL; h2s->rx_tail = 0; h2s->rx_head = 0; h2s->rx_count = 0; @@ -3105,10 +3135,16 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s) if (h2s->st != H2_SS_IDLE) { /* The stream exists/existed, this must be a trailers frame */ if (h2s->st != H2_SS_CLOSED) { - error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &body_len, NULL); + if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) { + TRACE_USER("Not allowed to get an extra buffer for H2 request trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, ); + h2c->flags |= H2_CF_DEM_RXBUF; + goto out; + } + + error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &body_len, NULL); /* unrecoverable error ? */ if (h2c->st0 >= H2_CS_ERROR) { - TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s)); sess_log(h2c->conn->owner); goto out; } @@ -3128,7 +3164,7 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s) */ sess_log(h2c->conn->owner); h2s_error(h2s, H2_ERR_INTERNAL_ERROR); - TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s)); h2c->st0 = H2_CS_FRAME_E; goto out; } @@ -3316,10 +3352,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) goto fail; // incomplete frame } - if (h2s->st != H2_SS_CLOSED) { - error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &h2s->body_len, h2s->upgrade_protocol); - } - else { + if (h2s->st == H2_SS_CLOSED) { /* the connection was already killed by an RST, let's consume * the data and send another RST. */ @@ -3329,6 +3362,14 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) goto send_rst; } + if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) { + TRACE_USER("Not allowed to get an extra buffer for H2 response HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, ); + h2c->flags |= H2_CF_DEM_RXBUF; + goto fail; + } + + error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &h2s->body_len, h2s->upgrade_protocol); + /* unrecoverable error ? */ if (h2c->st0 >= H2_CS_ERROR) { TRACE_USER("Unrecoverable error decoding H2 HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s); @@ -3385,7 +3426,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) h2s->flags &= ~H2_SF_BLK_MBUSY; } - TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, h2s_rxbuf_tail(h2s)); TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s); return h2s; fail: @@ -4026,7 +4067,7 @@ static void h2_process_demux(struct h2c *h2c) tmp_h2s = h2c_st_by_id(h2c, h2c->dsi); if (tmp_h2s != h2s && h2s && h2s_sc(h2s) && - (b_data(&h2s->rxbuf) || + ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) || h2c_read0_pending(h2c) || h2s->st == H2_SS_CLOSED || (h2s->flags & H2_SF_ES_RCVD) || @@ -4213,7 +4254,7 @@ static void h2_process_demux(struct h2c *h2c) h2c->flags &= ~H2_CF_DEM_DFULL; if (h2s && h2s_sc(h2s) && - (b_data(&h2s->rxbuf) || + ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) || h2c_read0_pending(h2c) || h2s->st == H2_SS_CLOSED || (h2s->flags & H2_SF_ES_RCVD) || @@ -5777,7 +5818,7 @@ static int h2c_dec_hdrs(struct h2c *h2c, struct buffer *rxbuf, uint32_t *flags, goto done; } -/* Transfer the payload of a DATA frame to the HTTP/1 side. The HTTP/2 frame +/* Transfer the payload of an H2 DATA frame to the HTX side. The HTTP/2 frame * parser state is automatically updated. Returns > 0 if it could completely * send the current frame, 0 if it couldn't complete, in which case * SE_FL_RCV_MORE must be checked to know if some data remain pending (an empty @@ -5792,14 +5833,28 @@ static int h2_frt_transfer_data(struct h2s *h2s) int block; unsigned int flen = 0; struct htx *htx = NULL; - struct buffer *scbuf; + struct buffer *scbuf = NULL; unsigned int sent; + int full = 0; TRACE_ENTER(H2_EV_RX_FRAME|H2_EV_RX_DATA, h2c->conn, h2s); - h2c->flags &= ~H2_CF_DEM_SFULL; + h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF); - scbuf = h2_get_buf(h2c, &h2s->rxbuf); + /* Note: the size estimate is approximative due to HTX fragmentation, + * so here we are optimistic and try to get the work done without + * allocating an rxbuf if possible. If we fail we'll more aggressively + * retry. + */ + if ((!h2s_rxbuf_tail(h2s) || (full || !h2s_may_append_to_rxbuf(h2s))) && !h2s_get_rxbuf(h2s)) { + h2c->flags |= H2_CF_DEM_RXBUF; + TRACE_STATE("waiting for an h2s rxbuf slot", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); + goto fail; + } + + next_buffer: + /* now we have a non-full rxbuf */ + scbuf = h2_get_buf(h2c, h2s_rxbuf_tail(h2s)); if (!scbuf) { h2c->flags |= H2_CF_DEM_SALLOC; TRACE_STATE("waiting for an h2s rxbuf", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); @@ -5808,6 +5863,7 @@ static int h2_frt_transfer_data(struct h2s *h2s) htx = htx_from_buf(scbuf); try_again: + full = 0; flen = h2c->dfl - h2c->dpl; if (!flen) goto end_transfer; @@ -5820,6 +5876,10 @@ static int h2_frt_transfer_data(struct h2s *h2s) block = htx_free_data_space(htx); if (!block) { + full = 1; + if (h2s_get_rxbuf(h2s)) + goto next_buffer; + h2c->flags |= H2_CF_DEM_SFULL; TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); goto fail; @@ -5846,6 +5906,11 @@ static int h2_frt_transfer_data(struct h2s *h2s) } if (sent < flen) { + if (!sent) + full = 1; + if (h2s_get_rxbuf(h2s)) + goto next_buffer; + h2c->flags |= H2_CF_DEM_SFULL; TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); goto fail; @@ -7205,29 +7270,36 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in struct h2c *h2c = h2s->h2c; struct htx *h2s_htx = NULL; struct htx *buf_htx = NULL; + struct buffer *rxbuf = NULL; size_t ret = 0; uint prev_h2c_flags = h2c->flags; TRACE_ENTER(H2_EV_STRM_RECV, h2c->conn, h2s); /* transfer possibly pending data to the upper layer */ - h2s_htx = htx_from_buf(&h2s->rxbuf); + + xfer_next_buf: + rxbuf = h2s_rxbuf_head(h2s); + if (!rxbuf) + goto end; // may be NULL if empty + + h2s_htx = htx_from_buf(rxbuf); if (htx_is_empty(h2s_htx) && !(h2s_htx->flags & HTX_FL_PARSING_ERROR)) { /* Here htx_to_buf() will set buffer data to 0 because * the HTX is empty. */ - htx_to_buf(h2s_htx, &h2s->rxbuf); + htx_to_buf(h2s_htx, rxbuf); goto end; } - ret = h2s_htx->data; + ret += h2s_htx->data; buf_htx = htx_from_buf(buf); /* is empty and the message is small enough, swap the * buffers. */ if (htx_is_empty(buf_htx) && htx_used_space(h2s_htx) <= count) { htx_to_buf(buf_htx, buf); - htx_to_buf(h2s_htx, &h2s->rxbuf); - b_xfer(buf, &h2s->rxbuf, b_data(&h2s->rxbuf)); + htx_to_buf(h2s_htx, rxbuf); + b_xfer(buf, rxbuf, b_data(rxbuf)); goto end; } @@ -7244,19 +7316,29 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in buf_htx->extra = (h2s_htx->extra ? (h2s_htx->data + h2s_htx->extra) : 0); htx_to_buf(buf_htx, buf); - htx_to_buf(h2s_htx, &h2s->rxbuf); + htx_to_buf(h2s_htx, rxbuf); ret -= h2s_htx->data; end: /* release the rxbuf if it's not used anymore */ - if (!b_data(&h2s->rxbuf) && b_size(&h2s->rxbuf)) { - b_free(&h2s->rxbuf); + if (rxbuf && !b_data(rxbuf) && b_size(rxbuf)) { + BUG_ON_HOT(rxbuf != _h2s_rxbuf_head(h2s)); + b_free(_h2s_rxbuf_head(h2s)); offer_buffers(NULL, 1); } + /* release the unused rxbuf slot */ + if (rxbuf && !b_size(rxbuf)) { + h2s_put_rxbuf(h2s); + h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL); + goto xfer_next_buf; + } + /* tell the stream layer whether there are data left or not */ - if (b_data(&h2s->rxbuf)) + if (h2s_rxbuf_cnt(h2s)) { se_fl_set(h2s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + BUG_ON_HOT(!buf->data); + } else { if (!(h2c->flags & H2_CF_IS_BACK) && (h2s->flags & (H2_SF_BODY_TUNNEL|H2_SF_ES_RCVD))) { /* If request ES is reported to the upper layer, it means the @@ -7274,9 +7356,11 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in h2c->flags &= ~H2_CF_DEM_SFULL; /* wake up processing if we've unblocked something */ - if ((prev_h2c_flags & ~h2c->flags) & H2_CF_DEM_SFULL) + if ((prev_h2c_flags & ~h2c->flags) & ((H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF))) h2c_restart_reading(h2c, 1); + BUG_ON_HOT(!buf->data && se_fl_test(h2s->sd, SE_FL_WANT_ROOM)); + TRACE_LEAVE(H2_EV_STRM_RECV, h2c->conn, h2s); return ret; } @@ -7683,16 +7767,27 @@ static int h2_resume_ff(struct stconn *sc, unsigned int flags) */ static int h2_dump_h2s_info(struct buffer *msg, const struct h2s *h2s, const char *pfx) { + const struct buffer *head, *tail; int ret = 0; if (!h2s) return ret; - chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf=%u@%p+%u/%u", + head = h2s_rxbuf_head(h2s); + tail = h2s_rxbuf_tail(h2s); + + chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf.c=%u .t=%u@%p+%u/%u .h=%u@%p+%u/%u", h2s->id, h2s_st_to_str(h2s->st), h2s->flags, (uint)(h2s->next_max_ofs - h2s->curr_rx_ofs), - (unsigned int)b_data(&h2s->rxbuf), b_orig(&h2s->rxbuf), - (unsigned int)b_head_ofs(&h2s->rxbuf), (unsigned int)b_size(&h2s->rxbuf)); + h2s_rxbuf_cnt(h2s), + tail ? (uint)b_data(tail) : 0, + tail ? b_orig(tail) : NULL, + tail ? (uint)b_head_ofs(tail) : 0, + tail ? (uint)b_size(tail) : 0, + head ? (uint)b_data(head) : 0, + head ? b_orig(head) : NULL, + head ? (uint)b_head_ofs(head) : 0, + head ? (uint)b_size(head) : 0); if (pfx) chunk_appendf(msg, "\n%s", pfx);