diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h index 531405607..7cac07a69 100644 --- a/include/haproxy/connection-t.h +++ b/include/haproxy/connection-t.h @@ -424,6 +424,14 @@ struct mux_ops { char name[8]; /* mux layer name, zero-terminated */ }; +/* list of frontend connections. Used to call mux wake operation on soft-stop + * to close idling connections. + */ +struct mux_stopping_data { + struct list list; /* list of registered frontend connections */ + struct task *task; /* task woken up on soft-stop */ +}; + /* data_cb describes the data layer's recv and send callbacks which are called * when I/O activity was detected after the transport layer is ready. These * callbacks are supposed to make use of the xprt_ops above to exchange data @@ -528,6 +536,7 @@ struct connection { struct mt_list toremove_list; /* list for connection to clean up */ union { struct list session_list; /* used by backend conns, list of attached connections to a session */ + struct list stopping_list; /* used by frontend conns, attach point in mux stopping list */ }; union conn_handle handle; /* connection handle at the socket layer */ const struct netns_entry *proxy_netns; diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h index 296da89e5..b0b855e2c 100644 --- a/include/haproxy/connection.h +++ b/include/haproxy/connection.h @@ -44,6 +44,7 @@ extern struct pool_head *pool_head_sockaddr; extern struct pool_head *pool_head_authority; extern struct xprt_ops *registered_xprt[XPRT_ENTRIES]; extern struct mux_proto_list mux_proto_list; +extern struct mux_stopping_data mux_stopping_data[MAX_THREADS]; #define IS_HTX_CONN(conn) ((conn)->mux && ((conn)->mux->flags & MX_FL_HTX)) #define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn)) @@ -373,6 +374,8 @@ static inline void conn_init(struct connection *conn, void *target) MT_LIST_INIT(&conn->toremove_list); if (conn_is_back(conn)) LIST_INIT(&conn->session_list); + else + LIST_INIT(&conn->stopping_list); conn->subs = NULL; conn->src = NULL; conn->dst = NULL; diff --git a/include/haproxy/global.h b/include/haproxy/global.h index 9d8f7896e..4231ba829 100644 --- a/include/haproxy/global.h +++ b/include/haproxy/global.h @@ -57,6 +57,8 @@ extern int atexit_flag; extern unsigned char boot_seed[20]; // per-boot random seed (160 bits initially) extern THREAD_LOCAL struct buffer trash; +extern struct task *stopping_task[MAX_PROCS]; + struct proxy; struct server; int main(int argc, char **argv); diff --git a/src/connection.c b/src/connection.c index c0ba8155f..67b212452 100644 --- a/src/connection.c +++ b/src/connection.c @@ -41,6 +41,8 @@ struct mux_proto_list mux_proto_list = { .list = LIST_HEAD_INIT(mux_proto_list.list) }; +struct mux_stopping_data mux_stopping_data[MAX_THREADS]; + /* disables sending of proxy-protocol-v2's LOCAL command */ static int pp2_never_send_local; diff --git a/src/haproxy.c b/src/haproxy.c index 5d3e735e2..f149fc5cb 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2370,11 +2370,31 @@ __attribute__((noreturn)) void deinit_and_exit(int status) exit(status); } +/* Handler of the task of mux_stopping_data. + * Called on soft-stop. + */ +struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int state) +{ + struct connection *conn, *back; + + list_for_each_entry_safe(conn, back, &mux_stopping_data[tid].list, stopping_list) { + if (conn->mux && conn->mux->wake) + conn->mux->wake(conn); + } + + return t; +} + /* Runs the polling loop */ void run_poll_loop() { int next, wake; + /* allocates the thread bound mux_stopping_data task */ + mux_stopping_data[tid].task = task_new(tid_bit); + mux_stopping_data[tid].task->process = mux_stopping_process; + LIST_INIT(&mux_stopping_data[tid].list); + tv_update_date(0,1); while (1) { wake_expired_tasks(); @@ -2411,6 +2431,12 @@ void run_poll_loop() int i; if (stopping) { + /* stop muxes before acknowleding stopping */ + if (!(stopping_thread_mask & tid_bit)) { + task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER); + wake = 1; + } + if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) { /* notify all threads that stopping was just set */ for (i = 0; i < global.nbthread; i++) @@ -2438,6 +2464,8 @@ void run_poll_loop() activity[tid].loops++; } + + task_destroy(mux_stopping_data[tid].task); } static void *run_thread_poll_loop(void *data) diff --git a/src/mux_h1.c b/src/mux_h1.c index 940b7540c..62bd4771d 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -816,6 +816,9 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session h1c->shut_timeout = h1c->timeout = proxy->timeout.client; if (tick_isset(proxy->timeout.clientfin)) h1c->shut_timeout = proxy->timeout.clientfin; + + LIST_APPEND(&mux_stopping_data[tid].list, + &h1c->conn->stopping_list); } if (tick_isset(h1c->timeout)) { t = task_new(tid_bit); @@ -936,6 +939,9 @@ static void h1_release(struct h1c *h1c) } if (conn) { + if (!conn_is_back(conn)) + LIST_DEL_INIT(&conn->stopping_list); + conn->mux = NULL; conn->ctx = NULL; TRACE_DEVEL("freeing conn", H1_EV_H1C_END, conn); diff --git a/src/mux_h2.c b/src/mux_h2.c index ebae4382c..1374cd7e0 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -955,6 +955,15 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s h2c->wait_event.tasklet->process = h2_io_cb; h2c->wait_event.tasklet->context = h2c; h2c->wait_event.events = 0; + if (!conn_is_back(conn)) { + /* Connection might already be in the stopping_list if subject + * to h1->h2 upgrade. + */ + if (!LIST_INLIST(&conn->stopping_list)) { + LIST_APPEND(&mux_stopping_data[tid].list, + &conn->stopping_list); + } + } h2c->ddht = hpack_dht_alloc(); if (!h2c->ddht) @@ -1097,6 +1106,9 @@ static void h2_release(struct h2c *h2c) } if (conn) { + if (!conn_is_back(conn)) + LIST_DEL_INIT(&conn->stopping_list); + conn->mux = NULL; conn->ctx = NULL; TRACE_DEVEL("freeing conn", H2_EV_H2C_END, conn);