From 8ae735da05f1be8f4bd5590b656af5782f027596 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Tue, 11 Sep 2018 18:24:28 +0200 Subject: [PATCH] MEDIUM: mux_h2: Revamp the send path when blocking. Change fctl_list and send_list to be lists of struct wait_list, and nuke send_wait_list, as it's now redundant. Make the code responsible for shutr/shutw subscribe to those lists. --- src/mux_h2.c | 260 +++++++++++++++++++++++++-------------------------- 1 file changed, 125 insertions(+), 135 deletions(-) diff --git a/src/mux_h2.c b/src/mux_h2.c index f0df0e0c7..64cd59682 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -120,7 +120,6 @@ struct h2c { struct list send_list; /* list of blocked streams requesting to send */ struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ - struct list send_wait_list; /* list of tasks to wake when we're ready to send */ struct wait_list wait_list; /* We're in a wait list, to send */ }; @@ -177,14 +176,14 @@ struct h2s { struct h2c *h2c; struct h1m req, res; /* request and response parser state for H1 */ struct eb32_node by_id; /* place in h2c's streams_by_id */ - struct list list; /* position in active/blocked lists if blocked>0 */ int32_t id; /* stream ID */ uint32_t flags; /* H2_SF_* */ int mws; /* mux window size for this stream */ enum h2_err errcode; /* H2 err code (H2_ERR_*) */ enum h2_ss st; struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ - struct wait_list *recv_wait_list; /* Somebody subscribed to be waken up on recv */ + struct wait_list wait_list; /* Wait list, when we're attempting to send a RST but we can't send */ + struct wait_list *recv_wait_list; /* Address of the wait_list the conn_stream associated is waiting on */ }; /* descriptor for an h2 frame header */ @@ -227,6 +226,7 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short state); static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id); static int h2_frt_decode_headers(struct h2s *h2s); static int h2_frt_transfer_data(struct h2s *h2s); +static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state); /*****************************************************/ /* functions below are for dynamic buffer management */ @@ -417,7 +417,6 @@ static int h2c_frt_init(struct connection *conn) if (t) task_queue(t); conn_xprt_want_recv(conn); - LIST_INIT(&h2c->send_wait_list); LIST_INIT(&h2c->wait_list.list); /* Try to read, if nothing is available yet we'll just subscribe */ @@ -648,13 +647,12 @@ static inline void h2s_close(struct h2s *h2s) static void h2s_destroy(struct h2s *h2s) { h2s_close(h2s); - LIST_DEL(&h2s->list); - LIST_INIT(&h2s->list); eb32_delete(&h2s->by_id); if (b_size(&h2s->rxbuf)) { b_free(&h2s->rxbuf); offer_buffers(NULL, tasks_run_queue); } + tasklet_free(h2s->wait_list.task); pool_free(pool_head_h2s, h2s); } @@ -671,6 +669,17 @@ static struct h2s *h2c_stream_new(struct h2c *h2c, int id) if (!h2s) goto out; + h2s->wait_list.task = tasklet_new(); + if (!h2s->wait_list.task) { + pool_free(pool_head_h2s, h2s); + goto out; + } + LIST_INIT(&h2s->wait_list.list); + h2s->recv_wait_list = NULL; + h2s->wait_list.task->process = h2_deferred_shut; + h2s->wait_list.task->context = h2s; + h2s->wait_list.handle = NULL; + h2s->wait_list.wait_reason = 0; h2s->h2c = h2c; h2s->mws = h2c->miw; h2s->flags = H2_SF_NONE; @@ -681,7 +690,6 @@ static struct h2s *h2c_stream_new(struct h2c *h2c, int id) h1m_init(&h2s->res); h2s->by_id.key = h2s->id = id; h2c->max_id = id; - LIST_INIT(&h2s->list); eb32_insert(&h2c->streams_by_id, &h2s->by_id); h2c->nb_streams++; @@ -1442,14 +1450,7 @@ static int h2c_handle_window_update(struct h2c *h2c, struct h2s *h2s) h2s->mws += inc; if (h2s->mws > 0 && (h2s->flags & H2_SF_BLK_SFCTL)) { h2s->flags &= ~H2_SF_BLK_SFCTL; - if (h2s->cs && LIST_ISEMPTY(&h2s->list) && - (h2s->cs->flags & CS_FL_DATA_WR_ENA)) { - /* This stream wanted to send but could not due to its - * own flow control. We can put it back into the send - * list now, it will be handled upon next send() call. - */ - LIST_ADDQ(&h2c->send_list, &h2s->list); - } + /* The task will be waken up later */ } } else { @@ -2127,7 +2128,8 @@ static void h2_process_demux(struct h2c *h2c) */ static int h2_process_mux(struct h2c *h2c) { - struct h2s *h2s, *h2s_back; + struct h2s *h2s; + struct wait_list *sw, *sw_back; /* start by sending possibly pending window updates */ if (h2c->rcvd_c > 0 && @@ -2140,84 +2142,47 @@ static int h2_process_mux(struct h2c *h2c) * blocked just on this. */ - list_for_each_entry_safe(h2s, h2s_back, &h2c->fctl_list, list) { + list_for_each_entry_safe(sw, sw_back, &h2c->fctl_list, list) { + h2s = sw->handle; if (h2c->mws <= 0 || h2c->flags & H2_CF_MUX_BLOCK_ANY || h2c->st0 >= H2_CS_ERROR) break; - /* In theory it's possible that h2s->cs == NULL here : - * - client sends crap that causes a parse error - * - RST_STREAM is produced and CS_FL_ERROR at the same time - * - RST_STREAM cannot be emitted because mux is busy/full - * - stream gets notified, detaches and quits - * - mux buffer gets ready and wakes pending streams up - * - bam! - */ - h2s->flags &= ~H2_SF_BLK_ANY; - - if (h2s->cs) { - h2s->cs->data_cb->wake(h2s->cs); - } else { - h2s_send_rst_stream(h2c, h2s); - } - - /* depending on callee's blocking reasons, we may queue in send - * list or completely dequeue. - */ - if ((h2s->flags & H2_SF_BLK_MFCTL) == 0) { - if (h2s->flags & H2_SF_BLK_ANY) { - LIST_DEL(&h2s->list); - LIST_ADDQ(&h2c->send_list, &h2s->list); - } - else { - LIST_DEL(&h2s->list); - LIST_INIT(&h2s->list); - if (h2s->cs) - h2s->cs->flags &= ~CS_FL_DATA_WR_ENA; - else { - /* just sent the last frame for this orphaned stream */ - h2s_destroy(h2s); - } - } + /* If the tasklet was added to finish shutr/shutw, just wake the task */ + if ((long)(h2s) & 3) { + sw->wait_reason &= ~SUB_CAN_SEND; + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + tasklet_wakeup(sw->task); + } else if (!(h2s->flags & H2_SF_BLK_SFCTL)) { + h2s->flags &= ~H2_SF_BLK_ANY; + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + sw->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(sw->task); } } - list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) { + list_for_each_entry_safe(sw, sw_back, &h2c->send_list, list) { + h2s = sw->handle; + if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY) break; - /* In theory it's possible that h2s->cs == NULL here : - * - client sends crap that causes a parse error - * - RST_STREAM is produced and CS_FL_ERROR at the same time - * - RST_STREAM cannot be emitted because mux is busy/full - * - stream gets notified, detaches and quits - * - mux buffer gets ready and wakes pending streams up - * - bam! - */ - h2s->flags &= ~H2_SF_BLK_ANY; + /* If the tasklet was added to finish shutr/shutw, just wake the task */ + if ((long)(h2s) & 3) { + sw->wait_reason &= ~SUB_CAN_SEND; + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + tasklet_wakeup(sw->task); + } + else if (!(h2s->flags & H2_SF_BLK_SFCTL)) { + h2s->flags &= ~H2_SF_BLK_ANY; - if (h2s->cs) { - h2s->cs->data_cb->wake(h2s->cs); - } else { - h2s_send_rst_stream(h2c, h2s); - } - /* depending on callee's blocking reasons, we may queue in fctl - * list or completely dequeue. - */ - if (h2s->flags & H2_SF_BLK_MFCTL) { - /* stream hit the connection's flow control */ - LIST_DEL(&h2s->list); - LIST_ADDQ(&h2c->fctl_list, &h2s->list); - } - else if (!(h2s->flags & H2_SF_BLK_ANY)) { - LIST_DEL(&h2s->list); - LIST_INIT(&h2s->list); - if (h2s->cs) - h2s->cs->flags &= ~CS_FL_DATA_WR_ENA; - else { - /* just sent the last frame for this orphaned stream */ - h2s_destroy(h2s); - } + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + sw->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(sw->task); } } @@ -2350,8 +2315,8 @@ static int h2_send(struct h2c *h2c) * for us. */ if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) { - while (!LIST_ISEMPTY(&h2c->send_wait_list)) { - struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n, + while (!LIST_ISEMPTY(&h2c->send_list)) { + struct wait_list *sw = LIST_ELEM(h2c->send_list.n, struct wait_list *, list); LIST_DEL(&sw->list); LIST_INIT(&sw->list); @@ -2582,20 +2547,11 @@ static void h2_update_poll(struct conn_stream *cs) */ if (cs->flags & CS_FL_DATA_WR_ENA) { - if (LIST_ISEMPTY(&h2s->list)) { - if (LIST_ISEMPTY(&h2s->h2c->send_list) && - !b_data(&h2s->h2c->mbuf) && // not yet subscribed - !(cs->conn->flags & CO_FL_SOCK_WR_SH)) - conn_xprt_want_send(cs->conn); - LIST_ADDQ(&h2s->h2c->send_list, &h2s->list); - tasklet_wakeup(h2s->h2c->wait_list.task); - } - } - else if (!LIST_ISEMPTY(&h2s->list)) { - LIST_DEL(&h2s->list); - LIST_INIT(&h2s->list); - h2s->flags &= ~(H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL); + if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH)) + conn_xprt_want_send(cs->conn); + tasklet_wakeup(h2s->h2c->wait_list.task); } + /* We don't support unsubscribing from here, it shouldn't be a problem */ /* this can happen from within si_chk_snd() */ if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA)) @@ -2674,12 +2630,10 @@ static void h2_detach(struct conn_stream *cs) } } -static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode) +static void h2_do_shutr(struct h2s *h2s) { - struct h2s *h2s = cs->ctx; - - if (!mode) - return; + struct h2c *h2c = h2s->h2c; + struct wait_list *sw = &h2s->wait_list; if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED) return; @@ -2690,31 +2644,36 @@ static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode) * case we send a goaway to close the connection. */ if (!(h2s->flags & H2_SF_RST_SENT) && - h2s_send_rst_stream(h2s->h2c, h2s) <= 0) + h2s_send_rst_stream(h2c, h2s) <= 0) goto add_to_list; if (!(h2s->flags & H2_SF_OUTGOING_DATA) && !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) && - h2c_send_goaway_error(h2s->h2c, h2s) <= 0) - goto add_to_list; - - if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA)) - conn_xprt_want_send(cs->conn); + h2c_send_goaway_error(h2c, h2s) <= 0) + return; + if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA)) + conn_xprt_want_send(h2c->conn); h2s_close(h2s); - add_to_list: - if (LIST_ISEMPTY(&h2s->list)) { + return; +add_to_list: + if (LIST_ISEMPTY(&sw->list)) { + sw->wait_reason |= SUB_CAN_SEND; if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list); + LIST_ADDQ(&h2c->fctl_list, &sw->list); else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) - LIST_ADDQ(&h2s->h2c->send_list, &h2s->list); + LIST_ADDQ(&h2c->send_list, &sw->list); } + /* Let the handler know we want shutr */ + sw->handle = (void *)((long)sw->handle | 1); + } -static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode) +static void h2_do_shutw(struct h2s *h2s) { - struct h2s *h2s = cs->ctx; + struct h2c *h2c = h2s->h2c; + struct wait_list *sw = &h2s->wait_list; if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED) return; @@ -2737,27 +2696,62 @@ static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode) * case we send a goaway to close the connection. */ if (!(h2s->flags & H2_SF_RST_SENT) && - h2s_send_rst_stream(h2s->h2c, h2s) <= 0) + h2s_send_rst_stream(h2c, h2s) <= 0) goto add_to_list; if (!(h2s->flags & H2_SF_OUTGOING_DATA) && !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) && - h2c_send_goaway_error(h2s->h2c, h2s) <= 0) + h2c_send_goaway_error(h2c, h2s) <= 0) goto add_to_list; h2s_close(h2s); } - if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA)) - conn_xprt_want_send(cs->conn); + if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA)) + conn_xprt_want_send(h2c->conn); add_to_list: - if (LIST_ISEMPTY(&h2s->list)) { + sw = &h2s->wait_list; + + if (LIST_ISEMPTY(&sw->list)) { + sw->wait_reason |= SUB_CAN_SEND; if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list); + LIST_ADDQ(&h2s->h2c->fctl_list, &sw->list); else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) - LIST_ADDQ(&h2s->h2c->send_list, &h2s->list); + LIST_ADDQ(&h2s->h2c->send_list, &sw->list); } + /* let the handler know we want to shutr */ + sw->handle = (void *)((long)(sw->handle) | 2); +} + +static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state) +{ + struct h2s *h2s = ctx; + long reason = (long)h2s->wait_list.handle; + + if (reason & 1) + h2_do_shutr(h2s); + if (reason & 2) + h2_do_shutw(h2s); + + return NULL; +} + +static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode) +{ + struct h2s *h2s = cs->ctx; + + if (!mode) + return; + + h2_do_shutr(h2s); +} + +static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode) +{ + struct h2s *h2s = cs->ctx; + + h2_do_shutw(h2s); } /* Decode the payload of a HEADERS frame and produce the equivalent HTTP/1 @@ -3496,6 +3490,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) sw = param; if (!(sw->wait_reason & SUB_CAN_RECV)) { sw->wait_reason |= SUB_CAN_RECV; + sw->handle = h2s; h2s->recv_wait_list = sw; } return 0; @@ -3503,7 +3498,11 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) sw = param; if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; - LIST_ADDQ(&h2c->send_wait_list, &sw->list); + sw->handle = h2s; + if (h2s->flags & H2_SF_BLK_MFCTL) + LIST_ADDQ(&h2c->fctl_list, &sw->list); + else + LIST_ADDQ(&h2c->send_list, &sw->list); } return 0; default: @@ -3600,16 +3599,6 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun h2s_close(h2s); } - if (h2s->flags & H2_SF_BLK_SFCTL) { - /* stream flow control, quit the list */ - LIST_DEL(&h2s->list); - LIST_INIT(&h2s->list); - } - else if (LIST_ISEMPTY(&h2s->list)) { - if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list); - } - b_del(buf, total); if (total > 0) { conn_xprt_want_send(h2s->h2c->conn); @@ -3624,6 +3613,7 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn) { struct h2c *h2c = conn->mux_ctx; struct h2s *h2s; + struct wait_list *sw; struct eb32_node *node; int fctl_cnt = 0; int send_cnt = 0; @@ -3633,10 +3623,10 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn) if (!h2c) return; - list_for_each_entry(h2s, &h2c->fctl_list, list) + list_for_each_entry(sw, &h2c->fctl_list, list) fctl_cnt++; - list_for_each_entry(h2s, &h2c->send_list, list) + list_for_each_entry(sw, &h2c->send_list, list) send_cnt++; node = eb32_first(&h2c->streams_by_id);