mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-21 06:25:43 +00:00
BUG/MEDIUM: epoll/threads: use one epoll_fd per thread
There currently is a problem regarding epoll(). While select() and poll() compute their polling state on the fly upon each call, epoll() keeps a shared state between all threads via the epoll_fd. The problem is that once an fd is registered on *any* thread, all other threads receive events for that FD as well. It is clearly visible when binding a listener to a single thread like in the configuration below where all 4 threads will work, 3 of them simply spinning to skip the event : global nbthread 4 frontend foo bind :1234 process 1/1 The worst case happens when some slow operations are in progress on a busy thread, preventing it from processing its task and causing the other ones to wake up not being able to do anything with this event. Typically computing a large TLS key will delay processing of next events on the same thread while others will still wake up. All this simply shows that the poller must remain thread-specific, with its own events and its own ability to sleep when it doesn't have anyhing to do. This patch does exactly this. For this, it proceeds like this : - have one epoll_fd per thread instead of one per process - initialize these epoll_fd when threads are created. - mark all known FDs as updated so that the next invocation of _do_poll() recomputes their polling status (including a possible removal of undesired polling from the original FD) ; - use each fd's polled_mask to maintain an accurate status of the current polling activity for this FD. - when scanning updates, only focus on events whose new polling status differs from the existing one - during updates, always verify the thread_mask to resist migration - on __fd_clo(), for cloned FDs (typically listeners inherited from the parent during a graceful shutdown), run epoll_ctl(DEL) on all epoll_fd. This is the reason why epoll_fd is stored in a shared array and not in a thread_local storage. Note: maybe this can be moved to an update instead. Interestingly, this shows that we don't need the FD's old state anymore and that we only use it to convert it to the new state based on stable information. It appears clearly that the FD code can be further improved by computing the final state directly when manipulating it. With this change, the config above goes from 22000 cps at 380% CPU to 43000 cps at 100% CPU : not only the 3 unused threads are not activated, but they do not disturb the activity anymore. The output of "show activity" before and after the patch on a 4-thread config where a first listener on thread 2 forwards over SSL to threads 3 & 4 shows this a much smaller amount of undesired events (thread 1 doesn't wake up anymore, poll_skip remains zero, fd_skip stays low) : // before: 400% CPU, 7700 cps, 13 seconds loops: 11380717 65879 5733468 5728129 wake_cache: 0 63986 317547 314174 wake_tasks: 0 0 0 0 wake_applets: 0 0 0 0 wake_signal: 0 0 0 0 poll_exp: 0 63986 317547 314174 poll_drop: 1 0 49981 48893 poll_dead: 65514 0 31334 31934 poll_skip: 46293690 34071 22867786 22858208 fd_skip: 66068135 174157 33732685 33825727 fd_lock: 0 2 2809 2905 fd_del: 0 494361 80890 79464 conn_dead: 0 0 0 0 stream: 0 407747 50526 49474 empty_rq: 11380718 1914 5683023 5678715 long_rq: 0 0 0 0 // after: 200% cpu, 9450 cps, 11 seconds loops: 17 66147 1001631 450968 wake_cache: 0 66119 865139 321227 wake_tasks: 0 0 0 0 wake_applets: 0 0 0 0 wake_signal: 0 0 0 0 poll_exp: 0 66119 865139 321227 poll_drop: 6 5 38279 60768 poll_dead: 0 0 0 0 poll_skip: 0 0 0 0 fd_skip: 54 172661 4411407 2008198 fd_lock: 0 0 10890 5394 fd_del: 0 492829 58965 105091 conn_dead: 0 0 0 0 stream: 0 406223 38663 61338 empty_rq: 18 40 962999 390549 long_rq: 0 0 0 0 This patch presents a few risks but fixes a real problem with threads, and as such it needs be backported to 1.8. It depends on previous patch ("MINOR: fd: add a bitmask to indicate that an FD is known by the poller"). Special thanks go to Samuel Reed for providing a large amount of useful debugging information and for testing fixes.
This commit is contained in:
parent
c9c8378c2b
commit
d9e7e36c6e
108
src/ev_epoll.c
108
src/ev_epoll.c
@ -29,7 +29,7 @@
|
||||
|
||||
/* private data */
|
||||
static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
|
||||
static int epoll_fd;
|
||||
static int epoll_fd[MAX_THREADS]; // per-thread epoll_fd
|
||||
|
||||
/* This structure may be used for any purpose. Warning! do not use it in
|
||||
* recursive functions !
|
||||
@ -49,8 +49,14 @@ static THREAD_LOCAL struct epoll_event ev;
|
||||
*/
|
||||
REGPRM1 static void __fd_clo(int fd)
|
||||
{
|
||||
if (unlikely(fdtab[fd].cloned))
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
|
||||
if (unlikely(fdtab[fd].cloned)) {
|
||||
unsigned long m = fdtab[fd].thread_mask;
|
||||
int i;
|
||||
|
||||
for (i = global.nbthread - 1; i >= 0; i--)
|
||||
if (m & (1UL << i))
|
||||
epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -82,34 +88,36 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
fdtab[fd].state = en;
|
||||
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
|
||||
|
||||
if ((eo ^ en) & FD_EV_POLLED_RW) {
|
||||
/* poll status changed */
|
||||
|
||||
if ((en & FD_EV_POLLED_RW) == 0) {
|
||||
if (fdtab[fd].polled_mask & tid_bit) {
|
||||
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
|
||||
/* fd removed from poll list */
|
||||
opcode = EPOLL_CTL_DEL;
|
||||
}
|
||||
else if ((eo & FD_EV_POLLED_RW) == 0) {
|
||||
/* new fd in the poll list */
|
||||
opcode = EPOLL_CTL_ADD;
|
||||
HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
|
||||
}
|
||||
else {
|
||||
/* fd status changed */
|
||||
opcode = EPOLL_CTL_MOD;
|
||||
}
|
||||
|
||||
/* construct the epoll events based on new state */
|
||||
ev.events = 0;
|
||||
if (en & FD_EV_POLLED_R)
|
||||
ev.events |= EPOLLIN | EPOLLRDHUP;
|
||||
|
||||
if (en & FD_EV_POLLED_W)
|
||||
ev.events |= EPOLLOUT;
|
||||
|
||||
ev.data.fd = fd;
|
||||
|
||||
epoll_ctl(epoll_fd, opcode, fd, &ev);
|
||||
}
|
||||
else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
|
||||
/* new fd in the poll list */
|
||||
opcode = EPOLL_CTL_ADD;
|
||||
HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
|
||||
}
|
||||
else {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* construct the epoll events based on new state */
|
||||
ev.events = 0;
|
||||
if (en & FD_EV_POLLED_R)
|
||||
ev.events |= EPOLLIN | EPOLLRDHUP;
|
||||
|
||||
if (en & FD_EV_POLLED_W)
|
||||
ev.events |= EPOLLOUT;
|
||||
|
||||
ev.data.fd = fd;
|
||||
epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
|
||||
}
|
||||
fd_nbupdt = 0;
|
||||
|
||||
@ -129,7 +137,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
/* now let's wait for polled events */
|
||||
|
||||
gettimeofday(&before_poll, NULL);
|
||||
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
|
||||
status = epoll_wait(epoll_fd[tid], epoll_events, global.tune.maxpollevents, wait_time);
|
||||
tv_update_date(wait_time, status);
|
||||
measure_idle();
|
||||
|
||||
@ -146,7 +154,10 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
}
|
||||
|
||||
if (!(fdtab[fd].thread_mask & tid_bit)) {
|
||||
/* FD has been migrated */
|
||||
activity[tid].poll_skip++;
|
||||
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
|
||||
HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -178,14 +189,38 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
|
||||
static int init_epoll_per_thread()
|
||||
{
|
||||
int fd;
|
||||
|
||||
epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
|
||||
if (epoll_events == NULL)
|
||||
return 0;
|
||||
goto fail_alloc;
|
||||
|
||||
if (tid) {
|
||||
epoll_fd[tid] = epoll_create(global.maxsock + 1);
|
||||
if (epoll_fd[tid] < 0)
|
||||
goto fail_fd;
|
||||
}
|
||||
|
||||
/* we may have to unregister some events initially registered on the
|
||||
* original fd when it was alone, and/or to register events on the new
|
||||
* fd for this thread. Let's just mark them as updated, the poller will
|
||||
* do the rest.
|
||||
*/
|
||||
for (fd = 0; fd < maxfd; fd++)
|
||||
updt_fd_polling(fd);
|
||||
|
||||
return 1;
|
||||
fail_fd:
|
||||
free(epoll_events);
|
||||
fail_alloc:
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void deinit_epoll_per_thread()
|
||||
{
|
||||
if (tid)
|
||||
close(epoll_fd[tid]);
|
||||
|
||||
free(epoll_events);
|
||||
epoll_events = NULL;
|
||||
}
|
||||
@ -199,8 +234,8 @@ REGPRM1 static int _do_init(struct poller *p)
|
||||
{
|
||||
p->private = NULL;
|
||||
|
||||
epoll_fd = epoll_create(global.maxsock + 1);
|
||||
if (epoll_fd < 0)
|
||||
epoll_fd[tid] = epoll_create(global.maxsock + 1);
|
||||
if (epoll_fd[tid] < 0)
|
||||
goto fail_fd;
|
||||
|
||||
hap_register_per_thread_init(init_epoll_per_thread);
|
||||
@ -219,9 +254,9 @@ REGPRM1 static int _do_init(struct poller *p)
|
||||
*/
|
||||
REGPRM1 static void _do_term(struct poller *p)
|
||||
{
|
||||
if (epoll_fd >= 0) {
|
||||
close(epoll_fd);
|
||||
epoll_fd = -1;
|
||||
if (epoll_fd[tid] >= 0) {
|
||||
close(epoll_fd[tid]);
|
||||
epoll_fd[tid] = -1;
|
||||
}
|
||||
|
||||
p->private = NULL;
|
||||
@ -251,10 +286,10 @@ REGPRM1 static int _do_test(struct poller *p)
|
||||
*/
|
||||
REGPRM1 static int _do_fork(struct poller *p)
|
||||
{
|
||||
if (epoll_fd >= 0)
|
||||
close(epoll_fd);
|
||||
epoll_fd = epoll_create(global.maxsock + 1);
|
||||
if (epoll_fd < 0)
|
||||
if (epoll_fd[tid] >= 0)
|
||||
close(epoll_fd[tid]);
|
||||
epoll_fd[tid] = epoll_create(global.maxsock + 1);
|
||||
if (epoll_fd[tid] < 0)
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
@ -268,11 +303,14 @@ __attribute__((constructor))
|
||||
static void _do_register(void)
|
||||
{
|
||||
struct poller *p;
|
||||
int i;
|
||||
|
||||
if (nbpollers >= MAX_POLLERS)
|
||||
return;
|
||||
|
||||
epoll_fd = -1;
|
||||
for (i = 0; i < MAX_THREADS; i++)
|
||||
epoll_fd[i] = -1;
|
||||
|
||||
p = &pollers[nbpollers++];
|
||||
|
||||
p->name = "epoll";
|
||||
|
Loading…
Reference in New Issue
Block a user