diff --git a/src/mux_h2.c b/src/mux_h2.c index 5255ca0a1c..a5ea6c3ad9 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -121,6 +121,7 @@ struct h2c { struct eb_root streams_by_id; /* all active streams by their ID */ struct list send_list; /* list of blocked streams requesting to send */ struct list fctl_list; /* list of streams blocked by connection's fctl */ + struct list sending_list; /* list of h2s scheduled to send data */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */ }; @@ -412,6 +413,7 @@ static int h2_init(struct connection *conn, struct proxy *prx) h2c->streams_by_id = EB_ROOT_UNIQUE; LIST_INIT(&h2c->send_list); LIST_INIT(&h2c->fctl_list); + LIST_INIT(&h2c->sending_list); LIST_INIT(&h2c->buf_wait.list); conn->mux_ctx = h2c; @@ -2183,10 +2185,11 @@ static int h2_process_mux(struct h2c *h2c) h2s->flags &= ~H2_SF_BLK_ANY; h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE; tasklet_wakeup(h2s->send_wait->task); - h2s->send_wait = NULL; LIST_DEL(&h2s->list); LIST_INIT(&h2s->list); + LIST_ADDQ(&h2c->sending_list, &h2s->list); } list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) { @@ -2195,10 +2198,11 @@ static int h2_process_mux(struct h2c *h2c) h2s->flags &= ~H2_SF_BLK_ANY; h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE; tasklet_wakeup(h2s->send_wait->task); - h2s->send_wait = NULL; LIST_DEL(&h2s->list); LIST_INIT(&h2s->list); + LIST_ADDQ(&h2c->sending_list, &h2s->list); } fail: @@ -2334,9 +2338,10 @@ static int h2_send(struct h2c *h2c) struct h2s *, list); LIST_DEL(&h2s->list); LIST_INIT(&h2s->list); + LIST_ADDQ(&h2c->sending_list, &h2s->list); h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE; tasklet_wakeup(h2s->send_wait->task); - h2s->send_wait = NULL; } } /* We're done, no more to send */ @@ -3521,6 +3526,13 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param) h2s->send_wait = NULL; } } + if (event_type & SUB_CALL_UNSUBSCRIBE) { + sw = param; + if (h2s->send_wait == sw) { + sw->wait_reason &= ~SUB_CALL_UNSUBSCRIBE; + h2s->send_wait = NULL; + } + } return 0; } @@ -3549,6 +3561,23 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun return ret; } +static void h2_stop_senders(struct h2c *h2c) +{ + struct h2s *h2s, *h2s_back; + + list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, list) { + /* Don't unschedule the stream if the mux is just busy waiting for more data fro mthat stream */ + if (h2c->msi == h2s_id(h2s)) + continue; + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); + task_remove_from_task_list((struct task *)h2s->send_wait->task); + h2s->send_wait->wait_reason |= SUB_CAN_SEND; + h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE; + LIST_ADD(&h2c->send_list, &h2s->list); + } +} + /* Called from the upper layer, to send data */ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { @@ -3556,6 +3585,12 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun size_t total = 0; size_t ret; + if (h2s->send_wait) { + h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE; + h2s->send_wait = NULL; + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); + } if (h2s->h2c->st0 < H2_CS_FRAME_H) return 0; @@ -3615,9 +3650,15 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun } b_del(buf, total); + + /* The mux is full, cancel the pending tasks */ + if ((h2s->h2c->flags & H2_CF_MUX_BLOCK_ANY) || + (h2s->flags & H2_SF_BLK_MBUSY)) + h2_stop_senders(h2s->h2c); if (total > 0) { if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND)) tasklet_wakeup(h2s->h2c->wait_event.task); + } return total; }