1
0
mirror of http://git.haproxy.org/git/haproxy.git/ synced 2025-04-08 02:01:33 +00:00

MEDIUM: connections: Change struct wait_list to wait_event.

When subscribing, we don't need to provide a list element; only the h2 mux
needs it. So instead, add a list element to struct h2s, and use it when a
list is needed.
This forces us to use the unsubscribe method, since we can't just unsubscribe
by using LIST_DEL anymore.
This patch is larger than it should be because it includes some renaming.
This commit is contained in:
Olivier Houchard 2018-10-10 18:25:41 +02:00 committed by Willy Tarreau
parent 83a0cd8a36
commit fa8aa867b9
10 changed files with 195 additions and 261 deletions

View File

@ -627,9 +627,8 @@ static inline void conn_init(struct connection *conn)
conn->destroy_cb = NULL;
conn->proxy_netns = NULL;
LIST_INIT(&conn->list);
LIST_INIT(&conn->send_wait_list);
LIST_INIT(&conn->recv_wait_list);
LIST_INIT(&conn->sendrecv_wait_list);
conn->send_wait = NULL;
conn->recv_wait = NULL;
}
/* sets <owner> as the connection's owner */
@ -705,19 +704,10 @@ static inline struct conn_stream *cs_new(struct connection *conn)
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
struct wait_list *sw, *sw_back;
list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
if (conn->recv_wait)
conn->recv_wait->wait_reason &= ~SUB_CAN_RECV;
if (conn->send_wait)
conn->send_wait->wait_reason &= ~SUB_CAN_SEND;
pool_free(pool_head_connection, conn);
}
@ -786,7 +776,7 @@ static inline void cs_attach(struct conn_stream *cs, void *data, const struct da
cs->data = data;
}
static inline struct wait_list *wl_set_waitcb(struct wait_list *wl, struct task *(*cb)(struct task *, void *, unsigned short), void *ctx)
static inline struct wait_event *wl_set_waitcb(struct wait_event *wl, struct task *(*cb)(struct task *, void *, unsigned short), void *ctx)
{
if (!wl->task->process) {
wl->task->process = cb;

View File

@ -127,13 +127,12 @@ static inline int si_reset(struct stream_interface *si)
si->end = NULL;
si->state = si->prev_state = SI_ST_INI;
si->ops = &si_embedded_ops;
si->wait_list.task = tasklet_new();
if (!si->wait_list.task)
si->wait_event.task = tasklet_new();
if (!si->wait_event.task)
return -1;
si->wait_list.task->process = si_cs_io_cb;
si->wait_list.task->context = si;
si->wait_list.wait_reason = 0;
LIST_INIT(&si->wait_list.list);
si->wait_event.task->process = si_cs_io_cb;
si->wait_event.task->context = si;
si->wait_event.wait_reason = 0;
return 0;
}

View File

@ -184,7 +184,7 @@ struct check {
char **envp; /* the environment to use if running a process-based check */
struct pid_list *curpid; /* entry in pid_list used for current process-based test, or -1 if not in test */
struct sockaddr_storage addr; /* the address to check */
struct wait_list wait_list; /* Waiting for I/O events */
struct wait_event wait_list; /* Waiting for I/O events */
char *sni; /* Server name */
};

View File

@ -50,9 +50,8 @@ enum sub_event_type {
SUB_CAN_RECV = 0x00000002, /* Schedule the tasklet when we can recv more */
};
struct wait_list {
struct wait_event {
struct tasklet *task;
struct list list;
void *handle; /* To be used by the callee */
int wait_reason;
};
@ -404,9 +403,8 @@ struct connection {
enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */
/* second cache line */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
struct wait_event *send_wait; /* Task to wake when we're ready to send */
struct wait_event *recv_wait; /* Task to wake when we're ready to recv */
struct list list; /* attach point to various connection lists (idle, ...) */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */

View File

@ -101,7 +101,7 @@ struct stream_interface {
unsigned int err_type; /* first error detected, one of SI_ET_* */
int conn_retries; /* number of connect retries left */
unsigned int hcto; /* half-closed timeout (0 = unset) */
struct wait_list wait_list; /* We're in a wait list */
struct wait_event wait_event; /* We're in a wait list */
};
/* operations available on a stream-interface */

View File

@ -3142,7 +3142,6 @@ const char *init_check(struct check *check, int type)
check->wait_list.task = tasklet_new();
if (!check->wait_list.task)
return "out of memroy while allocating check tasklet";
LIST_INIT(&check->wait_list.list);
check->wait_list.wait_reason = 0;
check->wait_list.task->process = event_srv_chk_io;
check->wait_list.task->context = check;

View File

@ -128,25 +128,12 @@ void conn_fd_handler(int fd)
* both of which will be detected below.
*/
flags = 0;
io_available = (LIST_ISEMPTY(&conn->send_wait_list) &&
LIST_ISEMPTY(&conn->sendrecv_wait_list));;
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
if (conn->send_wait != NULL) {
conn->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(conn->send_wait->task);
conn->send_wait = NULL;
} else
io_available = 1;
}
/* The data transfer starts here and stops on error and handshakes. Note
@ -160,26 +147,12 @@ void conn_fd_handler(int fd)
* both of which will be detected below.
*/
flags = 0;
io_available |= (LIST_ISEMPTY(&conn->recv_wait_list) &&
LIST_ISEMPTY(&conn->sendrecv_wait_list));
while (!LIST_ISEMPTY(&conn->recv_wait_list)) {
struct wait_list *sw = LIST_ELEM(conn->recv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->send_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
}
if (conn->recv_wait) {
conn->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(conn->recv_wait->task);
conn->recv_wait = NULL;
} else
io_available = 1;
}
/* It may happen during the data phase that a handshake is
@ -360,26 +333,20 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
int conn_unsubscribe(struct connection *conn, int event_type, void *param)
{
struct wait_list *sw;
struct wait_event *sw;
if (event_type & SUB_CAN_RECV) {
sw = param;
if (sw->wait_reason & SUB_CAN_RECV) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
conn->recv_wait = NULL;
sw->wait_reason &= ~SUB_CAN_RECV;
if (sw->wait_reason & SUB_CAN_SEND)
LIST_ADDQ(&conn->send_wait_list, &sw->list);
}
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
conn->send_wait = NULL;
sw->wait_reason &= ~SUB_CAN_SEND;
if (sw->wait_reason & SUB_CAN_RECV)
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
}
}
return 0;
@ -387,21 +354,13 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param)
int conn_subscribe(struct connection *conn, int event_type, void *param)
{
struct wait_list *sw;
struct wait_event *sw;
if (event_type & SUB_CAN_RECV) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
/* If we're already subscribed for send(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
conn->recv_wait = sw;
}
event_type &= ~SUB_CAN_RECV;
}
@ -409,15 +368,7 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
/* If we're already subscribed for recv(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_RECV) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&conn->send_wait_list, &sw->list);
conn->send_wait = sw;
}
event_type &= ~SUB_CAN_SEND;
}

View File

@ -120,7 +120,7 @@ struct h2c {
struct list send_list; /* list of blocked streams requesting to send */
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct wait_list wait_list; /* We're in a wait list, to send */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
};
/* H2 stream state, in h2s->st */
@ -183,8 +183,10 @@ struct h2s {
enum h2_ss st;
uint16_t status; /* HTTP response status */
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
struct wait_list wait_list; /* Wait list, when we're attempting to send a RST but we can't send */
struct wait_list *recv_wait_list; /* Address of the wait_list the conn_stream associated is waiting on */
struct wait_event wait_event; /* Wait list, when we're attempting to send a RST but we can't send */
struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct wait_event *send_wait; /* The streeam is waiting for flow control */
struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */
};
/* descriptor for an h2 frame header */
@ -284,7 +286,7 @@ static int h2_buf_available(void *target)
h2c->flags &= ~H2_CF_DEM_DALLOC;
if (h2_recv_allowed(h2c)) {
conn_xprt_want_recv(h2c->conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
}
return 1;
}
@ -298,7 +300,7 @@ static int h2_buf_available(void *target)
h2c->flags &= ~H2_CF_DEM_MROOM;
if (h2_recv_allowed(h2c)) {
conn_xprt_want_recv(h2c->conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
}
}
return 1;
@ -310,7 +312,7 @@ static int h2_buf_available(void *target)
h2c->flags &= ~H2_CF_DEM_SALLOC;
if (h2_recv_allowed(h2c)) {
conn_xprt_want_recv(h2c->conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
}
return 1;
}
@ -375,13 +377,12 @@ static int h2c_frt_init(struct connection *conn)
t->expire = tick_add(now_ms, h2c->timeout);
}
h2c->wait_list.task = tasklet_new();
if (!h2c->wait_list.task)
h2c->wait_event.task = tasklet_new();
if (!h2c->wait_event.task)
goto fail;
h2c->wait_list.task->process = h2_io_cb;
h2c->wait_list.task->context = h2c;
h2c->wait_list.wait_reason = 0;
LIST_INIT(&h2c->wait_list.list);
h2c->wait_event.task->process = h2_io_cb;
h2c->wait_event.task->context = h2c;
h2c->wait_event.wait_reason = 0;
h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size);
if (!h2c->ddht)
@ -418,13 +419,13 @@ static int h2c_frt_init(struct connection *conn)
/* prepare to read something */
conn_xprt_want_recv(conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
return 0;
fail:
if (t)
task_free(t);
if (h2c->wait_list.task)
tasklet_free(h2c->wait_list.task);
if (h2c->wait_event.task)
tasklet_free(h2c->wait_event.task);
pool_free(pool_head_h2c, h2c);
fail_no_h2c:
return -1;
@ -484,23 +485,11 @@ static void h2_release(struct connection *conn)
task_wakeup(h2c->task, TASK_WOKEN_OTHER);
h2c->task = NULL;
}
if (h2c->wait_list.task)
tasklet_free(h2c->wait_list.task);
LIST_DEL(&h2c->wait_list.list);
LIST_INIT(&h2c->wait_list.list);
while (!LIST_ISEMPTY(&h2c->send_list)) {
struct wait_list *sw = LIST_ELEM(h2c->send_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
while (!LIST_ISEMPTY(&h2c->fctl_list)) {
struct wait_list *sw = LIST_ELEM(h2c->fctl_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
if (h2c->wait_event.task)
tasklet_free(h2c->wait_event.task);
if (h2c->wait_event.wait_reason != 0)
conn->xprt->unsubscribe(conn, h2c->wait_event.wait_reason,
&h2c->wait_event);
pool_free(pool_head_h2c, h2c);
}
@ -665,9 +654,17 @@ static void h2s_destroy(struct h2s *h2s)
b_free(&h2s->rxbuf);
offer_buffers(NULL, tasks_run_queue);
}
LIST_DEL(&h2s->wait_list.list);
LIST_INIT(&h2s->wait_list.list);
tasklet_free(h2s->wait_list.task);
if (h2s->send_wait != NULL)
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
if (h2s->recv_wait != NULL)
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
/* There's no need to explicitely call unsubscribe here, the only
* reference left would be in the h2c send_list/fctl_list, and if
* we're in it, we're getting out anyway
*/
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
tasklet_free(h2s->wait_event.task);
pool_free(pool_head_h2s, h2s);
}
@ -684,16 +681,18 @@ static struct h2s *h2c_stream_new(struct h2c *h2c, int id)
if (!h2s)
goto out;
h2s->wait_list.task = tasklet_new();
if (!h2s->wait_list.task)
goto out_free_h2s;
LIST_INIT(&h2s->wait_list.list);
h2s->recv_wait_list = NULL;
h2s->wait_list.task->process = h2_deferred_shut;
h2s->wait_list.task->context = h2s;
h2s->wait_list.handle = NULL;
h2s->wait_list.wait_reason = 0;
h2s->wait_event.task = tasklet_new();
if (!h2s->wait_event.task) {
pool_free(pool_head_h2s, h2s);
goto out;
}
h2s->send_wait = NULL;
h2s->recv_wait = NULL;
h2s->wait_event.task->process = h2_deferred_shut;
h2s->wait_event.task->context = h2s;
h2s->wait_event.handle = NULL;
h2s->wait_event.wait_reason = 0;
LIST_INIT(&h2s->list);
h2s->h2c = h2c;
h2s->mws = h2c->miw;
h2s->flags = H2_SF_NONE;
@ -1130,11 +1129,11 @@ static void h2_wake_some_streams(struct h2c *h2c, int last, uint32_t flags)
}
h2s->cs->flags |= flags;
if (h2s->recv_wait_list) {
struct wait_list *sw = h2s->recv_wait_list;
if (h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait_list = NULL;
h2s->recv_wait = NULL;
} else if (h2s->cs->data_cb->wake != NULL)
h2s->cs->data_cb->wake(h2s->cs);
@ -1608,12 +1607,12 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s)
if (h2s->cs) {
h2s->cs->flags |= CS_FL_REOS | CS_FL_ERROR;
if (h2s->recv_wait_list) {
struct wait_list *sw = h2s->recv_wait_list;
if (h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait_list = NULL;
h2s->recv_wait = NULL;
}
}
@ -1899,10 +1898,10 @@ static void h2_process_demux(struct h2c *h2c)
if (tmp_h2s != h2s && h2s && h2s->cs && b_data(&h2s->rxbuf)) {
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
if (h2s->recv_wait_list) {
h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h2s->recv_wait_list->task);
h2s->recv_wait_list = NULL;
if (h2s->recv_wait) {
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h2s->recv_wait->task);
h2s->recv_wait = NULL;
}
if (h2c->st0 >= H2_CS_ERROR)
goto strm_err;
@ -2143,10 +2142,10 @@ static void h2_process_demux(struct h2c *h2c)
if (h2s && h2s->cs && b_data(&h2s->rxbuf)) {
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
if (h2s->recv_wait_list) {
h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h2s->recv_wait_list->task);
h2s->recv_wait_list = NULL;
if (h2s->recv_wait) {
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h2s->recv_wait->task);
h2s->recv_wait = NULL;
}
}
return;
@ -2157,8 +2156,7 @@ static void h2_process_demux(struct h2c *h2c)
*/
static int h2_process_mux(struct h2c *h2c)
{
struct h2s *h2s;
struct wait_list *sw, *sw_back;
struct h2s *h2s, *h2s_back;
/* start by sending possibly pending window updates */
if (h2c->rcvd_c > 0 &&
@ -2171,48 +2169,29 @@ static int h2_process_mux(struct h2c *h2c)
* blocked just on this.
*/
list_for_each_entry_safe(sw, sw_back, &h2c->fctl_list, list) {
h2s = sw->handle;
list_for_each_entry_safe(h2s, h2s_back, &h2c->fctl_list, list) {
if (h2c->mws <= 0 || h2c->flags & H2_CF_MUX_BLOCK_ANY ||
h2c->st0 >= H2_CS_ERROR)
break;
/* If the tasklet was added to finish shutr/shutw, just wake the task */
if ((long)(h2s) & 3) {
sw->wait_reason &= ~SUB_CAN_SEND;
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
tasklet_wakeup(sw->task);
} else if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
h2s->flags &= ~H2_SF_BLK_ANY;
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(h2s->send_wait->task);
h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
}
list_for_each_entry_safe(sw, sw_back, &h2c->send_list, list) {
h2s = sw->handle;
list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY)
break;
/* If the tasklet was added to finish shutr/shutw, just wake the task */
if ((long)(h2s) & 3) {
sw->wait_reason &= ~SUB_CAN_SEND;
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
tasklet_wakeup(sw->task);
}
else if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
h2s->flags &= ~H2_SF_BLK_ANY;
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(h2s->send_wait->task);
h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
}
fail:
@ -2240,7 +2219,7 @@ static int h2_recv(struct h2c *h2c)
int max;
size_t ret;
if (h2c->wait_list.wait_reason & SUB_CAN_RECV)
if (h2c->wait_event.wait_reason & SUB_CAN_RECV)
return 0;
if (!h2_recv_allowed(h2c))
@ -2262,7 +2241,7 @@ static int h2_recv(struct h2c *h2c)
if (h2_recv_allowed(h2c)) {
conn_xprt_want_recv(conn);
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list);
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event);
}
if (!b_data(buf)) {
h2_release_buf(h2c, &h2c->dbuf);
@ -2345,20 +2324,21 @@ static int h2_send(struct h2c *h2c)
*/
if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
while (!LIST_ISEMPTY(&h2c->send_list)) {
struct wait_list *sw = LIST_ELEM(h2c->send_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
struct h2s *h2s = LIST_ELEM(h2c->send_list.n,
struct h2s *, list);
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(h2s->send_wait->task);
h2s->send_wait = NULL;
}
}
/* We're done, no more to send */
if (!b_data(&h2c->mbuf))
return sent;
schedule:
if (LIST_ISEMPTY(&h2c->wait_list.list))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list);
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_event);
return sent;
}
@ -2367,9 +2347,9 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status)
struct h2c *h2c = ctx;
int ret = 0;
if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND))
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
ret = h2_send(h2c);
if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV))
if (!(h2c->wait_event.wait_reason & SUB_CAN_RECV))
ret |= h2_recv(h2c);
if (ret)
h2_process(h2c);
@ -2423,11 +2403,11 @@ static int h2_process(struct h2c *h2c)
while (node) {
h2s = container_of(node, struct h2s, by_id);
if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) &&
h2s->recv_wait_list) {
struct wait_list *sw = h2s->recv_wait_list;
h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait_list = NULL;
h2s->recv_wait = NULL;
}
node = eb32_next(node);
}
@ -2575,7 +2555,7 @@ static void h2_update_poll(struct conn_stream *cs)
h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
if (h2s->h2c->dsi == h2s->id) {
conn_xprt_want_recv(cs->conn);
tasklet_wakeup(h2s->h2c->wait_list.task);
tasklet_wakeup(h2s->h2c->wait_event.task);
conn_xprt_want_send(cs->conn);
}
}
@ -2590,7 +2570,7 @@ static void h2_update_poll(struct conn_stream *cs)
if (cs->flags & CS_FL_DATA_WR_ENA) {
if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH))
conn_xprt_want_send(cs->conn);
tasklet_wakeup(h2s->h2c->wait_list.task);
tasklet_wakeup(h2s->h2c->wait_event.task);
}
/* We don't support unsubscribing from here, it shouldn't be a problem */
@ -2612,9 +2592,6 @@ static void h2_detach(struct conn_stream *cs)
return;
h2c = h2s->h2c;
/* If the stream we're detaching waited for more data, unsubscribe it now */
if (h2s->recv_wait_list && !((long)h2s->recv_wait_list->handle & 3))
h2s->recv_wait_list = NULL;
h2s->cs = NULL;
h2c->nb_cs--;
if (h2c->flags & H2_CF_DEM_TOOMANY &&
@ -2622,7 +2599,7 @@ static void h2_detach(struct conn_stream *cs)
h2c->flags &= ~H2_CF_DEM_TOOMANY;
if (h2_recv_allowed(h2c)) {
__conn_xprt_want_recv(h2c->conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
conn_xprt_want_send(h2c->conn);
}
}
@ -2643,7 +2620,7 @@ static void h2_detach(struct conn_stream *cs)
h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
conn_xprt_want_recv(cs->conn);
tasklet_wakeup(h2c->wait_list.task);
tasklet_wakeup(h2c->wait_event.task);
conn_xprt_want_send(cs->conn);
}
@ -2677,7 +2654,7 @@ static void h2_detach(struct conn_stream *cs)
static void h2_do_shutr(struct h2s *h2s)
{
struct h2c *h2c = h2s->h2c;
struct wait_list *sw = &h2s->wait_list;
struct wait_event *sw = &h2s->wait_event;
if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED)
return;
@ -2702,12 +2679,15 @@ static void h2_do_shutr(struct h2s *h2s)
return;
add_to_list:
if (LIST_ISEMPTY(&sw->list)) {
if (LIST_ISEMPTY(&h2s->list)) {
sw->wait_reason |= SUB_CAN_SEND;
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2c->fctl_list, &sw->list);
else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
LIST_ADDQ(&h2c->send_list, &sw->list);
if (h2s->flags & H2_SF_BLK_MFCTL) {
LIST_ADDQ(&h2c->fctl_list, &h2s->list);
h2s->send_wait = sw;
} else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) {
h2s->send_wait = sw;
LIST_ADDQ(&h2c->send_list, &h2s->list);
}
}
/* Let the handler know we want shutr */
sw->handle = (void *)((long)sw->handle | 1);
@ -2717,7 +2697,7 @@ add_to_list:
static void h2_do_shutw(struct h2s *h2s)
{
struct h2c *h2c = h2s->h2c;
struct wait_list *sw = &h2s->wait_list;
struct wait_event *sw = &h2s->wait_event;
if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED)
return;
@ -2755,23 +2735,25 @@ static void h2_do_shutw(struct h2s *h2s)
conn_xprt_want_send(h2c->conn);
add_to_list:
sw = &h2s->wait_list;
if (LIST_ISEMPTY(&sw->list)) {
if (LIST_ISEMPTY(&h2s->list)) {
sw->wait_reason |= SUB_CAN_SEND;
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2s->h2c->fctl_list, &sw->list);
else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
LIST_ADDQ(&h2s->h2c->send_list, &sw->list);
if (h2s->flags & H2_SF_BLK_MFCTL) {
LIST_ADDQ(&h2c->fctl_list, &h2s->list);
h2s->send_wait = sw;
} else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) {
h2s->send_wait = sw;
LIST_ADDQ(&h2c->send_list, &h2s->list);
}
}
/* let the handler know we want to shutr */
sw->handle = (void *)((long)(sw->handle) | 2);
/* let the handler know we want to shutw */
sw->handle = (void *)((long)(sw->handle) | 2);
}
static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state)
{
struct h2s *h2s = ctx;
long reason = (long)h2s->wait_list.handle;
long reason = (long)h2s->wait_event.handle;
if (reason & 1)
h2_do_shutr(h2s);
@ -3537,7 +3519,7 @@ static size_t h2s_frt_make_resp_data(struct h2s *h2s, const struct buffer *buf,
/* Called from the upper layer, to subscribe to events, such as being able to send */
static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
{
struct wait_list *sw;
struct wait_event *sw;
struct h2s *h2s = cs->ctx;
struct h2c *h2c = h2s->h2c;
@ -3546,7 +3528,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
sw->handle = h2s;
h2s->recv_wait_list = sw;
h2s->recv_wait = sw;
}
event_type &= ~SUB_CAN_RECV;
}
@ -3555,10 +3537,13 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
sw->handle = h2s;
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2c->fctl_list, &sw->list);
else
LIST_ADDQ(&h2c->send_list, &sw->list);
h2s->send_wait = sw;
if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2c->fctl_list, &h2s->list);
else
LIST_ADDQ(&h2c->send_list, &h2s->list);
}
}
event_type &= ~SUB_CAN_SEND;
}
@ -3571,21 +3556,23 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
{
struct wait_list *sw;
struct wait_event *sw;
struct h2s *h2s = cs->ctx;
if (event_type & SUB_CAN_RECV) {
sw = param;
if (h2s->recv_wait_list == sw) {
if (h2s->recv_wait == sw) {
sw->wait_reason &= ~SUB_CAN_RECV;
h2s->recv_wait_list = NULL;
h2s->recv_wait = NULL;
}
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
if (h2s->send_wait == sw) {
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
sw->wait_reason &= ~SUB_CAN_SEND;
h2s->send_wait = NULL;
}
}
return 0;
@ -3681,8 +3668,8 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
b_del(buf, total);
if (total > 0) {
conn_xprt_want_send(h2s->h2c->conn);
if (!(h2s->h2c->wait_list.wait_reason & SUB_CAN_SEND))
tasklet_wakeup(h2s->h2c->wait_list.task);
if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
tasklet_wakeup(h2s->h2c->wait_event.task);
}
return total;
}
@ -3692,7 +3679,6 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn)
{
struct h2c *h2c = conn->mux_ctx;
struct h2s *h2s;
struct wait_list *sw;
struct eb32_node *node;
int fctl_cnt = 0;
int send_cnt = 0;
@ -3702,10 +3688,10 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn)
if (!h2c)
return;
list_for_each_entry(sw, &h2c->fctl_list, list)
list_for_each_entry(h2s, &h2c->fctl_list, list)
fctl_cnt++;
list_for_each_entry(sw, &h2c->send_list, list)
list_for_each_entry(h2s, &h2c->send_list, list)
send_cnt++;
node = eb32_first(&h2c->streams_by_id);

