MINOR: pollers: Add a way to wake a thread sleeping in the poller.

Add a new pipe, one per thread, so that we can write on it to wake a thread
sleeping in a poller, and use it to wake threads supposed to take care of a
task, if they are all sleeping.
This commit is contained in:
Olivier Houchard 2018-07-26 17:55:11 +02:00 committed by Willy Tarreau
parent eba0c0b51d
commit 79321b95a8
6 changed files with 58 additions and 2 deletions

View File

@ -45,6 +45,8 @@ extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
extern THREAD_LOCAL int *fd_updt; // FD updates list
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
extern int poller_wr_pipe[MAX_THREADS];
__decl_hathreads(extern HA_RWLOCK_T __attribute__((aligned(64))) fdcache_lock); /* global lock to protect fd_cache array */
/* Deletes an FD from the fdsets.
@ -60,6 +62,8 @@ void fd_remove(int fd);
/* disable the specified poller */
void disable_poller(const char *poller_name);
void poller_pipe_io_handler(int fd);
/*
* Initialize the pollers till the best one is found.
* If none works, returns 0, otherwise 1.
@ -516,6 +520,13 @@ static inline unsigned int hap_fd_isset(int fd, unsigned int *evts)
return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1)));
}
static inline void wake_thread(int tid)
{
char c = 'c';
shut_your_big_mouth_gcc(write(poller_wr_pipe[tid], &c, 1));
}
#endif /* _PROTO_FD_H */

View File

@ -219,6 +219,7 @@ extern char localpeer[MAX_HOSTNAME_LEN];
extern struct list global_listener_queue; /* list of the temporarily limited listeners */
extern struct task *global_listener_queue_task;
extern unsigned int warned; /* bitfield of a few warnings to emit just once */
extern volatile unsigned long sleeping_thread_mask;
/* bit values to go with "warned" above */
#define WARN_BLOCK_DEPRECATED 0x00000001

View File

@ -890,6 +890,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
(fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" :
(fdt.iocb == listener_accept) ? "listener_accept" :
(fdt.iocb == thread_sync_io_handler) ? "thread_sync_io_handler" :
(fdt.iocb == poller_pipe_io_handler) ? "poller_pipe_io_handler" :
"unknown");
if (fdt.iocb == conn_fd_handler) {

View File

@ -147,6 +147,7 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <common/compat.h>
@ -176,6 +177,8 @@ unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
int poller_wr_pipe[MAX_THREADS]; // Pipe to wake the threads
#define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
#define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
@ -461,11 +464,31 @@ void disable_poller(const char *poller_name)
pollers[p].pref = 0;
}
void poller_pipe_io_handler(int fd)
{
char buf[1024];
/* Flush the pipe */
while (read(fd, buf, sizeof(buf)) > 0);
fd_cant_recv(fd);
}
/* Initialize the pollers per thread */
static int init_pollers_per_thread()
{
int mypipe[2];
if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
return 0;
if (pipe(mypipe) < 0) {
free(fd_updt);
fd_updt = NULL;
return 0;
}
poller_rd_pipe = mypipe[0];
poller_wr_pipe[tid] = mypipe[1];
fcntl(poller_rd_pipe, F_SETFL, O_NONBLOCK);
fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler,
tid_bit);
fd_want_recv(poller_rd_pipe);
return 1;
}

View File

@ -123,6 +123,7 @@ int pid; /* current process id */
int relative_pid = 1; /* process id starting at 1 */
unsigned long pid_bit = 1; /* bit corresponding to the process id */
volatile unsigned long sleeping_thread_mask; /* Threads that are about to sleep in poll() */
/* global options */
struct global global = {
.hard_stop_after = TICK_ETERNITY,
@ -2427,11 +2428,20 @@ static void run_poll_loop()
activity[tid].wake_tasks++;
else if (signal_queue_len && tid == 0)
activity[tid].wake_signal++;
else
exp = next;
else {
HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit);
__ha_barrier_store();
if (active_tasks_mask & tid_bit) {
activity[tid].wake_tasks++;
HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
} else
exp = next;
}
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, exp);
if (sleeping_thread_mask & tid_bit)
HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
fd_process_cached_events();

View File

@ -23,6 +23,7 @@
#include <proto/proxy.h>
#include <proto/stream.h>
#include <proto/task.h>
#include <proto/fd.h>
struct pool_head *pool_head_task;
struct pool_head *pool_head_tasklet;
@ -70,6 +71,7 @@ void __task_wakeup(struct task *t, struct eb_root *root)
{
void *expected = NULL;
int *rq_size;
unsigned long old_active_mask;
#ifdef USE_THREAD
if (root == &rqueue) {
@ -125,6 +127,7 @@ redo:
__ha_barrier_store();
}
#endif
old_active_mask = active_tasks_mask;
HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
@ -152,6 +155,13 @@ redo:
rqueue_size[nb]++;
}
/* If all threads that are supposed to handle this task are sleeping,
* wake one.
*/
if ((((t->thread_mask & all_threads_mask) & sleeping_thread_mask) ==
(t->thread_mask & all_threads_mask)) &&
!(t->thread_mask & old_active_mask))
wake_thread(my_ffsl((t->thread_mask & all_threads_mask) &~ tid_bit) - 1);
return;
}