diff --git a/include/proto/fd.h b/include/proto/fd.h index c5a03f775..a4cee3220 100644 --- a/include/proto/fd.h +++ b/include/proto/fd.h @@ -45,6 +45,8 @@ extern unsigned long fd_cache_mask; // Mask of threads with events in the cache extern THREAD_LOCAL int *fd_updt; // FD updates list extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list +extern int poller_wr_pipe[MAX_THREADS]; + __decl_hathreads(extern HA_RWLOCK_T __attribute__((aligned(64))) fdcache_lock); /* global lock to protect fd_cache array */ /* Deletes an FD from the fdsets. @@ -60,6 +62,8 @@ void fd_remove(int fd); /* disable the specified poller */ void disable_poller(const char *poller_name); +void poller_pipe_io_handler(int fd); + /* * Initialize the pollers till the best one is found. * If none works, returns 0, otherwise 1. @@ -516,6 +520,13 @@ static inline unsigned int hap_fd_isset(int fd, unsigned int *evts) return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1))); } +static inline void wake_thread(int tid) +{ + char c = 'c'; + + shut_your_big_mouth_gcc(write(poller_wr_pipe[tid], &c, 1)); +} + #endif /* _PROTO_FD_H */ diff --git a/include/types/global.h b/include/types/global.h index a684ea6dd..616e8d3e5 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -219,6 +219,7 @@ extern char localpeer[MAX_HOSTNAME_LEN]; extern struct list global_listener_queue; /* list of the temporarily limited listeners */ extern struct task *global_listener_queue_task; extern unsigned int warned; /* bitfield of a few warnings to emit just once */ +extern volatile unsigned long sleeping_thread_mask; /* bit values to go with "warned" above */ #define WARN_BLOCK_DEPRECATED 0x00000001 diff --git a/src/cli.c b/src/cli.c index d02842960..739107dab 100644 --- a/src/cli.c +++ b/src/cli.c @@ -890,6 +890,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx) (fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" : (fdt.iocb == listener_accept) ? "listener_accept" : (fdt.iocb == thread_sync_io_handler) ? "thread_sync_io_handler" : + (fdt.iocb == poller_pipe_io_handler) ? "poller_pipe_io_handler" : "unknown"); if (fdt.iocb == conn_fd_handler) { diff --git a/src/fd.c b/src/fd.c index 3b023a8b0..cbb7b478e 100644 --- a/src/fd.c +++ b/src/fd.c @@ -147,6 +147,7 @@ #include #include #include +#include #include #include @@ -176,6 +177,8 @@ unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache THREAD_LOCAL int *fd_updt = NULL; // FD updates list THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list +THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread +int poller_wr_pipe[MAX_THREADS]; // Pipe to wake the threads #define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next #define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev @@ -461,11 +464,31 @@ void disable_poller(const char *poller_name) pollers[p].pref = 0; } +void poller_pipe_io_handler(int fd) +{ + char buf[1024]; + /* Flush the pipe */ + while (read(fd, buf, sizeof(buf)) > 0); + fd_cant_recv(fd); +} + /* Initialize the pollers per thread */ static int init_pollers_per_thread() { + int mypipe[2]; if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL) return 0; + if (pipe(mypipe) < 0) { + free(fd_updt); + fd_updt = NULL; + return 0; + } + poller_rd_pipe = mypipe[0]; + poller_wr_pipe[tid] = mypipe[1]; + fcntl(poller_rd_pipe, F_SETFL, O_NONBLOCK); + fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, + tid_bit); + fd_want_recv(poller_rd_pipe); return 1; } diff --git a/src/haproxy.c b/src/haproxy.c index e0e8791f0..a17439163 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -123,6 +123,7 @@ int pid; /* current process id */ int relative_pid = 1; /* process id starting at 1 */ unsigned long pid_bit = 1; /* bit corresponding to the process id */ +volatile unsigned long sleeping_thread_mask; /* Threads that are about to sleep in poll() */ /* global options */ struct global global = { .hard_stop_after = TICK_ETERNITY, @@ -2427,11 +2428,20 @@ static void run_poll_loop() activity[tid].wake_tasks++; else if (signal_queue_len && tid == 0) activity[tid].wake_signal++; - else - exp = next; + else { + HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit); + __ha_barrier_store(); + if (active_tasks_mask & tid_bit) { + activity[tid].wake_tasks++; + HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit); + } else + exp = next; + } /* The poller will ensure it returns around */ cur_poller.poll(&cur_poller, exp); + if (sleeping_thread_mask & tid_bit) + HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit); fd_process_cached_events(); diff --git a/src/task.c b/src/task.c index 672730b8e..6e7441f1b 100644 --- a/src/task.c +++ b/src/task.c @@ -23,6 +23,7 @@ #include #include #include +#include struct pool_head *pool_head_task; struct pool_head *pool_head_tasklet; @@ -70,6 +71,7 @@ void __task_wakeup(struct task *t, struct eb_root *root) { void *expected = NULL; int *rq_size; + unsigned long old_active_mask; #ifdef USE_THREAD if (root == &rqueue) { @@ -125,6 +127,7 @@ redo: __ha_barrier_store(); } #endif + old_active_mask = active_tasks_mask; HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask); t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1); @@ -152,6 +155,13 @@ redo: rqueue_size[nb]++; } + /* If all threads that are supposed to handle this task are sleeping, + * wake one. + */ + if ((((t->thread_mask & all_threads_mask) & sleeping_thread_mask) == + (t->thread_mask & all_threads_mask)) && + !(t->thread_mask & old_active_mask)) + wake_thread(my_ffsl((t->thread_mask & all_threads_mask) &~ tid_bit) - 1); return; }