MEDIUM: thread: make stopping_threads per-group and add stopping_tgroups

Stopping threads need a mask to figure who's still there without scanning
everything in the poll loop. This means this will have to be per-group.
And we also need to have a global stopping groups mask to know what groups
were already signaled. This is used both to figure what thread is the first
one to catch the event, and which one is the first one to detect the end of
the last job. The logic isn't changed, though a loop is required in the
slow path to make sure all threads are aware of the end.

Note that for now the soft-stop still takes time for group IDs > 1 as the
poller is not yet started on these threads and needs to expire its timeout
as there's no way to wake it up. But all threads are eventually stopped.
This commit is contained in:
Willy Tarreau 2022-06-28 19:29:29 +02:00
parent 03f9b35114
commit ef422ced91
2 changed files with 25 additions and 12 deletions

View File

@ -69,6 +69,7 @@ struct tgroup_ctx {
ulong threads_want_rdv; /* mask of threads that wand a rendez-vous */ ulong threads_want_rdv; /* mask of threads that wand a rendez-vous */
ulong threads_harmless; /* mask of threads that are not modifying anything */ ulong threads_harmless; /* mask of threads that are not modifying anything */
ulong threads_idle; /* mask of threads idling in the poller */ ulong threads_idle; /* mask of threads idling in the poller */
ulong stopping_threads; /* mask of threads currently stopping */
/* pad to cache line (64B) */ /* pad to cache line (64B) */
char __pad[0]; /* unused except to check remaining room */ char __pad[0]; /* unused except to check remaining room */
char __end[0] __attribute__((aligned(64))); char __end[0] __attribute__((aligned(64)));

View File

@ -165,7 +165,7 @@ const char *build_features = "";
static struct list cfg_cfgfiles = LIST_HEAD_INIT(cfg_cfgfiles); static struct list cfg_cfgfiles = LIST_HEAD_INIT(cfg_cfgfiles);
int pid; /* current process id */ int pid; /* current process id */
volatile unsigned long stopping_thread_mask = 0; /* Threads acknowledged stopping */ static unsigned long stopping_tgroup_mask; /* Thread groups acknowledging stopping */
/* global options */ /* global options */
struct global global = { struct global global = {
@ -2818,28 +2818,40 @@ void run_poll_loop()
if (stopping) { if (stopping) {
/* stop muxes before acknowledging stopping */ /* stop muxes before acknowledging stopping */
if (!(stopping_thread_mask & tid_bit)) { if (!(tg_ctx->stopping_threads & tid_bit)) {
task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER); task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
wake = 1; wake = 1;
} }
if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) { if (_HA_ATOMIC_OR_FETCH(&tg_ctx->stopping_threads, ti->ltid_bit) == ti->ltid_bit &&
/* notify all threads that stopping was just set */ _HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) {
for (i = 0; i < global.nbthread; i++) /* first one to detect it, notify all threads that stopping was just set */
if (((all_threads_mask & ~stopping_thread_mask) >> i) & 1) for (i = 0; i < global.nbthread; i++) {
if (ha_thread_info[i].tg->threads_enabled &
ha_thread_info[i].ltid_bit &
~_HA_ATOMIC_LOAD(&ha_thread_info[i].tg_ctx->stopping_threads))
wake_thread(i); wake_thread(i);
} }
} }
}
/* stop when there's nothing left to do */ /* stop when there's nothing left to do */
if ((jobs - unstoppable_jobs) == 0 && if ((jobs - unstoppable_jobs) == 0 &&
(stopping_thread_mask & all_threads_mask) == all_threads_mask) { (_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) {
/* wake all threads waiting on jobs==0 */ /* check that all threads are aware of the stopping status */
for (i = 0; i < global.nbtgroups; i++)
if (_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) != ha_tgroup_info[i].threads_enabled)
break;
#ifdef USE_THREAD
if (i == global.nbtgroups) {
/* all are OK, let's wake them all and stop */
for (i = 0; i < global.nbthread; i++) for (i = 0; i < global.nbthread; i++)
if (((all_threads_mask & ~tid_bit) >> i) & 1) if (i != tid && ha_thread_info[i].tg->threads_enabled & ha_thread_info[i].ltid_bit)
wake_thread(i); wake_thread(i);
break; break;
} }
#endif
}
} }
/* If we have to sleep, measure how long */ /* If we have to sleep, measure how long */