mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-11 03:31:36 +00:00
MAJOR: threads: change thread_isolate to support inter-group synchronization
thread_isolate() and thread_isolate_full() were relying on a set of thread masks for all threads in different states (rdv, harmless, idle). This cannot work anymore when the number of threads increases beyond LONGBITS so we need to change the mechanism. What is done here is to have a counter of requesters and the number of the current isolated thread. Threads which want to isolate themselves increment the request counter and wait for all threads to be marked harmless (or idle) by scanning all groups and watching the respective masks. This is possible because threads cannot escape once they discover this counter, unless they also want to isolate and possibly pass first. Once all threads are harmless, the requesting thread tries to self-assign the isolated thread number, and if it fails it loops back to checking all threads. If it wins it's guaranted to be alone, and can drop its harmless bit, so that other competing threads go back to the loop waiting for all threads to be harmless. The benefit of proceeding this way is that there's very little write contention on the thread number (none during work), hence no cache line moves between caches, thus frozen threads do not slow down the isolated one. Once it's done, the isolated thread resets the thread number (hence lets another thread take the place) and decrements the requester count, thus possibly releasing all harmless threads. With this change there's no more need for any global mask to synchronize any thread, and we only need to loop over a number of groups to check 64 threads at a time per iteration. As such, tinfo's threads_want_rdv could be dropped. This was tested with 64 threads spread into 2 groups, running 64 tasks (from the debug dev command), 20 "show sess" (thread_isolate()), 20 "add server blah/blah" (thread_isolate()), and 20 "del server blah/blah" (thread_isolate_full()). The load remained very low (limited by external socat forks) and no stuck nor starved thread was found.
This commit is contained in:
parent
ef422ced91
commit
598cf3f22e
@ -173,29 +173,12 @@ unsigned long long ha_get_pthread_id(unsigned int thr);
|
||||
|
||||
extern volatile unsigned long all_threads_mask;
|
||||
extern volatile unsigned long all_tgroups_mask;
|
||||
extern volatile unsigned int rdv_requests;
|
||||
extern volatile unsigned int isolated_thread;
|
||||
extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
|
||||
extern THREAD_LOCAL unsigned int tid; /* The thread id */
|
||||
extern THREAD_LOCAL unsigned int tgid; /* The thread group id (starts at 1) */
|
||||
|
||||
/* explanation for tg_ctx->threads_want_rdv, and tg_ctx->threads_harmless:
|
||||
* - tg_ctx->threads_want_rdv is a bit field indicating all threads that have
|
||||
* requested a rendez-vous of other threads using thread_isolate().
|
||||
* - tg_ctx->threads_harmless is a bit field indicating all threads that are
|
||||
* currently harmless in that they promise not to access a shared resource.
|
||||
*
|
||||
* For a given thread, its bits in want_rdv and harmless can be translated like
|
||||
* this :
|
||||
*
|
||||
* ----------+----------+----------------------------------------------------
|
||||
* want_rdv | harmless | description
|
||||
* ----------+----------+----------------------------------------------------
|
||||
* 0 | 0 | thread not interested in RDV, possibly harmful
|
||||
* 0 | 1 | thread not interested in RDV but harmless
|
||||
* 1 | 1 | thread interested in RDV and waiting for its turn
|
||||
* 1 | 0 | thread currently working isolated from others
|
||||
* ----------+----------+----------------------------------------------------
|
||||
*/
|
||||
|
||||
#define ha_sigmask(how, set, oldset) pthread_sigmask(how, set, oldset)
|
||||
|
||||
/* Sets the current thread to a valid one described by <thr>, or to any thread
|
||||
@ -276,19 +259,17 @@ static inline void thread_harmless_now()
|
||||
static inline void thread_harmless_end()
|
||||
{
|
||||
while (1) {
|
||||
HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~tid_bit);
|
||||
if (likely((_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
|
||||
tg->threads_enabled & ~ti->ltid_bit) == 0))
|
||||
HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
|
||||
if (likely(_HA_ATOMIC_LOAD(&rdv_requests) == 0))
|
||||
break;
|
||||
thread_harmless_till_end();
|
||||
}
|
||||
}
|
||||
|
||||
/* an isolated thread has harmless cleared and want_rdv set */
|
||||
/* an isolated thread has its ID in isolated_thread */
|
||||
static inline unsigned long thread_isolated()
|
||||
{
|
||||
return _HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
|
||||
~_HA_ATOMIC_LOAD(&tg_ctx->threads_harmless) & ti->ltid_bit;
|
||||
return _HA_ATOMIC_LOAD(&isolated_thread) == tid;
|
||||
}
|
||||
|
||||
/* Returns 1 if the cpu set is currently restricted for the process else 0.
|
||||
|
@ -66,7 +66,6 @@ struct tgroup_info {
|
||||
* etc). It uses one cache line per thread to limit false sharing.
|
||||
*/
|
||||
struct tgroup_ctx {
|
||||
ulong threads_want_rdv; /* mask of threads that wand a rendez-vous */
|
||||
ulong threads_harmless; /* mask of threads that are not modifying anything */
|
||||
ulong threads_idle; /* mask of threads idling in the poller */
|
||||
ulong stopping_threads; /* mask of threads currently stopping */
|
||||
|
98
src/thread.c
98
src/thread.c
@ -65,6 +65,8 @@ THREAD_LOCAL struct thread_ctx *th_ctx = &ha_thread_ctx[0];
|
||||
|
||||
volatile unsigned long all_threads_mask __read_mostly = 1; // nbthread 1 assumed by default
|
||||
volatile unsigned long all_tgroups_mask __read_mostly = 1; // nbtgroup 1 assumed by default
|
||||
volatile unsigned int rdv_requests = 0; // total number of threads requesting RDV
|
||||
volatile unsigned int isolated_thread = ~0; // ID of the isolated thread, or ~0 when none
|
||||
THREAD_LOCAL unsigned int tgid = 1; // thread ID starts at 1
|
||||
THREAD_LOCAL unsigned int tid = 0;
|
||||
THREAD_LOCAL unsigned long tid_bit = (1UL << 0);
|
||||
@ -72,15 +74,14 @@ int thread_cpus_enabled_at_boot = 1;
|
||||
static pthread_t ha_pthread[MAX_THREADS] = { };
|
||||
|
||||
/* Marks the thread as harmless until the last thread using the rendez-vous
|
||||
* point quits, excluding the current one. Thus an isolated thread may be safely
|
||||
* marked as harmless. Given that we can wait for a long time, sched_yield() is
|
||||
* point quits. Given that we can wait for a long time, sched_yield() is
|
||||
* used when available to offer the CPU resources to competing threads if
|
||||
* needed.
|
||||
*/
|
||||
void thread_harmless_till_end()
|
||||
{
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
|
||||
while (_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) & tg->threads_enabled & ~ti->ltid_bit) {
|
||||
while (_HA_ATOMIC_LOAD(&rdv_requests) != 0) {
|
||||
ha_thread_relax();
|
||||
}
|
||||
}
|
||||
@ -93,25 +94,42 @@ void thread_harmless_till_end()
|
||||
*/
|
||||
void thread_isolate()
|
||||
{
|
||||
unsigned long old;
|
||||
uint tgrp, thr;
|
||||
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
|
||||
__ha_barrier_atomic_store();
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
|
||||
_HA_ATOMIC_INC(&rdv_requests);
|
||||
|
||||
/* wait for all threads to become harmless */
|
||||
old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
|
||||
/* wait for all threads to become harmless. They cannot change their
|
||||
* mind once seen thanks to rdv_requests above, unless they pass in
|
||||
* front of us.
|
||||
*/
|
||||
while (1) {
|
||||
if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
|
||||
old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
|
||||
else if (_HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
|
||||
break;
|
||||
for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
|
||||
while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
|
||||
ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
|
||||
ha_thread_relax();
|
||||
}
|
||||
|
||||
/* Now we've seen all threads marked harmless, we can try to run
|
||||
* by competing with other threads to win the race of the isolated
|
||||
* thread. It eventually converges since winners will enventually
|
||||
* relax their request and go back to wait for this to be over.
|
||||
* Competing on this only after seeing all threads harmless limits
|
||||
* the write contention.
|
||||
*/
|
||||
thr = _HA_ATOMIC_LOAD(&isolated_thread);
|
||||
if (thr == ~0U && _HA_ATOMIC_CAS(&isolated_thread, &thr, tid))
|
||||
break; // we won!
|
||||
ha_thread_relax();
|
||||
}
|
||||
/* one thread gets released at a time here, with its harmess bit off.
|
||||
* The loss of this bit makes the other one continue to spin while the
|
||||
* thread is working alone.
|
||||
|
||||
/* the thread is no longer harmless as it runs */
|
||||
_HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
|
||||
|
||||
/* the thread is isolated until it calls thread_release() which will
|
||||
* 1) reset isolated_thread to ~0;
|
||||
* 2) decrement rdv_requests.
|
||||
*/
|
||||
}
|
||||
|
||||
@ -131,42 +149,56 @@ void thread_isolate()
|
||||
*/
|
||||
void thread_isolate_full()
|
||||
{
|
||||
unsigned long old;
|
||||
uint tgrp, thr;
|
||||
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_idle, ti->ltid_bit);
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
|
||||
__ha_barrier_atomic_store();
|
||||
_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
|
||||
_HA_ATOMIC_INC(&rdv_requests);
|
||||
|
||||
/* wait for all threads to become harmless */
|
||||
old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
|
||||
/* wait for all threads to become harmless. They cannot change their
|
||||
* mind once seen thanks to rdv_requests above, unless they pass in
|
||||
* front of us.
|
||||
*/
|
||||
while (1) {
|
||||
unsigned long idle = _HA_ATOMIC_LOAD(&tg_ctx->threads_idle);
|
||||
|
||||
if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
|
||||
old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
|
||||
else if ((idle & tg->threads_enabled) == tg->threads_enabled &&
|
||||
_HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
|
||||
break;
|
||||
for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
|
||||
while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
|
||||
_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle) &
|
||||
ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
|
||||
ha_thread_relax();
|
||||
}
|
||||
|
||||
/* Now we've seen all threads marked harmless and idle, we can
|
||||
* try to run by competing with other threads to win the race
|
||||
* of the isolated thread. It eventually converges since winners
|
||||
* will enventually relax their request and go back to wait for
|
||||
* this to be over. Competing on this only after seeing all
|
||||
* threads harmless+idle limits the write contention.
|
||||
*/
|
||||
thr = _HA_ATOMIC_LOAD(&isolated_thread);
|
||||
if (thr == ~0U && _HA_ATOMIC_CAS(&isolated_thread, &thr, tid))
|
||||
break; // we won!
|
||||
ha_thread_relax();
|
||||
}
|
||||
|
||||
/* we're not idle anymore at this point. Other threads waiting on this
|
||||
* condition will need to wait until out next pass to the poller, or
|
||||
* our next call to thread_isolate_full().
|
||||
/* we're not idle nor harmless anymore at this point. Other threads
|
||||
* waiting on this condition will need to wait until out next pass to
|
||||
* the poller, or our next call to thread_isolate_full().
|
||||
*/
|
||||
_HA_ATOMIC_AND(&tg_ctx->threads_idle, ~ti->ltid_bit);
|
||||
_HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
|
||||
}
|
||||
|
||||
/* Cancels the effect of thread_isolate() by releasing the current thread's bit
|
||||
* in &tg_ctx->threads_want_rdv. This immediately allows other threads to expect be
|
||||
* executed, though they will first have to wait for this thread to become
|
||||
* harmless again (possibly by reaching the poller again).
|
||||
/* Cancels the effect of thread_isolate() by resetting the ID of the isolated
|
||||
* thread and decrementing the number of RDV requesters. This immediately allows
|
||||
* other threads to expect to be executed, though they will first have to wait
|
||||
* for this thread to become harmless again (possibly by reaching the poller
|
||||
* again).
|
||||
*/
|
||||
void thread_release()
|
||||
{
|
||||
_HA_ATOMIC_AND(&tg_ctx->threads_want_rdv, ~ti->ltid_bit);
|
||||
HA_ATOMIC_STORE(&isolated_thread, ~0U);
|
||||
HA_ATOMIC_DEC(&rdv_requests);
|
||||
}
|
||||
|
||||
/* Sets up threads, signals and masks, and starts threads 2 and above.
|
||||
|
Loading…
Reference in New Issue
Block a user