MINOR: h2: Don't run tasks that are waiting to send if the mux is full.
We currently wake up every stream that is waiting to send data as soon as space becomes available in the mux buffer. This almost certainly wakes far too many streams: after the first few have sent, the buffer is likely to be full again, and the remaining tasks run for nothing. So keep a list of the streams that have been scheduled to send (sending_list), and when we detect that the buffer is full, unschedule their tasks and put the streams back onto the send_list.
parent 3f03ab5b15
commit d846c267d5

src/mux_h2.c | 47
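The idea can be illustrated with a minimal standalone sketch (the stream/mux structs and list helpers below are simplified stand-ins, not the real h2s/h2c types or HAProxy's list macros and tasklets): streams woken to send are tracked on a "sending" list, and when the shared buffer turns out to be full they are unscheduled and put back on the "send" (pending) list instead of running for nothing.

/* Illustrative sketch only; not HAProxy code. */
#include <stdio.h>

struct stream {
	int id;
	struct stream *next;          /* singly linked is enough for the sketch */
};

struct mux {
	struct stream *send_list;     /* streams waiting for buffer room */
	struct stream *sending_list;  /* streams already woken to send */
	size_t buf_used, buf_size;    /* crude "mux buffer" fill state */
};

static void push(struct stream **head, struct stream *s)
{
	s->next = *head;
	*head = s;
}

static struct stream *pop(struct stream **head)
{
	struct stream *s = *head;
	if (s)
		*head = s->next;
	return s;
}

/* Wake every pending sender: move it from send_list to sending_list.
 * In the real mux this is where the stream's tasklet would be woken. */
static void wake_senders(struct mux *m)
{
	struct stream *s;
	while ((s = pop(&m->send_list)))
		push(&m->sending_list, s);
}

/* The buffer filled up before every woken stream could send: unschedule
 * the remaining ones and put them back on send_list. */
static void stop_senders(struct mux *m)
{
	struct stream *s;
	while ((s = pop(&m->sending_list)))
		push(&m->send_list, s);
}

int main(void)
{
	struct stream a = { .id = 1 }, b = { .id = 3 }, c = { .id = 5 };
	struct mux m = { .buf_size = 2 };

	push(&m.send_list, &a);
	push(&m.send_list, &b);
	push(&m.send_list, &c);

	wake_senders(&m);             /* room appeared: wake everything */
	m.buf_used = m.buf_size;      /* ...but the buffer fills up again */

	if (m.buf_used >= m.buf_size) /* mux buffer full: stop the rest */
		stop_senders(&m);

	for (struct stream *s = m.send_list; s; s = s->next)
		printf("stream %d re-queued on send_list\n", s->id);
	return 0;
}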
@@ -121,6 +121,7 @@ struct h2c {
	struct eb_root streams_by_id; /* all active streams by their ID */
	struct list send_list; /* list of blocked streams requesting to send */
	struct list fctl_list; /* list of streams blocked by connection's fctl */
	struct list sending_list; /* list of h2s scheduled to send data */
	struct buffer_wait buf_wait; /* wait list for buffer allocations */
	struct wait_event wait_event; /* To be used if we're waiting for I/Os */
};
@@ -412,6 +413,7 @@ static int h2_init(struct connection *conn, struct proxy *prx)
	h2c->streams_by_id = EB_ROOT_UNIQUE;
	LIST_INIT(&h2c->send_list);
	LIST_INIT(&h2c->fctl_list);
	LIST_INIT(&h2c->sending_list);
	LIST_INIT(&h2c->buf_wait.list);
	conn->mux_ctx = h2c;
@@ -2183,10 +2185,11 @@ static int h2_process_mux(struct h2c *h2c)
		h2s->flags &= ~H2_SF_BLK_ANY;
		h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
		h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
		tasklet_wakeup(h2s->send_wait->task);
		h2s->send_wait = NULL;
		LIST_DEL(&h2s->list);
		LIST_INIT(&h2s->list);
		LIST_ADDQ(&h2c->sending_list, &h2s->list);
	}

	list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
@@ -2195,10 +2198,11 @@ static int h2_process_mux(struct h2c *h2c)
		h2s->flags &= ~H2_SF_BLK_ANY;
		h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
		h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
		tasklet_wakeup(h2s->send_wait->task);
		h2s->send_wait = NULL;
		LIST_DEL(&h2s->list);
		LIST_INIT(&h2s->list);
		LIST_ADDQ(&h2c->sending_list, &h2s->list);
	}

 fail:
@@ -2334,9 +2338,10 @@ static int h2_send(struct h2c *h2c)
					struct h2s *, list);
			LIST_DEL(&h2s->list);
			LIST_INIT(&h2s->list);
			LIST_ADDQ(&h2c->sending_list, &h2s->list);
			h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
			h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
			tasklet_wakeup(h2s->send_wait->task);
			h2s->send_wait = NULL;
		}
	}
	/* We're done, no more to send */
@@ -3521,6 +3526,13 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
			h2s->send_wait = NULL;
		}
	}
	if (event_type & SUB_CALL_UNSUBSCRIBE) {
		sw = param;
		if (h2s->send_wait == sw) {
			sw->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
			h2s->send_wait = NULL;
		}
	}
	return 0;
}
@@ -3549,6 +3561,23 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
	return ret;
}

static void h2_stop_senders(struct h2c *h2c)
{
	struct h2s *h2s, *h2s_back;

	list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, list) {
		/* Don't unschedule the stream if the mux is just busy waiting for more data from that stream */
		if (h2c->msi == h2s_id(h2s))
			continue;
		LIST_DEL(&h2s->list);
		LIST_INIT(&h2s->list);
		task_remove_from_task_list((struct task *)h2s->send_wait->task);
		h2s->send_wait->wait_reason |= SUB_CAN_SEND;
		h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
		LIST_ADD(&h2c->send_list, &h2s->list);
	}
}

/* Called from the upper layer, to send data */
static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
@@ -3556,6 +3585,12 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
	size_t total = 0;
	size_t ret;

	if (h2s->send_wait) {
		h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
		h2s->send_wait = NULL;
		LIST_DEL(&h2s->list);
		LIST_INIT(&h2s->list);
	}
	if (h2s->h2c->st0 < H2_CS_FRAME_H)
		return 0;
@@ -3615,9 +3650,15 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
	}

	b_del(buf, total);

	/* The mux is full, cancel the pending tasks */
	if ((h2s->h2c->flags & H2_CF_MUX_BLOCK_ANY) ||
	    (h2s->flags & H2_SF_BLK_MBUSY))
		h2_stop_senders(h2s->h2c);
	if (total > 0) {
		if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
			tasklet_wakeup(h2s->h2c->wait_event.task);

	}
	return total;
}