BUG/MEDIUM: pollers: Use a global list for fd shared between threads.

With the old model, any fd shared by multiple threads, such as listeners
or DNS sockets, would only be updated on one thread, which could lead
to missed events or spurious wakeups.
To avoid this, add a global list for fds that are shared, using the same
implementation as the fd cache, and only remove entries from this list
once every thread has updated its poller.

[wt: this will need to be backported to 1.8, but differently, so this patch
 must not be backported as-is]
Olivier Houchard 2018-04-25 16:58:25 +02:00 committed by Willy Tarreau
parent 6a2cf8752c
commit 6b96f7289c
9 changed files with 302 additions and 138 deletions
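The core of the fix is an acknowledgment protocol: each fd carries an update_mask with one bit per thread that still has to refresh its poller, and the fd stays in the shared update list until that mask drains. The pattern, reduced to portable C11 atomics (a sketch with hypothetical names, not the HAProxy API):

#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical per-fd bookkeeping: one bit per thread that still has to
 * refresh its poller for this fd. Illustrative only.
 */
struct shared_fd {
	_Atomic unsigned long update_mask;
};

/* Called by a thread (bit tid_bit) once its poller reflects the fd's new
 * state. Returns true when the caller cleared the last bit and may unlink
 * the fd from the shared list.
 */
static bool ack_update(struct shared_fd *sfd, unsigned long tid_bit)
{
	unsigned long prev = atomic_fetch_and(&sfd->update_mask, ~tid_bit);

	return (prev & ~tid_bit) == 0;
}

The subtlety handled by done_update_polling() below is the race after that last clear: another thread may re-queue the fd between the unlink decision and the unlink itself, which is why the real code re-checks the mask and re-adds the entry if needed.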

include/common/hathreads.h

@@ -256,6 +256,8 @@ void thread_exit_sync(void);
 int thread_no_sync(void);
 int thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */

include/proto/fd.h

@@ -36,6 +36,8 @@
 extern volatile struct fdlist fd_cache;
 extern volatile struct fdlist fd_cache_local[MAX_THREADS];
+extern volatile struct fdlist update_list;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 extern THREAD_LOCAL int *fd_updt; // FD updates list
@@ -101,15 +103,57 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
  */
 static inline void updt_fd_polling(const int fd)
 {
-	unsigned int oldupdt;
-
-	/* note: we don't have a test-and-set yet in hathreads */
-	if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-		return;
-
-	oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
-	fd_updt[oldupdt] = fd;
+	if (fdtab[fd].thread_mask == tid_bit) {
+		unsigned int oldupdt;
+
+		/* note: we don't have a test-and-set yet in hathreads */
+		if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+			return;
+
+		oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+		fd_updt[oldupdt] = fd;
+	} else {
+		unsigned long update_mask = fdtab[fd].update_mask;
+		do {
+			if (update_mask == fdtab[fd].thread_mask)
+				return;
+		} while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+		                        fdtab[fd].thread_mask));
+		fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+	}
 }
 
+/* Called from the poller to acknowledge that we read an entry from the global
+ * update list, to remove our bit from the update_mask, and to remove the fd
+ * from the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+	unsigned long update_mask;
+
+	update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+	while ((update_mask & all_threads_mask) == 0) {
+		/* If we were the last one that had to update that entry, remove it from the list */
+		fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+		if (update_list.first == fd)
+			abort();
+		update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+		if ((update_mask & all_threads_mask) != 0) {
+			/* Maybe it's been re-updated in the meanwhile, and we
+			 * wrongly removed it from the list; if so, re-add it.
+			 */
+			fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+			update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+			/* And then check again, just in case after all it
+			 * should be removed, even if it's very unlikely, given
+			 * the current thread wouldn't have been able to take
+			 * care of it yet. */
+		} else
+			break;
+	}
+}
+
 /* Allocates a cache entry for a file descriptor if it does not yet have one.

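In the shared case above, the CAS loop publishes the update to every owning thread at once: it retries until update_mask equals thread_mask, bailing out early when another caller already got there. The same pattern as a standalone C11 function (an illustrative stand-in for the else branch, not the real helper):

#include <stdatomic.h>

/* Mark all owner threads as having a pending update. Returns 1 if this
 * caller performed the marking (and so must enqueue the fd in the global
 * update_list), 0 if the fd was already fully marked by someone else.
 */
static int mark_all_owners(_Atomic unsigned long *update_mask,
                           unsigned long thread_mask)
{
	unsigned long cur = atomic_load(update_mask);

	do {
		if (cur == thread_mask)
			return 0;	/* already queued */
	} while (!atomic_compare_exchange_weak(update_mask, &cur, thread_mask));
	return 1;	/* we won: add the fd to update_list */
}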
include/types/fd.h

@@ -117,6 +117,7 @@ struct fdtab {
 	unsigned long polled_mask;  /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask;  /* mask of thread IDs having an update for fd */
 	struct fdlist_entry cache;  /* Entry in the fdcache */
+	struct fdlist_entry update; /* Entry in the global update list */
 	void (*iocb)(int fd);       /* I/O handler */
 	void *owner;                /* the connection or listener associated with this fd, NULL if closed */
 	unsigned char state;        /* FD state for read and write directions (2*3 bits) */

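The cache and update members are intrusive list links indexed by fd number rather than by pointer. For context, the supporting list types in this area of the tree look approximately like this (reconstructed for illustration):

/* An element of an fd list: next/prev are fd numbers, with negative
 * values used as markers (-1: end of list, -2: locked, -3: not in a list).
 */
struct fdlist_entry {
	int next;
	int prev;
} __attribute__((aligned(8)));

/* The head of an fd list */
struct fdlist {
	int first;
	int last;
} __attribute__((aligned(8)));

Using plain ints keeps each link pair to 8 bytes, small enough to be swapped atomically with a double-word CAS, which is what fd_add_to_fd_list() and fd_rm_from_fd_list() rely on (see the HA_CAS_IS_8B/HA_HAVE_CAS_DW guard in src/fd.c below).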
src/ev_epoll.c

@@ -59,16 +59,55 @@ REGPRM1 static void __fd_clo(int fd)
 	}
 }
 
+static void _update_fd(int fd)
+{
+	int en, opcode;
+
+	en = fdtab[fd].state;
+
+	if (fdtab[fd].polled_mask & tid_bit) {
+		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+			/* fd removed from poll list */
+			opcode = EPOLL_CTL_DEL;
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+		}
+		else {
+			/* fd status changed */
+			opcode = EPOLL_CTL_MOD;
+		}
+	}
+	else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+		/* new fd in the poll list */
+		opcode = EPOLL_CTL_ADD;
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	else {
+		return;
+	}
+
+	/* construct the epoll events based on new state */
+	ev.events = 0;
+	if (en & FD_EV_POLLED_R)
+		ev.events |= EPOLLIN | EPOLLRDHUP;
+
+	if (en & FD_EV_POLLED_W)
+		ev.events |= EPOLLOUT;
+
+	ev.data.fd = fd;
+	epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-	int status, en;
-	int fd, opcode;
+	int status;
+	int fd;
 	int count;
 	int updt_idx;
 	int wait_time;
+	int old_fd;
 
 	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			continue;
 		}
 
-		en = fdtab[fd].state;
-		if (fdtab[fd].polled_mask & tid_bit) {
-			if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-				/* fd removed from poll list */
-				opcode = EPOLL_CTL_DEL;
-				HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-			}
-			else {
-				/* fd status changed */
-				opcode = EPOLL_CTL_MOD;
-			}
-		}
-		else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
-			/* new fd in the poll list */
-			opcode = EPOLL_CTL_ADD;
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
-		else {
-			continue;
-		}
-
-		/* construct the epoll events based on new state */
-		ev.events = 0;
-		if (en & FD_EV_POLLED_R)
-			ev.events |= EPOLLIN | EPOLLRDHUP;
-
-		if (en & FD_EV_POLLED_W)
-			ev.events |= EPOLLOUT;
-
-		ev.data.fd = fd;
-		epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+		_update_fd(fd);
 	}
 	fd_nbupdt = 0;
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd);
+	}
 
 	/* compute the epoll_wait() timeout */
 	if (!exp)

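The global-list walk above is worth decoding, since every poller repeats it verbatim: update.next holds a plain fd number, with negative markers. -1 terminates the walk; -2 means a neighbouring entry is locked by a concurrent fd_add_to_fd_list()/fd_rm_from_fd_list(), so the walker restarts from old_fd, the last stable position; and a value v <= -3 appears when the entry was just removed, with -v - 4 recovering the element that followed it (so -3 decodes back to -1, an empty successor). One step of the walk as a commented helper (my reading of the open-coded checks, not HAProxy code):

/* Advance one step in the update list. `next` is the raw value read from
 * fdtab[fd].update.next, `restart` the last known-stable fd (old_fd).
 * Returns the next fd to visit, or -1 at the end of the list.
 */
static int next_update_fd(int next, int restart)
{
	if (next == -2)		/* neighbour locked: retry from old_fd */
		return restart;
	if (next <= -3)		/* entry removed: -next - 4 is its successor */
		next = -next - 4;
	return next;		/* may be -1: end of list */
}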
src/ev_kqueue.c

@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in
 
+static int _update_fd(int fd)
+{
+	int en;
+	int changes = 0;
+
+	en = fdtab[fd].state;
+
+	if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+		if (!(fdtab[fd].polled_mask & tid_bit)) {
+			/* fd was not watched, it's still not */
+			return 0;
+		}
+		/* fd totally removed from poll list */
+		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (en & FD_EV_POLLED_R)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+		if (en & FD_EV_POLLED_W)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -41,8 +76,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int count, fd, delta_ms;
 	struct timespec timeout;
-	int updt_idx, en;
+	int updt_idx;
 	int changes = 0;
+	int old_fd;
 
 	timeout.tv_sec = 0;
 	timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-			if (!(fdtab[fd].polled_mask & tid_bit)) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (en & FD_EV_POLLED_R)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-
-			if (en & FD_EV_POLLED_W)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
+		changes += _update_fd(fd);
+	}
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		changes += _update_fd(fd);
 	}
 
 	if (changes) {
 #ifdef EV_RECEIPT
 		kev[0].flags |= EV_RECEIPT;

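One difference from epoll shows up here: kqueue can submit an arbitrary batch of filter changes in a single kevent() call, so _update_fd() merely appends EV_SET() entries to kev[] and returns how many it queued; the accumulated batch is flushed once before polling, and EV_RECEIPT (where available) makes that flush return per-change receipts rather than consuming pending events. A minimal standalone flush under those assumptions (error handling elided):

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

/* Queue two filter changes and flush them in one kevent() call; `out`
 * receives the receipts/errors for the submitted changes.
 */
static int flush_two_changes(int kq, int fd1, int fd2)
{
	struct kevent chg[2], out[2];
	struct timespec ts = { 0, 0 };

	EV_SET(&chg[0], fd1, EVFILT_READ,  EV_ADD,    0, 0, NULL);
	EV_SET(&chg[1], fd2, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
#ifdef EV_RECEIPT
	chg[0].flags |= EV_RECEIPT;	/* same trick as kev[0] in the hunk above */
#endif
	return kevent(kq, chg, 2, out, 2, &ts);
}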
src/ev_poll.c

@@ -45,6 +45,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Poll() poller
 */
@@ -53,11 +91,12 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int fd;
 	int wait_time;
-	int updt_idx, en;
+	int updt_idx;
 	int fds, count;
 	int sr, sw;
 	int old_maxfd, new_maxfd, max_add_fd;
 	unsigned rn, wn; /* read new, write new */
+	int old_fd;
 
 	max_add_fd = -1;
@@ -70,39 +109,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
-		}
+		_update_fd(fd, &max_add_fd);
+	}
+
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
 	/* maybe we added at least one fd larger than maxfd */

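The "cheat" works because, unlike epoll and kqueue, this poller drives a single process-wide pair of event bitmaps (fd_evts[DIR_RD] / fd_evts[DIR_WR]) rather than per-thread registrations, so the first thread to see an update can clear every thread's bit at once with ~all_threads_mask. The bitmap operations behind hap_fd_set()/hap_fd_clr() amount to classic word-indexed bit twiddling (a non-atomic sketch; the real helpers need atomic bit operations once threads share the bitmaps):

#include <limits.h>

#define BITS_PER_WORD (sizeof(unsigned int) * CHAR_BIT)

/* Set or clear fd's bit in a word-array bitmap (sketch of hap_fd_set/clr) */
static void bitmap_set(unsigned int *evts, int fd)
{
	evts[fd / BITS_PER_WORD] |= 1U << (fd % BITS_PER_WORD);
}

static void bitmap_clr(unsigned int *evts, int fd)
{
	evts[fd / BITS_PER_WORD] &= ~(1U << (fd % BITS_PER_WORD));
}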
src/ev_select.c

@@ -36,6 +36,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Select() poller
 */
@@ -46,10 +84,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	struct timeval delta;
 	int delta_ms;
 	int fds;
-	int updt_idx, en;
+	int updt_idx;
 	char count;
 	int readnotnull, writenotnull;
 	int old_maxfd, new_maxfd, max_add_fd;
+	int old_fd;
 
 	max_add_fd = -1;
@@ -62,40 +101,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
-		}
+		_update_fd(fd, &max_add_fd);
+	}
+
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
 	/* maybe we added at least one fd larger than maxfd */
 	for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {

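The loop truncated above is the usual CAS-based "atomic max" idiom: retry until either the shared maxfd already covers our largest added fd or our CAS installs max_add_fd + 1. Its shape in portable C11 (a sketch, not the HA_ATOMIC_* original):

#include <stdatomic.h>

/* Raise *maxfd to at least max_add_fd + 1, racing with other threads. */
static void raise_maxfd(_Atomic int *maxfd, int max_add_fd)
{
	int old = atomic_load(maxfd);

	/* on CAS failure, `old` is refreshed with the current value */
	while (old <= max_add_fd &&
	       !atomic_compare_exchange_weak(maxfd, &old, max_add_fd + 1))
		;
}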
src/fd.c

@@ -169,6 +169,7 @@ int nbpollers = 0;
 
 volatile struct fdlist fd_cache ; // FD events cache
 volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list
 
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
@@ -244,7 +245,6 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
 	int prev;
 	int next;
 	int last;
-
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
 	next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@ int init_pollers()
 		goto fail_info;
 
 	fd_cache.first = fd_cache.last = -1;
+	update_list.first = update_list.last = -1;
 
 	hap_register_per_thread_init(init_pollers_per_thread);
 	hap_register_per_thread_deinit(deinit_pollers_per_thread);
@@ -499,7 +500,7 @@ int init_pollers()
 		HA_SPIN_INIT(&fdtab[p].lock);
 		/* Mark the fd as out of the fd cache */
 		fdtab[p].cache.next = -3;
+		fdtab[p].update.next = -3;
 	}
 	for (p = 0; p < global.nbthread; p++)
 		fd_cache_local[p].first = fd_cache_local[p].last = -1;

src/hathreads.c

@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask = 0;
+unsigned long all_threads_mask = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];