diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 7eb798e6b..505bef631 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -69,6 +69,7 @@ struct tgroup_ctx {
 	ulong threads_want_rdv;          /* mask of threads that want a rendez-vous */
 	ulong threads_harmless;          /* mask of threads that are not modifying anything */
 	ulong threads_idle;              /* mask of threads idling in the poller */
+	ulong stopping_threads;          /* mask of threads currently stopping */
 	/* pad to cache line (64B) */
 	char __pad[0];                   /* unused except to check remaining room */
 	char __end[0] __attribute__((aligned(64)));
diff --git a/src/haproxy.c b/src/haproxy.c
index 0a11d3a00..e2503c228 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -165,7 +165,7 @@ const char *build_features = "";
 
 static struct list cfg_cfgfiles = LIST_HEAD_INIT(cfg_cfgfiles);
 int pid;			/* current process id */
-volatile unsigned long stopping_thread_mask = 0; /* Threads acknowledged stopping */
+static unsigned long stopping_tgroup_mask; /* Thread groups acknowledging stopping */
 
 /* global options */
 struct global global = {
@@ -2818,27 +2818,39 @@ void run_poll_loop()
 
 		if (stopping) {
 			/* stop muxes before acknowledging stopping */
-			if (!(stopping_thread_mask & tid_bit)) {
+			if (!(tg_ctx->stopping_threads & ti->ltid_bit)) {
 				task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
 				wake = 1;
 			}
 
-			if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) {
-				/* notify all threads that stopping was just set */
-				for (i = 0; i < global.nbthread; i++)
-					if (((all_threads_mask & ~stopping_thread_mask) >> i) & 1)
+			if (_HA_ATOMIC_OR_FETCH(&tg_ctx->stopping_threads, ti->ltid_bit) == ti->ltid_bit &&
+			    _HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) {
+				/* first one to detect it, notify all threads that stopping was just set */
+				for (i = 0; i < global.nbthread; i++) {
+					if (ha_thread_info[i].tg->threads_enabled &
+					    ha_thread_info[i].ltid_bit &
+					    ~_HA_ATOMIC_LOAD(&ha_thread_info[i].tg_ctx->stopping_threads))
 						wake_thread(i);
+				}
 			}
 		}
 
 		/* stop when there's nothing left to do */
 		if ((jobs - unstoppable_jobs) == 0 &&
-		    (stopping_thread_mask & all_threads_mask) == all_threads_mask) {
-			/* wake all threads waiting on jobs==0 */
-			for (i = 0; i < global.nbthread; i++)
-				if (((all_threads_mask & ~tid_bit) >> i) & 1)
-					wake_thread(i);
-			break;
+		    (_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) {
+			/* check that all threads are aware of the stopping status */
+			for (i = 0; i < global.nbtgroups; i++)
+				if (_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) != ha_tgroup_info[i].threads_enabled)
+					break;
+#ifdef USE_THREAD
+			if (i == global.nbtgroups) {
+				/* all are OK, let's wake them all and stop */
+				for (i = 0; i < global.nbthread; i++)
+					if (i != tid && ha_thread_info[i].tg->threads_enabled & ha_thread_info[i].ltid_bit)
+						wake_thread(i);
+				break;
+			}
+#endif
 		}
 	}