From e7475c8e793f335bb61c2db4eedc30416321b9e2 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Mon, 20 Jun 2022 09:23:24 +0200 Subject: [PATCH] MEDIUM: tasks/fd: replace sleeping_thread_mask with a TH_FL_SLEEPING flag Every single place where sleeping_thread_mask was still used was to test or set a single thread. We can now add a per-thread flag to indicate a thread is sleeping, and remove this shared mask. The wake_thread() function now always performs an atomic fetch-and-or instead of a first load then an atomic OR. That's cleaner and more reliable. This is not easy to test, as broadcast FD events are rare. The good way to test for this is to run a very low rate-limited frontend with a listener that listens to the fewest possible threads (2), and to send it only 1 connection at a time. The listener will periodically pause and the wakeup task will sometimes wake up on a random thread and will call wake_thread(): frontend test bind :8888 maxconn 10 thread 1-2 rate-limit sessions 5 Alternately, disabling/enabling a frontend in loops via the CLI also broadcasts such events, but they're more difficult to observe since this is causing connection failures. 
--- include/haproxy/fd.h | 4 +--- include/haproxy/global.h | 1 - include/haproxy/tinfo-t.h | 1 + src/debug.c | 3 ++- src/fd.c | 3 +-- src/haproxy.c | 5 ++--- src/wdt.c | 3 ++- 7 files changed, 9 insertions(+), 11 deletions(-) diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index 29765d82e..eaee211aa 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -375,10 +375,8 @@ static inline void wake_thread(int thr) { struct thread_ctx *ctx = &ha_thread_ctx[thr]; - if (sleeping_thread_mask & (1UL << thr) && - (_HA_ATOMIC_LOAD(&ctx->flags) & TH_FL_NOTIFIED) == 0) { + if ((_HA_ATOMIC_FETCH_OR(&ctx->flags, TH_FL_NOTIFIED) & (TH_FL_SLEEPING|TH_FL_NOTIFIED)) == TH_FL_SLEEPING) { char c = 'c'; - _HA_ATOMIC_OR(&ctx->flags, TH_FL_NOTIFIED); DISGUISE(write(poller_wr_pipe[thr], &c, 1)); } } diff --git a/include/haproxy/global.h b/include/haproxy/global.h index dcabd7c3c..3f6f1209e 100644 --- a/include/haproxy/global.h +++ b/include/haproxy/global.h @@ -43,7 +43,6 @@ extern int killed; /* >0 means a hard-stop is triggered, >1 means hard-stop imme extern char hostname[MAX_HOSTNAME_LEN]; extern char *localpeer; extern unsigned int warned; /* bitfield of a few warnings to emit just once */ -extern volatile unsigned long sleeping_thread_mask; extern struct list proc_list; /* list of process in mworker mode */ extern int master; /* 1 if in master, 0 otherwise */ extern unsigned int rlim_fd_cur_at_boot; diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h index 51a0e275b..15ebcd17d 100644 --- a/include/haproxy/tinfo-t.h +++ b/include/haproxy/tinfo-t.h @@ -43,6 +43,7 @@ enum { #define TH_FL_STUCK 0x00000001 #define TH_FL_TASK_PROFILING 0x00000002 #define TH_FL_NOTIFIED 0x00000004 /* task was notified about the need to wake up */ +#define TH_FL_SLEEPING 0x00000008 /* thread won't check its task list before next wakeup */ /* Thread group information. 
This defines a base and a count of global thread diff --git a/src/debug.c b/src/debug.c index 4207d1309..ee0379809 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1365,7 +1365,8 @@ void debug_handler(int sig, siginfo_t *si, void *arg) /* mark the current thread as stuck to detect it upon next invocation * if it didn't move. */ - if (!((threads_harmless_mask|sleeping_thread_mask) & tid_bit)) + if (!(threads_harmless_mask & tid_bit) && + !(_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_SLEEPING)) _HA_ATOMIC_OR(&th_ctx->flags, TH_FL_STUCK); } diff --git a/src/fd.c b/src/fd.c index 56fa292ec..a0248289e 100644 --- a/src/fd.c +++ b/src/fd.c @@ -770,8 +770,7 @@ void fd_leaving_poll(int wait_time, int status) thread_harmless_end(); thread_idle_end(); - if (sleeping_thread_mask & tid_bit) - _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit); + _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_SLEEPING); } /* disable the specified poller */ diff --git a/src/haproxy.c b/src/haproxy.c index a1addfe7b..1df98d615 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -165,7 +165,6 @@ const char *build_features = ""; static struct list cfg_cfgfiles = LIST_HEAD_INIT(cfg_cfgfiles); int pid; /* current process id */ -volatile unsigned long sleeping_thread_mask = 0; /* Threads that are about to sleep in poll() */ volatile unsigned long stopping_thread_mask = 0; /* Threads acknowledged stopping */ /* global options */ @@ -2804,12 +2803,12 @@ void run_poll_loop() if (thread_has_tasks()) activity[tid].wake_tasks++; else { - _HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit); + _HA_ATOMIC_OR(&th_ctx->flags, TH_FL_SLEEPING); _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_NOTIFIED); __ha_barrier_atomic_store(); if (thread_has_tasks()) { activity[tid].wake_tasks++; - _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit); + _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_SLEEPING); } else wake = 0; } diff --git a/src/wdt.c b/src/wdt.c index dc5c1dde5..fab1269f0 100644 --- a/src/wdt.c +++ b/src/wdt.c @@ -80,7 +80,8 @@ void wdt_handler(int sig, 
siginfo_t *si, void *arg) if (!p || n - p < 1000000000UL) goto update_and_leave; - if ((threads_harmless_mask|sleeping_thread_mask|threads_to_dump) & (1UL << thr)) { + if ((_HA_ATOMIC_LOAD(&ha_thread_ctx[thr].flags) & TH_FL_SLEEPING) || /* test <thr>'s flags like the masks below, not the caller's th_ctx */ + ((threads_harmless_mask|threads_to_dump) & (1UL << thr))) { /* This thread is currently doing exactly nothing * waiting in the poll loop (unlikely but possible), * waiting for all other threads to join the rendez-vous