diff --git a/src/connection.c b/src/connection.c index ad0386352..b970d4189 100644 --- a/src/connection.c +++ b/src/connection.c @@ -229,7 +229,7 @@ void conn_fd_handler(int fd) if ((io_available || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) || ((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED && (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) && - conn->mux->wake(conn) < 0) + conn->mux->wake && conn->mux->wake(conn) < 0) return; /* commit polling changes */ diff --git a/src/mux_h2.c b/src/mux_h2.c index c6fa1c90c..f0df0e0c7 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -222,6 +222,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){ static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state); static int h2_send(struct h2c *h2c); static int h2_recv(struct h2c *h2c); +static int h2_process(struct h2c *h2c); static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short state); static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id); static int h2_frt_decode_headers(struct h2s *h2s); @@ -282,7 +283,8 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_DALLOC; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - h2_recv(h2c); + if (h2_recv(h2c)) + h2_process(h2c); } return 1; } @@ -296,7 +298,8 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_MROOM; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - h2_recv(h2c); + if (h2_recv(h2c)) + h2_process(h2c); } } return 1; @@ -308,7 +311,8 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_SALLOC; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - h2_recv(h2c); + if (h2_recv(h2c)) + h2_process(h2c); } return 1; } @@ -417,7 +421,8 @@ static int h2c_frt_init(struct connection *conn) LIST_INIT(&h2c->wait_list.list); /* Try to read, if nothing is available yet we'll just subscribe */ - h2_recv(h2c); + if (h2_recv(h2c)) + h2_process(h2c); return 0; fail: if (t) @@ -2239,6 +2244,7 @@ static int h2_recv(struct h2c *h2c) struct connection *conn = h2c->conn; struct buffer *buf; int max; + size_t ret; if (h2c->wait_list.wait_reason & SUB_CAN_RECV) return 0; @@ -2252,12 +2258,18 @@ static int h2_recv(struct h2c *h2c) return 0; } - max = buf->size - b_data(buf); - if (max) - conn->xprt->rcv_buf(conn, buf, max, 0); + do { + max = buf->size - b_data(buf); + if (max) + ret = conn->xprt->rcv_buf(conn, buf, max, 0); + else + ret = 0; + } while (ret > 0); - if (h2_recv_allowed(h2c)) + if (h2_recv_allowed(h2c)) { + conn_xprt_want_recv(conn); conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list); + } if (!b_data(buf)) { h2_release_buf(h2c, &h2c->dbuf); return 0; @@ -2278,6 +2290,7 @@ static int h2_send(struct h2c *h2c) if (conn->flags & CO_FL_ERROR) return 0; + if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) { /* a handshake was requested */ goto schedule; @@ -2358,11 +2371,14 @@ schedule: static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status) { struct h2c *h2c = ctx; + int ret = 0; if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND)) - h2_send(h2c); + ret = h2_send(h2c); if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV)) - h2_recv(h2c); + ret |= h2_recv(h2c); + if (ret) + h2_process(h2c); return NULL; } @@ -2370,15 +2386,11 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status) * It applies changes and returns zero, or < 0 if it wants immediate * destruction of the connection (which normally doesn not happen in h2). */ -static int h2_wake(struct connection *conn) +static int h2_process(struct h2c *h2c) { - struct h2c *h2c = conn->mux_ctx; + struct connection *conn = h2c->conn; struct session *sess = conn->owner; - h2_send(h2c); - if (h2_recv_allowed(h2c)) - h2_recv(h2c); - if (b_data(&h2c->dbuf) && !(h2c->flags & H2_CF_DEM_BLOCK_ANY)) { h2_process_demux(h2c); @@ -2388,6 +2400,7 @@ static int h2_wake(struct connection *conn) if (!b_full(&h2c->dbuf)) h2c->flags &= ~H2_CF_DEM_DFULL; } + h2_send(h2c); if (sess && unlikely(sess->fe->state == PR_STSTOPPED)) { /* frontend is stopping, reload likely in progress, let's try @@ -2440,10 +2453,8 @@ static int h2_wake(struct connection *conn) /* stop being notified of incoming data if we can't process them */ if (!h2_recv_allowed(h2c)) __conn_xprt_stop_recv(conn); - else { + else __conn_xprt_want_recv(conn); - h2_recv(h2c); - } /* adjust output polling */ if (!(conn->flags & CO_FL_SOCK_WR_SH) && @@ -2468,6 +2479,7 @@ static int h2_wake(struct connection *conn) h2c->task->expire = TICK_ETERNITY; } + h2_send(h2c); return 0; } @@ -2556,7 +2568,8 @@ static void h2_update_poll(struct conn_stream *cs) h2s->h2c->flags &= ~H2_CF_DEM_SFULL; if (h2s->h2c->dsi == h2s->id) { conn_xprt_want_recv(cs->conn); - h2_recv(h2s->h2c); + if (h2_recv(h2s->h2c)) + h2_process(h2s->h2c); conn_xprt_want_send(cs->conn); } } @@ -2575,6 +2588,7 @@ static void h2_update_poll(struct conn_stream *cs) !(cs->conn->flags & CO_FL_SOCK_WR_SH)) conn_xprt_want_send(cs->conn); LIST_ADDQ(&h2s->h2c->send_list, &h2s->list); + tasklet_wakeup(h2s->h2c->wait_list.task); } } else if (!LIST_ISEMPTY(&h2s->list)) { @@ -3597,8 +3611,11 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun } b_del(buf, total); - if (total > 0) + if (total > 0) { conn_xprt_want_send(h2s->h2c->conn); + if (!(h2s->h2c->wait_list.wait_reason & SUB_CAN_SEND)) + tasklet_wakeup(h2s->h2c->wait_list.task); + } return total; } @@ -3699,7 +3716,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct /* The mux operations */ const struct mux_ops h2_ops = { .init = h2_init, - .wake = h2_wake, .update_poll = h2_update_poll, .snd_buf = h2_snd_buf, .rcv_buf = h2_rcv_buf,