View File

@ -288,9 +288,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
out_fail_accept:
flt_stream_release(s, 0);
task_free(t);
tasklet_free(s->si[1].wait_list.task);
tasklet_free(s->si[1].wait_event.task);
out_fail_alloc_si1:
tasklet_free(s->si[0].wait_list.task);
tasklet_free(s->si[0].wait_event.task);
out_fail_alloc:
LIST_DEL(&s->list);
pool_free(pool_head_stream, s);
@ -406,10 +406,21 @@ static void stream_free(struct stream *s)
if (must_free_sess)
session_free(sess);
tasklet_free(s->si[0].wait_list.task);
LIST_DEL(&s->si[0].wait_list.list);
tasklet_free(s->si[1].wait_list.task);
LIST_DEL(&s->si[1].wait_list.list);
tasklet_free(s->si[0].wait_event.task);
if (s->si[0].wait_event.wait_reason != 0) {
struct conn_stream *cs = objt_cs(s->si[0].end);
if (cs)
cs->conn->mux->unsubscribe(cs, s->si[0].wait_event.wait_reason,
&s->si[0].wait_event);
}
tasklet_free(s->si[1].wait_event.task);
if (s->si[1].wait_event.wait_reason != 0) {
struct conn_stream *cs = objt_cs(s->si[1].end);
if (cs)
cs->conn->mux->unsubscribe(cs, s->si[1].wait_event.wait_reason,
&s->si[1].wait_event);
}
pool_free(pool_head_stream, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */

View File

@ -632,7 +632,7 @@ int si_cs_send(struct conn_stream *cs)
int did_send = 0;
/* We're already waiting to be able to send, give up */
if (si->wait_list.wait_reason & SUB_CAN_SEND)
if (si->wait_event.wait_reason & SUB_CAN_SEND)
return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
@ -641,7 +641,7 @@ int si_cs_send(struct conn_stream *cs)
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* Schedule ourself to be woken up once the handshake is done */
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list);
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_event);
return 0;
}
@ -722,7 +722,7 @@ int si_cs_send(struct conn_stream *cs)
/* We couldn't send all of our data, let the mux know we'd like to send more */
if (co_data(oc)) {
cs_want_send(cs);
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
}
return did_send;
}
@ -736,9 +736,9 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
if (!cs)
return NULL;
redo:
if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
if (!(si->wait_event.wait_reason & SUB_CAN_SEND))
ret = si_cs_send(cs);
if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
if (ret != 0)
si_cs_process(cs);
@ -1137,7 +1137,7 @@ int si_cs_recv(struct conn_stream *cs)
/* If another call to si_cs_recv() failed, and we subscribed to
* recv events already, give up now.
*/
if (si->wait_list.wait_reason & SUB_CAN_RECV)
if (si->wait_event.wait_reason & SUB_CAN_RECV)
return 0;
/* maybe we were called immediately after an asynchronous shutr */
@ -1347,7 +1347,7 @@ int si_cs_recv(struct conn_stream *cs)
goto out_shutdown_r;
/* Subscribe to receive events */
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
return cur_read != 0;