MEDIUM: hathreads: implement a more flexible rendez-vous point

The current synchronization point enforces certain restrictions which
are hard to work around in certain areas of the code. The fact that the
critical code can only be called from the sync point itself is a problem
for some callback-driven parts. The "show fd" command, for example, is
fragile in this regard.

It is also expensive in terms of CPU usage because it wakes every other
thread just to make sure all of them join the rendez-vous point. This is
wasteful: sleeping threads should not need to be woken up just to report
that they are doing nothing.

Here we implement a different approach. We keep track of harmless threads,
which are defined as those either doing nothing, or doing harmless things.
The rendez-vous is used "for others" as a way for a thread to isolate itself.
A thread requests to be alone using thread_isolate() when approaching
the dangerous area, then waits until all other threads are either doing
the same or doing something harmless (typically polling). The function
only returns once the thread is guaranteed to be alone, and the critical
section is terminated using thread_release().
Author: Willy Tarreau
Date:   2018-08-02 10:16:17 +02:00
Parent: 0c026f49e7
Commit: 60b639ccbe

6 changed files with 161 additions and 0 deletions
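As an illustration of the intended calling pattern, here is a minimal sketch (not part of this commit; show_fd_dump() and dump_all_fd_state() are hypothetical names) of how a callback-driven handler such as "show fd" can bracket its dangerous section:

#include <common/hathreads.h>

/* Hypothetical handler: dump_all_fd_state() stands in for any code that
 * must observe shared state without interference from other threads.
 */
static void show_fd_dump(void)
{
	thread_isolate();       /* returns only once all other threads are harmless */

	dump_all_fd_state();    /* guaranteed to run alone here */

	thread_release();       /* terminate the critical section */
}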

include/common/hathreads.h

@@ -135,6 +135,27 @@ static inline void __ha_barrier_full(void)
{
}
static inline void thread_harmless_now()
{
}
static inline void thread_harmless_end()
{
}
static inline void thread_isolate()
{
}
static inline void thread_release()
{
}
static inline unsigned long thread_isolated()
{
return 1;
}
#else /* USE_THREAD */
#include <stdio.h>
@@ -272,10 +293,34 @@ void thread_enter_sync(void);
void thread_exit_sync(void);
int thread_no_sync(void);
int thread_need_sync(void);
void thread_harmless_till_end();
void thread_isolate();
void thread_release();
extern THREAD_LOCAL unsigned int tid; /* The thread id */
extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
extern volatile unsigned long all_threads_mask;
extern volatile unsigned long threads_want_rdv_mask;
extern volatile unsigned long threads_harmless_mask;
/* explanation for threads_want_rdv_mask and threads_harmless_mask :
* - threads_want_rdv_mask is a bit field indicating all threads that have
* requested a rendez-vous of other threads using thread_isolate().
* - threads_harmless_mask is a bit field indicating all threads that are
* currently harmless in that they promise not to access a shared resource.
*
* For a given thread, its bits in want_rdv and harmless can be translated like
* this :
*
* ----------+----------+----------------------------------------------------
* want_rdv | harmless | description
* ----------+----------+----------------------------------------------------
* 0 | 0 | thread not interested in RDV, possibly harmful
* 0 | 1 | thread not interested in RDV but harmless
* 1 | 1 | thread interested in RDV and waiting for its turn
* 1 | 0 | thread currently working isolated from others
* ----------+----------+----------------------------------------------------
*/
#define ha_sigmask(how, set, oldset) pthread_sigmask(how, set, oldset)
@@ -286,6 +331,38 @@ static inline void ha_set_tid(unsigned int data)
tid_bit = (1UL << tid);
}
/* Marks the thread as harmless. Note: this must be true, i.e. the thread must
* not be touching any unprotected shared resource during this period. Usually
* this is called before poll(), but it may also be placed around very slow
* calls (eg: some crypto operations). Needs to be terminated using
* thread_harmless_end().
*/
static inline void thread_harmless_now()
{
HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
}
/* Ends the harmless period started by thread_harmless_now(). Usually this is
* placed after the poll() call. If it is discovered that a job was running and
* is relying on the thread still being harmless, the thread waits for the
* other one to finish.
*/
static inline void thread_harmless_end()
{
while (1) {
HA_ATOMIC_AND(&threads_harmless_mask, ~tid_bit);
if (likely((threads_want_rdv_mask & all_threads_mask) == 0))
break;
thread_harmless_till_end();
}
}
/* an isolated thread has harmless cleared and want_rdv set */
static inline unsigned long thread_isolated()
{
return threads_want_rdv_mask & ~threads_harmless_mask & tid_bit;
}
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
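Besides the pollers (see the diffs below), the comment on thread_harmless_now() above allows bracketing other long calls the same way. A minimal sketch, assuming a hypothetical slow_crypto_operation() that touches no unprotected shared state:

#include <common/hathreads.h>

static void do_slow_work(void)
{
	thread_harmless_now();    /* others may isolate while we are busy */
	slow_crypto_operation();  /* hypothetical; must not touch unprotected shared state */
	thread_harmless_end();    /* parks briefly if a rendez-vous is pending */
}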

src/ev_epoll.c

@@ -17,6 +17,7 @@
#include <common/config.h>
#include <common/debug.h>
#include <common/epoll.h>
#include <common/hathreads.h>
#include <common/standard.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -141,6 +142,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
_update_fd(fd);
}
thread_harmless_now();
/* compute the epoll_wait() timeout */
if (!exp)
wait_time = MAX_DELAY_MS;
@@ -161,6 +164,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(wait_time, status);
measure_idle();
thread_harmless_end();
/* process polled events */
for (count = 0; count < status; count++) {

src/ev_kqueue.c

@@ -19,6 +19,7 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
#include <common/tools.h>
@@ -112,6 +113,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
changes = _update_fd(fd, changes);
}
thread_harmless_now();
if (changes) {
#ifdef EV_RECEIPT
kev[0].flags |= EV_RECEIPT;
@@ -154,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(delta_ms, status);
measure_idle();
thread_harmless_end();
for (count = 0; count < status; count++) {
unsigned int n = 0;
fd = kev[count].ident;

src/ev_poll.c

@@ -19,6 +19,7 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -156,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
break;
} while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
thread_harmless_now();
fd_nbupdt = 0;
nbfd = 0;
@@ -207,6 +210,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(wait_time, status);
measure_idle();
thread_harmless_end();
for (count = 0; status > 0 && count < nbfd; count++) {
unsigned int n;
int e = poll_events[count].revents;

src/ev_select.c

@@ -16,6 +16,7 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -148,6 +149,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
break;
} while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
thread_harmless_now();
fd_nbupdt = 0;
/* let's restore fdset state */
@@ -186,6 +189,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(delta_ms, status);
measure_idle();
thread_harmless_end();
if (status <= 0)
return;

src/hathreads.c

@@ -30,6 +30,8 @@ void thread_sync_io_handler(int fd)
static HA_SPINLOCK_T sync_lock;
static int threads_sync_pipe[2];
static unsigned long threads_want_sync = 0;
volatile unsigned long threads_want_rdv_mask = 0;
volatile unsigned long threads_harmless_mask = 0;
volatile unsigned long all_threads_mask = 1; // nbthread 1 assumed by default
THREAD_LOCAL unsigned int tid = 0;
THREAD_LOCAL unsigned long tid_bit = (1UL << 0);
@@ -160,6 +162,68 @@ void thread_exit_sync()
thread_sync_barrier(&barrier);
}
/* Marks the thread as harmless until the last thread using the rendez-vous
* point quits. Given that we can wait for a long time, sched_yield() is used
* when available to offer the CPU resources to competing threads if needed.
*/
void thread_harmless_till_end()
{
HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
while (threads_want_rdv_mask & all_threads_mask) {
#if _POSIX_PRIORITY_SCHEDULING
sched_yield();
#else
pl_cpu_relax();
#endif
}
}
/* Isolates the current thread : request the ability to work while all other
* threads are harmless. Only returns once all of them are harmless, with the
* current thread's bit in threads_harmless_mask cleared. Needs to be completed
* using thread_release().
*/
void thread_isolate()
{
unsigned long old;
HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
__ha_barrier_store();
HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit);
/* wait for all threads to become harmless */
old = threads_harmless_mask;
while (1) {
if (unlikely((old & all_threads_mask) != all_threads_mask))
old = threads_harmless_mask;
else if (HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit))
break;
#if _POSIX_PRIORITY_SCHEDULING
sched_yield();
#else
pl_cpu_relax();
#endif
}
/* one thread gets released at a time here, with its harmless bit off.
* The loss of this bit makes the other ones continue to spin while the
* thread is working alone.
*/
}
/* Cancels the effect of thread_isolate() by releasing the current thread's bit
* in threads_want_rdv_mask and by marking this thread as harmless until the
* last worker finishes.
*/
void thread_release()
{
while (1) {
HA_ATOMIC_AND(&threads_want_rdv_mask, ~tid_bit);
if (!(threads_want_rdv_mask & all_threads_mask))
break;
thread_harmless_till_end();
}
}
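To experiment with the protocol outside HAProxy, it can be approximated with C11 atomics. The stand-alone sketch below is a simplified model, not HAProxy code: the HA_ATOMIC_* macros and tid_bit are replaced with stdatomic equivalents, the thread count is fixed, and unlike thread_harmless_till_end() above, the model clears the harmless bit itself once the last rendez-vous ends.

/* Stand-alone model of the rendez-vous protocol using C11 atomics.
 * Illustrative only; build with: cc -std=gnu11 -pthread rdv_model.c
 */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

#define NBTHREADS 4
#define ALL_MASK  ((1UL << NBTHREADS) - 1)

static _Atomic unsigned long want_rdv = 0; /* threads requesting isolation */
static _Atomic unsigned long harmless = 0; /* threads promising not to touch shared state */

static void isolate(unsigned long bit)
{
	unsigned long old;

	atomic_fetch_or(&harmless, bit);   /* we are harmless while waiting */
	atomic_fetch_or(&want_rdv, bit);   /* and we request the rendez-vous */

	old = atomic_load(&harmless);
	while (1) {
		if ((old & ALL_MASK) != ALL_MASK)
			old = atomic_load(&harmless); /* not everyone harmless yet */
		else if (atomic_compare_exchange_weak(&harmless, &old, old & ~bit))
			break;                        /* we won: run alone */
		sched_yield();
	}
}

static void release(unsigned long bit)
{
	atomic_fetch_and(&want_rdv, ~bit);
	while (atomic_load(&want_rdv) & ALL_MASK) {
		/* another thread wants its turn: park as harmless until done */
		atomic_fetch_or(&harmless, bit);
		while (atomic_load(&want_rdv) & ALL_MASK)
			sched_yield();
		atomic_fetch_and(&harmless, ~bit);
	}
}

static unsigned long counter; /* deliberately unprotected shared state */

static void *worker(void *arg)
{
	unsigned long bit = 1UL << (long)arg;

	for (int i = 0; i < 10000; i++) {
		isolate(bit);
		counter++;         /* safe: we run alone in this section */
		release(bit);
	}
	return NULL;
}

int main(void)
{
	pthread_t th[NBTHREADS];

	for (long i = 0; i < NBTHREADS; i++)
		pthread_create(&th[i], NULL, worker, (void *)i);
	for (long i = 0; i < NBTHREADS; i++)
		pthread_join(th[i], NULL);
	printf("counter=%lu (expect %d)\n", counter, NBTHREADS * 10000);
	return 0;
}

As in the real implementation, the CAS lets only one isolator clear its harmless bit at a time, so contending threads enter the critical section one by one while the others keep spinning harmlessly.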
__attribute__((constructor))
static void __hathreads_init(void)