diff --git a/include/haproxy/mux_h2-t.h b/include/haproxy/mux_h2-t.h index f6a034bc1..1ed3fe673 100644 --- a/include/haproxy/mux_h2-t.h +++ b/include/haproxy/mux_h2-t.h @@ -41,12 +41,12 @@ */ #define H2_CF_DEM_DALLOC 0x00000004 // demux blocked on lack of connection's demux buffer #define H2_CF_DEM_DFULL 0x00000008 // demux blocked on connection's demux buffer full -/* unused: 0x10 */ +#define H2_CF_DEM_RXBUF 0x00000010 // demux blocked on missing rxbuf slots #define H2_CF_DEM_MROOM 0x00000020 // demux blocked on lack of room in mux buffer #define H2_CF_DEM_SALLOC 0x00000040 // demux blocked on lack of stream's request buffer #define H2_CF_DEM_SFULL 0x00000080 // demux blocked on stream request buffer full #define H2_CF_DEM_TOOMANY 0x00000100 // demux blocked waiting for some stream connectors to leave -#define H2_CF_DEM_BLOCK_ANY 0x000001E0 // aggregate of the demux flags above except DALLOC/DFULL +#define H2_CF_DEM_BLOCK_ANY 0x000001F0 // aggregate of the demux flags above except DALLOC/DFULL // (SHORT_READ is also excluded) #define H2_CF_DEM_SHORT_READ 0x00000200 // demux blocked on incomplete frame diff --git a/src/mux_h2.c b/src/mux_h2.c index 972750b43..ef62b1b95 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -120,7 +120,6 @@ struct h2s { uint rx_head, rx_tail; /* head and tail of rx buffer in the conn's shared rx buf */ uint rx_count; /* total number of allocated rxbufs */ /* 4 bytes hole here */ - struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ struct wait_event *subs; /* recv wait_event the stream connector associated is waiting on (via h2_subscribe) */ struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */ struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to, @@ -695,15 +694,31 @@ static inline int h2c_max_concurrent_streams(const struct h2c *h2c) return ret; } +/* Returns a pointer to the oldest rxbuf of the stream, which must exist. + * Note that this doesn't indicate that the buffer is allocated nor contains + * any data. + */ +static inline struct buffer *_h2s_rxbuf_head(const struct h2s *h2s) +{ + return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf; +} + +/* Returns a pointer to the newest rxbuf of the stream, which must exist. + * Note that this doesn't indicate that the buffer is allocated nor contains + * any data. + */ +static inline struct buffer *_h2s_rxbuf_tail(const struct h2s *h2s) +{ + return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf; +} + /* Returns a pointer to the oldest rxbuf of the stream, or NULL if there is * none. Note that this doesn't indicate that the buffer is allocated nor * contains any data. */ static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s) { - if (!h2s->rx_head) - return NULL; - return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf; + return h2s->rx_head ? _h2s_rxbuf_head(h2s) : NULL; } /* Returns a pointer to the newest rxbuf of the stream, or NULL if there is @@ -712,9 +727,7 @@ static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s) */ static inline struct buffer *h2s_rxbuf_tail(const struct h2s *h2s) { - if (!h2s->rx_tail) - return NULL; - return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf; + return h2s->rx_tail ? _h2s_rxbuf_tail(h2s) : NULL; } /* Returns the number of allocated rxbuf slots for the stream */ @@ -965,10 +978,13 @@ static int h2_buf_available(void *target) if ((h2c->flags & H2_CF_DEM_SALLOC) && (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s_sc(h2s) && - b_alloc(&h2s->rxbuf, DB_SE_RX)) { - h2c->flags &= ~H2_CF_DEM_SALLOC; - h2c_restart_reading(h2c, 1); - return 1; + (h2s_rxbuf_tail(h2s) || h2s_get_rxbuf(h2s))) { + h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL); + if (b_alloc(_h2s_rxbuf_tail(h2s), DB_SE_RX)) { + h2c->flags &= ~H2_CF_DEM_SALLOC; + h2c_restart_reading(h2c, 1); + return 1; + } } return 0; @@ -1227,6 +1243,8 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s h2c->st0 = H2_CS_PREFACE; h2c->conn = conn; h2c->streams_limit = h2c_max_concurrent_streams(h2c); + bl_init(h2c->shared_rx_bufs, h2c->streams_limit + 1); + h2c->max_id = -1; h2c->errcode = H2_ERR_NO_ERROR; h2c->rcvd_c = 0; @@ -1697,7 +1715,8 @@ static inline void h2s_close(struct h2s *h2s) if (!h2s->id) h2s->h2c->nb_reserved--; if (h2s->sd && h2s_sc(h2s)) { - if (!se_fl_test(h2s->sd, SE_FL_EOS) && !b_data(&h2s->rxbuf)) + if (!se_fl_test(h2s->sd, SE_FL_EOS) && + (!h2s_rxbuf_head(h2s) || !b_data(h2s_rxbuf_head(h2s)))) h2s_notify_recv(h2s); } HA_ATOMIC_DEC(&h2s->h2c->px_counters->open_streams); @@ -1736,14 +1755,26 @@ static inline void h2s_propagate_term_flags(struct h2c *h2c, struct h2s *h2s) static void h2s_destroy(struct h2s *h2s) { struct connection *conn = h2s->h2c->conn; + int freed = 0; TRACE_ENTER(H2_EV_H2S_END, conn, h2s); h2s_close(h2s); eb32_delete(&h2s->by_id); - if (b_size(&h2s->rxbuf)) { - b_free(&h2s->rxbuf); - offer_buffers(NULL, 1); + + while (h2s->rx_head) { + b_free(&h2s->h2c->shared_rx_bufs[h2s->rx_head].buf); + h2s->rx_head = bl_put(h2s->h2c->shared_rx_bufs, h2s->rx_head); + freed++; + } + + if (freed) { + offer_buffers(NULL, freed); + if (h2s->h2c->flags & H2_CF_DEM_RXBUF) { + /* just released resources the demux is waiting for */ + h2s->h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF); + h2c_restart_reading(h2s->h2c, 1); + } } if (h2s->subs) @@ -1799,7 +1830,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id) h2s->st = H2_SS_IDLE; h2s->status = 0; h2s->body_len = 0; - h2s->rxbuf = BUF_NULL; h2s->rx_tail = 0; h2s->rx_head = 0; h2s->rx_count = 0; @@ -3105,10 +3135,16 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s) if (h2s->st != H2_SS_IDLE) { /* The stream exists/existed, this must be a trailers frame */ if (h2s->st != H2_SS_CLOSED) { - error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &body_len, NULL); + if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) { + TRACE_USER("Not allowed to get an extra buffer for H2 request trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, ); + h2c->flags |= H2_CF_DEM_RXBUF; + goto out; + } + + error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &body_len, NULL); /* unrecoverable error ? */ if (h2c->st0 >= H2_CS_ERROR) { - TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s)); sess_log(h2c->conn->owner); goto out; } @@ -3128,7 +3164,7 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s) */ sess_log(h2c->conn->owner); h2s_error(h2s, H2_ERR_INTERNAL_ERROR); - TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s)); h2c->st0 = H2_CS_FRAME_E; goto out; } @@ -3316,10 +3352,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) goto fail; // incomplete frame } - if (h2s->st != H2_SS_CLOSED) { - error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &h2s->body_len, h2s->upgrade_protocol); - } - else { + if (h2s->st == H2_SS_CLOSED) { /* the connection was already killed by an RST, let's consume * the data and send another RST. */ @@ -3329,6 +3362,14 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) goto send_rst; } + if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) { + TRACE_USER("Not allowed to get an extra buffer for H2 response HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, ); + h2c->flags |= H2_CF_DEM_RXBUF; + goto fail; + } + + error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &h2s->body_len, h2s->upgrade_protocol); + /* unrecoverable error ? */ if (h2c->st0 >= H2_CS_ERROR) { TRACE_USER("Unrecoverable error decoding H2 HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s); @@ -3385,7 +3426,7 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) h2s->flags &= ~H2_SF_BLK_MBUSY; } - TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, &h2s->rxbuf); + TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, h2s_rxbuf_tail(h2s)); TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s); return h2s; fail: @@ -4026,7 +4067,7 @@ static void h2_process_demux(struct h2c *h2c) tmp_h2s = h2c_st_by_id(h2c, h2c->dsi); if (tmp_h2s != h2s && h2s && h2s_sc(h2s) && - (b_data(&h2s->rxbuf) || + ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) || h2c_read0_pending(h2c) || h2s->st == H2_SS_CLOSED || (h2s->flags & H2_SF_ES_RCVD) || @@ -4213,7 +4254,7 @@ static void h2_process_demux(struct h2c *h2c) h2c->flags &= ~H2_CF_DEM_DFULL; if (h2s && h2s_sc(h2s) && - (b_data(&h2s->rxbuf) || + ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) || h2c_read0_pending(h2c) || h2s->st == H2_SS_CLOSED || (h2s->flags & H2_SF_ES_RCVD) || @@ -5777,7 +5818,7 @@ static int h2c_dec_hdrs(struct h2c *h2c, struct buffer *rxbuf, uint32_t *flags, goto done; } -/* Transfer the payload of a DATA frame to the HTTP/1 side. The HTTP/2 frame +/* Transfer the payload of an H2 DATA frame to the HTX side. The HTTP/2 frame * parser state is automatically updated. Returns > 0 if it could completely * send the current frame, 0 if it couldn't complete, in which case * SE_FL_RCV_MORE must be checked to know if some data remain pending (an empty @@ -5792,14 +5833,28 @@ static int h2_frt_transfer_data(struct h2s *h2s) int block; unsigned int flen = 0; struct htx *htx = NULL; - struct buffer *scbuf; + struct buffer *scbuf = NULL; unsigned int sent; + int full = 0; TRACE_ENTER(H2_EV_RX_FRAME|H2_EV_RX_DATA, h2c->conn, h2s); - h2c->flags &= ~H2_CF_DEM_SFULL; + h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF); - scbuf = h2_get_buf(h2c, &h2s->rxbuf); + /* Note: the size estimate is approximative due to HTX fragmentation, + * so here we are optimistic and try to get the work done without + * allocating an rxbuf if possible. If we fail we'll more aggressively + * retry. + */ + if ((!h2s_rxbuf_tail(h2s) || (full || !h2s_may_append_to_rxbuf(h2s))) && !h2s_get_rxbuf(h2s)) { + h2c->flags |= H2_CF_DEM_RXBUF; + TRACE_STATE("waiting for an h2s rxbuf slot", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); + goto fail; + } + + next_buffer: + /* now we have a non-full rxbuf */ + scbuf = h2_get_buf(h2c, h2s_rxbuf_tail(h2s)); if (!scbuf) { h2c->flags |= H2_CF_DEM_SALLOC; TRACE_STATE("waiting for an h2s rxbuf", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); @@ -5808,6 +5863,7 @@ static int h2_frt_transfer_data(struct h2s *h2s) htx = htx_from_buf(scbuf); try_again: + full = 0; flen = h2c->dfl - h2c->dpl; if (!flen) goto end_transfer; @@ -5820,6 +5876,10 @@ static int h2_frt_transfer_data(struct h2s *h2s) block = htx_free_data_space(htx); if (!block) { + full = 1; + if (h2s_get_rxbuf(h2s)) + goto next_buffer; + h2c->flags |= H2_CF_DEM_SFULL; TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); goto fail; @@ -5846,6 +5906,11 @@ static int h2_frt_transfer_data(struct h2s *h2s) } if (sent < flen) { + if (!sent) + full = 1; + if (h2s_get_rxbuf(h2s)) + goto next_buffer; + h2c->flags |= H2_CF_DEM_SFULL; TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s); goto fail; @@ -7205,29 +7270,36 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in struct h2c *h2c = h2s->h2c; struct htx *h2s_htx = NULL; struct htx *buf_htx = NULL; + struct buffer *rxbuf = NULL; size_t ret = 0; uint prev_h2c_flags = h2c->flags; TRACE_ENTER(H2_EV_STRM_RECV, h2c->conn, h2s); /* transfer possibly pending data to the upper layer */ - h2s_htx = htx_from_buf(&h2s->rxbuf); + + xfer_next_buf: + rxbuf = h2s_rxbuf_head(h2s); + if (!rxbuf) + goto end; // may be NULL if empty + + h2s_htx = htx_from_buf(rxbuf); if (htx_is_empty(h2s_htx) && !(h2s_htx->flags & HTX_FL_PARSING_ERROR)) { /* Here htx_to_buf() will set buffer data to 0 because * the HTX is empty. */ - htx_to_buf(h2s_htx, &h2s->rxbuf); + htx_to_buf(h2s_htx, rxbuf); goto end; } - ret = h2s_htx->data; + ret += h2s_htx->data; buf_htx = htx_from_buf(buf); /* is empty and the message is small enough, swap the * buffers. */ if (htx_is_empty(buf_htx) && htx_used_space(h2s_htx) <= count) { htx_to_buf(buf_htx, buf); - htx_to_buf(h2s_htx, &h2s->rxbuf); - b_xfer(buf, &h2s->rxbuf, b_data(&h2s->rxbuf)); + htx_to_buf(h2s_htx, rxbuf); + b_xfer(buf, rxbuf, b_data(rxbuf)); goto end; } @@ -7244,19 +7316,29 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in buf_htx->extra = (h2s_htx->extra ? (h2s_htx->data + h2s_htx->extra) : 0); htx_to_buf(buf_htx, buf); - htx_to_buf(h2s_htx, &h2s->rxbuf); + htx_to_buf(h2s_htx, rxbuf); ret -= h2s_htx->data; end: /* release the rxbuf if it's not used anymore */ - if (!b_data(&h2s->rxbuf) && b_size(&h2s->rxbuf)) { - b_free(&h2s->rxbuf); + if (rxbuf && !b_data(rxbuf) && b_size(rxbuf)) { + BUG_ON_HOT(rxbuf != _h2s_rxbuf_head(h2s)); + b_free(_h2s_rxbuf_head(h2s)); offer_buffers(NULL, 1); } + /* release the unused rxbuf slot */ + if (rxbuf && !b_size(rxbuf)) { + h2s_put_rxbuf(h2s); + h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL); + goto xfer_next_buf; + } + /* tell the stream layer whether there are data left or not */ - if (b_data(&h2s->rxbuf)) + if (h2s_rxbuf_cnt(h2s)) { se_fl_set(h2s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + BUG_ON_HOT(!buf->data); + } else { if (!(h2c->flags & H2_CF_IS_BACK) && (h2s->flags & (H2_SF_BODY_TUNNEL|H2_SF_ES_RCVD))) { /* If request ES is reported to the upper layer, it means the @@ -7274,9 +7356,11 @@ static size_t h2_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in h2c->flags &= ~H2_CF_DEM_SFULL; /* wake up processing if we've unblocked something */ - if ((prev_h2c_flags & ~h2c->flags) & H2_CF_DEM_SFULL) + if ((prev_h2c_flags & ~h2c->flags) & ((H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF))) h2c_restart_reading(h2c, 1); + BUG_ON_HOT(!buf->data && se_fl_test(h2s->sd, SE_FL_WANT_ROOM)); + TRACE_LEAVE(H2_EV_STRM_RECV, h2c->conn, h2s); return ret; } @@ -7683,16 +7767,27 @@ static int h2_resume_ff(struct stconn *sc, unsigned int flags) */ static int h2_dump_h2s_info(struct buffer *msg, const struct h2s *h2s, const char *pfx) { + const struct buffer *head, *tail; int ret = 0; if (!h2s) return ret; - chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf=%u@%p+%u/%u", + head = h2s_rxbuf_head(h2s); + tail = h2s_rxbuf_tail(h2s); + + chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf.c=%u .t=%u@%p+%u/%u .h=%u@%p+%u/%u", h2s->id, h2s_st_to_str(h2s->st), h2s->flags, (uint)(h2s->next_max_ofs - h2s->curr_rx_ofs), - (unsigned int)b_data(&h2s->rxbuf), b_orig(&h2s->rxbuf), - (unsigned int)b_head_ofs(&h2s->rxbuf), (unsigned int)b_size(&h2s->rxbuf)); + h2s_rxbuf_cnt(h2s), + tail ? (uint)b_data(tail) : 0, + tail ? b_orig(tail) : NULL, + tail ? (uint)b_head_ofs(tail) : 0, + tail ? (uint)b_size(tail) : 0, + head ? (uint)b_data(head) : 0, + head ? b_orig(head) : NULL, + head ? (uint)b_head_ofs(head) : 0, + head ? (uint)b_size(head) : 0); if (pfx) chunk_appendf(msg, "\n%s", pfx);