mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-21 14:35:45 +00:00
MEDIUM: connection: close front idling connection on soft-stop
Implement a safe mechanism to close front idling connection which prevents the soft-stop to complete. Every h1/h2 front connection is added in a new per-thread list instance. On shutdown, a new task is waking up which calls wake mux operation on every connection still present in the new list. A new stopping_list attach point has been added in the connection structure. As this member is only used for frontend connections, it shared the same union as the session_list reserved for backend connections.
This commit is contained in:
parent
efc6e95642
commit
d3a88c1c32
@ -424,6 +424,14 @@ struct mux_ops {
|
|||||||
char name[8]; /* mux layer name, zero-terminated */
|
char name[8]; /* mux layer name, zero-terminated */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* list of frontend connections. Used to call mux wake operation on soft-stop
|
||||||
|
* to close idling connections.
|
||||||
|
*/
|
||||||
|
struct mux_stopping_data {
|
||||||
|
struct list list; /* list of registered frontend connections */
|
||||||
|
struct task *task; /* task woken up on soft-stop */
|
||||||
|
};
|
||||||
|
|
||||||
/* data_cb describes the data layer's recv and send callbacks which are called
|
/* data_cb describes the data layer's recv and send callbacks which are called
|
||||||
* when I/O activity was detected after the transport layer is ready. These
|
* when I/O activity was detected after the transport layer is ready. These
|
||||||
* callbacks are supposed to make use of the xprt_ops above to exchange data
|
* callbacks are supposed to make use of the xprt_ops above to exchange data
|
||||||
@ -528,6 +536,7 @@ struct connection {
|
|||||||
struct mt_list toremove_list; /* list for connection to clean up */
|
struct mt_list toremove_list; /* list for connection to clean up */
|
||||||
union {
|
union {
|
||||||
struct list session_list; /* used by backend conns, list of attached connections to a session */
|
struct list session_list; /* used by backend conns, list of attached connections to a session */
|
||||||
|
struct list stopping_list; /* used by frontend conns, attach point in mux stopping list */
|
||||||
};
|
};
|
||||||
union conn_handle handle; /* connection handle at the socket layer */
|
union conn_handle handle; /* connection handle at the socket layer */
|
||||||
const struct netns_entry *proxy_netns;
|
const struct netns_entry *proxy_netns;
|
||||||
|
@ -44,6 +44,7 @@ extern struct pool_head *pool_head_sockaddr;
|
|||||||
extern struct pool_head *pool_head_authority;
|
extern struct pool_head *pool_head_authority;
|
||||||
extern struct xprt_ops *registered_xprt[XPRT_ENTRIES];
|
extern struct xprt_ops *registered_xprt[XPRT_ENTRIES];
|
||||||
extern struct mux_proto_list mux_proto_list;
|
extern struct mux_proto_list mux_proto_list;
|
||||||
|
extern struct mux_stopping_data mux_stopping_data[MAX_THREADS];
|
||||||
|
|
||||||
#define IS_HTX_CONN(conn) ((conn)->mux && ((conn)->mux->flags & MX_FL_HTX))
|
#define IS_HTX_CONN(conn) ((conn)->mux && ((conn)->mux->flags & MX_FL_HTX))
|
||||||
#define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn))
|
#define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn))
|
||||||
@ -373,6 +374,8 @@ static inline void conn_init(struct connection *conn, void *target)
|
|||||||
MT_LIST_INIT(&conn->toremove_list);
|
MT_LIST_INIT(&conn->toremove_list);
|
||||||
if (conn_is_back(conn))
|
if (conn_is_back(conn))
|
||||||
LIST_INIT(&conn->session_list);
|
LIST_INIT(&conn->session_list);
|
||||||
|
else
|
||||||
|
LIST_INIT(&conn->stopping_list);
|
||||||
conn->subs = NULL;
|
conn->subs = NULL;
|
||||||
conn->src = NULL;
|
conn->src = NULL;
|
||||||
conn->dst = NULL;
|
conn->dst = NULL;
|
||||||
|
@ -57,6 +57,8 @@ extern int atexit_flag;
|
|||||||
extern unsigned char boot_seed[20]; // per-boot random seed (160 bits initially)
|
extern unsigned char boot_seed[20]; // per-boot random seed (160 bits initially)
|
||||||
extern THREAD_LOCAL struct buffer trash;
|
extern THREAD_LOCAL struct buffer trash;
|
||||||
|
|
||||||
|
extern struct task *stopping_task[MAX_PROCS];
|
||||||
|
|
||||||
struct proxy;
|
struct proxy;
|
||||||
struct server;
|
struct server;
|
||||||
int main(int argc, char **argv);
|
int main(int argc, char **argv);
|
||||||
|
@ -41,6 +41,8 @@ struct mux_proto_list mux_proto_list = {
|
|||||||
.list = LIST_HEAD_INIT(mux_proto_list.list)
|
.list = LIST_HEAD_INIT(mux_proto_list.list)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct mux_stopping_data mux_stopping_data[MAX_THREADS];
|
||||||
|
|
||||||
/* disables sending of proxy-protocol-v2's LOCAL command */
|
/* disables sending of proxy-protocol-v2's LOCAL command */
|
||||||
static int pp2_never_send_local;
|
static int pp2_never_send_local;
|
||||||
|
|
||||||
|
@ -2370,11 +2370,31 @@ __attribute__((noreturn)) void deinit_and_exit(int status)
|
|||||||
exit(status);
|
exit(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Handler of the task of mux_stopping_data.
|
||||||
|
* Called on soft-stop.
|
||||||
|
*/
|
||||||
|
struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int state)
|
||||||
|
{
|
||||||
|
struct connection *conn, *back;
|
||||||
|
|
||||||
|
list_for_each_entry_safe(conn, back, &mux_stopping_data[tid].list, stopping_list) {
|
||||||
|
if (conn->mux && conn->mux->wake)
|
||||||
|
conn->mux->wake(conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
/* Runs the polling loop */
|
/* Runs the polling loop */
|
||||||
void run_poll_loop()
|
void run_poll_loop()
|
||||||
{
|
{
|
||||||
int next, wake;
|
int next, wake;
|
||||||
|
|
||||||
|
/* allocates the thread bound mux_stopping_data task */
|
||||||
|
mux_stopping_data[tid].task = task_new(tid_bit);
|
||||||
|
mux_stopping_data[tid].task->process = mux_stopping_process;
|
||||||
|
LIST_INIT(&mux_stopping_data[tid].list);
|
||||||
|
|
||||||
tv_update_date(0,1);
|
tv_update_date(0,1);
|
||||||
while (1) {
|
while (1) {
|
||||||
wake_expired_tasks();
|
wake_expired_tasks();
|
||||||
@ -2411,6 +2431,12 @@ void run_poll_loop()
|
|||||||
int i;
|
int i;
|
||||||
|
|
||||||
if (stopping) {
|
if (stopping) {
|
||||||
|
/* stop muxes before acknowleding stopping */
|
||||||
|
if (!(stopping_thread_mask & tid_bit)) {
|
||||||
|
task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
|
||||||
|
wake = 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) {
|
if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) {
|
||||||
/* notify all threads that stopping was just set */
|
/* notify all threads that stopping was just set */
|
||||||
for (i = 0; i < global.nbthread; i++)
|
for (i = 0; i < global.nbthread; i++)
|
||||||
@ -2438,6 +2464,8 @@ void run_poll_loop()
|
|||||||
|
|
||||||
activity[tid].loops++;
|
activity[tid].loops++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task_destroy(mux_stopping_data[tid].task);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *run_thread_poll_loop(void *data)
|
static void *run_thread_poll_loop(void *data)
|
||||||
|
@ -816,6 +816,9 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
|
|||||||
h1c->shut_timeout = h1c->timeout = proxy->timeout.client;
|
h1c->shut_timeout = h1c->timeout = proxy->timeout.client;
|
||||||
if (tick_isset(proxy->timeout.clientfin))
|
if (tick_isset(proxy->timeout.clientfin))
|
||||||
h1c->shut_timeout = proxy->timeout.clientfin;
|
h1c->shut_timeout = proxy->timeout.clientfin;
|
||||||
|
|
||||||
|
LIST_APPEND(&mux_stopping_data[tid].list,
|
||||||
|
&h1c->conn->stopping_list);
|
||||||
}
|
}
|
||||||
if (tick_isset(h1c->timeout)) {
|
if (tick_isset(h1c->timeout)) {
|
||||||
t = task_new(tid_bit);
|
t = task_new(tid_bit);
|
||||||
@ -936,6 +939,9 @@ static void h1_release(struct h1c *h1c)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (conn) {
|
if (conn) {
|
||||||
|
if (!conn_is_back(conn))
|
||||||
|
LIST_DEL_INIT(&conn->stopping_list);
|
||||||
|
|
||||||
conn->mux = NULL;
|
conn->mux = NULL;
|
||||||
conn->ctx = NULL;
|
conn->ctx = NULL;
|
||||||
TRACE_DEVEL("freeing conn", H1_EV_H1C_END, conn);
|
TRACE_DEVEL("freeing conn", H1_EV_H1C_END, conn);
|
||||||
|
12
src/mux_h2.c
12
src/mux_h2.c
@ -955,6 +955,15 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
|
|||||||
h2c->wait_event.tasklet->process = h2_io_cb;
|
h2c->wait_event.tasklet->process = h2_io_cb;
|
||||||
h2c->wait_event.tasklet->context = h2c;
|
h2c->wait_event.tasklet->context = h2c;
|
||||||
h2c->wait_event.events = 0;
|
h2c->wait_event.events = 0;
|
||||||
|
if (!conn_is_back(conn)) {
|
||||||
|
/* Connection might already be in the stopping_list if subject
|
||||||
|
* to h1->h2 upgrade.
|
||||||
|
*/
|
||||||
|
if (!LIST_INLIST(&conn->stopping_list)) {
|
||||||
|
LIST_APPEND(&mux_stopping_data[tid].list,
|
||||||
|
&conn->stopping_list);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
h2c->ddht = hpack_dht_alloc();
|
h2c->ddht = hpack_dht_alloc();
|
||||||
if (!h2c->ddht)
|
if (!h2c->ddht)
|
||||||
@ -1097,6 +1106,9 @@ static void h2_release(struct h2c *h2c)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (conn) {
|
if (conn) {
|
||||||
|
if (!conn_is_back(conn))
|
||||||
|
LIST_DEL_INIT(&conn->stopping_list);
|
||||||
|
|
||||||
conn->mux = NULL;
|
conn->mux = NULL;
|
||||||
conn->ctx = NULL;
|
conn->ctx = NULL;
|
||||||
TRACE_DEVEL("freeing conn", H2_EV_H2C_END, conn);
|
TRACE_DEVEL("freeing conn", H2_EV_H2C_END, conn);
|
||||||
|
Loading…
Reference in New Issue
Block a user