MAJOR: threads/fd: Make fd stuffs thread-safe

Many changes have been made to do so. First, the fd_updt array, where all
pending FDs for polling are stored, is now a thread-local array. Then 3 locks
have been added to protect, respectively, the fdtab array, the fd_cache array
and poll information. In addition, a lock for each entry in the fdtab array has
been added to protect all accesses to a specific FD or its information.

For pollers, according to the poller, the way to manage the concurrency is
different. There is a poller loop on each thread. So the set of monitored FDs
may need to be protected. epoll and kqueue are thread-safe per se, so there are
few things to do to protect these pollers. This is not possible with select and
poll, so there is no sharing between the threads. The poller on each thread is
independent from the others.

Finally, per-thread init/deinit functions are used for each poller and for the
FD part to manage thread-local resources.

Now, you must be careful when an FD is created during the HAProxy startup. All
updates to the FD state must be made in the threads' context and never before
their creation. This is mandatory because the fd_updt array is thread-local and
initialized only for threads. Because there are no pollers for the main thread,
this array remains uninitialized in this context. For this reason, listeners are
now enabled in the run_thread_poll_loop function, just like the worker pipe.
This commit is contained in:
Christopher Faulet 2017-05-29 10:40:41 +02:00 committed by Willy Tarreau
parent b349e48ede
commit d4604adeaa
9 changed files with 319 additions and 119 deletions

View File

@ -138,6 +138,10 @@ int thread_need_sync(void);
enum lock_label {
THREAD_SYNC_LOCK = 0,
FDTAB_LOCK,
FDCACHE_LOCK,
FD_LOCK,
POLL_LOCK,
POOL_LOCK,
LOCK_LABELS
};
@ -221,7 +225,8 @@ struct ha_rwlock {
static inline void show_lock_stats()
{
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "POOL"};
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
"POOL" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {

View File

@ -28,13 +28,22 @@
#include <unistd.h>
#include <common/config.h>
#include <types/fd.h>
/* public variables */
extern unsigned int *fd_cache; // FD events cache
extern unsigned int *fd_updt; // FD updates list
extern int fd_cache_num; // number of events in the cache
extern int fd_nbupdt; // number of updates in the list
extern THREAD_LOCAL int *fd_updt; // FD updates list
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
#ifdef USE_THREAD
HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
@ -104,11 +113,14 @@ static inline void updt_fd_polling(const int fd)
*/
static inline void fd_alloc_cache_entry(const int fd)
{
RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
if (fdtab[fd].cache)
return;
goto end;
fd_cache_num++;
fdtab[fd].cache = fd_cache_num;
fd_cache[fd_cache_num-1] = fd;
end:
RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Removes entry used by fd <fd> from the FD cache and replaces it with the
@ -119,9 +131,10 @@ static inline void fd_release_cache_entry(int fd)
{
unsigned int pos;
RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
pos = fdtab[fd].cache;
if (!pos)
return;
goto end;
fdtab[fd].cache = 0;
fd_cache_num--;
if (likely(pos <= fd_cache_num)) {
@ -130,6 +143,8 @@ static inline void fd_release_cache_entry(int fd)
fd_cache[pos - 1] = fd;
fdtab[fd].cache = pos;
}
end:
RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Computes the new polled status based on the active and ready statuses, for
@ -252,46 +267,56 @@ static inline int fd_active(const int fd)
/* Disable processing recv events on fd <fd> */
static inline void fd_stop_recv(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing send events on fd <fd> */
static inline void fd_stop_send(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_send_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing of events on fd <fd> for both directions. */
static inline void fd_stop_both(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
static inline void fd_cant_recv(const int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can receive anymore without polling. */
static inline void fd_may_recv(const int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_recv_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable readiness when polled. This is useful to interrupt reading when it
@ -301,54 +326,66 @@ static inline void fd_may_recv(const int fd)
*/
static inline void fd_done_recv(const int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
static inline void fd_cant_send(const int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_send_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
static inline void fd_may_send(const int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_send_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to receive */
static inline void fd_want_recv(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_recv_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to send */
static inline void fd_want_send(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_send_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Update events seen for FD <fd> and its state if needed. This should be called
* by the poller to set FD_POLL_* flags. */
static inline void fd_update_events(int fd, int evts)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev &= FD_POLL_STICKY;
fdtab[fd].ev |= evts;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
fd_may_recv(fd);
@ -360,13 +397,19 @@ static inline void fd_update_events(int fd, int evts)
/* Prepares <fd> for being polled */
static inline void fd_insert(int fd)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev = 0;
fdtab[fd].new = 1;
fdtab[fd].updated = 0;
fdtab[fd].linger_risk = 0;
fdtab[fd].cloned = 0;
fdtab[fd].cache = 0;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
if (fd + 1 > maxfd)
maxfd = fd + 1;
SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}

View File

@ -23,6 +23,7 @@
#define _TYPES_FD_H
#include <common/config.h>
#include <common/hathreads.h>
#include <types/port_range.h>
/* Direction for each FD event update */
@ -93,6 +94,9 @@ enum fd_states {
struct fdtab {
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
#ifdef USE_THREAD
HA_SPINLOCK_T lock;
#endif
unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */

View File

@ -28,13 +28,13 @@
/* private data */
static struct epoll_event *epoll_events;
static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
static int epoll_fd;
/* This structure may be used for any purpose. Warning! do not use it in
* recursive functions !
*/
static struct epoll_event ev;
static THREAD_LOCAL struct epoll_event ev;
#ifndef EPOLLRDHUP
/* EPOLLRDHUP was defined late in libc, and it appeared in kernel 2.6.17 */
@ -49,9 +49,8 @@ static struct epoll_event ev;
*/
REGPRM1 static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].cloned)) {
if (unlikely(fdtab[fd].cloned))
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
}
}
/*
@ -68,18 +67,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
fdtab[fd].state = en;
if ((en & FD_EV_POLLED_RW) == 0) {
/* fd removed from poll list */
@ -103,6 +105,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
ev.events |= EPOLLOUT;
ev.data.fd = fd;
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
}
@ -154,7 +157,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* always remap RDHUP to HUP as they're used similarly */
if (e & EPOLLRDHUP) {
cur_poller.flags |= HAP_POLL_F_RDHUP;
HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
fd_update_events(fd, n);
@ -162,6 +165,19 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* the caller will take care of cached events */
}
static int init_epoll_per_thread()
{
epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
if (epoll_events == NULL)
return 0;
return 1;
}
static void deinit_epoll_per_thread()
{
free(epoll_events);
}
/*
* Initialization of the epoll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@ -175,10 +191,11 @@ REGPRM1 static int _do_init(struct poller *p)
if (epoll_fd < 0)
goto fail_fd;
epoll_events = (struct epoll_event*)
calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
if (epoll_events == NULL)
if (global.nbthread > 1) {
hap_register_per_thread_init(init_epoll_per_thread);
hap_register_per_thread_deinit(deinit_epoll_per_thread);
}
else if (!init_epoll_per_thread())
goto fail_ee;
return 1;

View File

@ -30,7 +30,7 @@
/* private data */
static int kqueue_fd;
static struct kevent *kev = NULL;
static THREAD_LOCAL struct kevent *kev = NULL;
/*
* kqueue() poller
@ -46,19 +46,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
fdtab[fd].state = en;
if ((eo ^ en) & FD_EV_POLLED_R) {
/* read poll status changed */
if (en & FD_EV_POLLED_R) {
@ -139,6 +141,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
}
static int init_kqueue_per_thread()
{
/* we can have up to two events per fd (*/
kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
if (kev == NULL)
return 0;
return 1;
}
static void deinit_kqueue_per_thread()
{
free(kev);
}
/*
* Initialization of the kqueue() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@ -152,11 +169,13 @@ REGPRM1 static int _do_init(struct poller *p)
if (kqueue_fd < 0)
goto fail_fd;
/* we can have up to two events per fd (*/
kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
if (kev == NULL)
if (global.nbthread > 1) {
hap_register_per_thread_init(init_kqueue_per_thread);
hap_register_per_thread_deinit(deinit_kqueue_per_thread);
}
else if (!init_kqueue_per_thread())
goto fail_kev;
return 1;
fail_kev:

View File

@ -35,8 +35,8 @@
static unsigned int *fd_evts[2];
/* private data */
static struct pollfd *poll_events = NULL;
static THREAD_LOCAL int nbfd = 0;
static THREAD_LOCAL struct pollfd *poll_events = NULL;
static inline void hap_fd_set(int fd, unsigned int *evts)
{
@ -50,8 +50,10 @@ static inline void hap_fd_clr(int fd, unsigned int *evts)
REGPRM1 static void __fd_clo(int fd)
{
SPIN_LOCK(POLL_LOCK, &poll_lock);
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
@ -60,7 +62,7 @@ REGPRM1 static void __fd_clo(int fd)
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
int status;
int fd, nbfd;
int fd;
int wait_time;
int updt_idx, en, eo;
int fds, count;
@ -70,19 +72,22 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed, update the lists */
fdtab[fd].state = en;
SPIN_LOCK(POLL_LOCK, &poll_lock);
if ((eo & ~en) & FD_EV_POLLED_R)
hap_fd_clr(fd, fd_evts[DIR_RD]);
else if ((en & ~eo) & FD_EV_POLLED_R)
@ -92,6 +97,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
hap_fd_clr(fd, fd_evts[DIR_WR]);
else if ((en & ~eo) & FD_EV_POLLED_W)
hap_fd_set(fd, fd_evts[DIR_WR]);
SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
@ -100,7 +106,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
for (fds = 0; (fds * 8*sizeof(**fd_evts)) < maxfd; fds++) {
rn = fd_evts[DIR_RD][fds];
wn = fd_evts[DIR_WR][fds];
if (!(rn|wn))
continue;
@ -112,9 +118,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
poll_events[nbfd].events = (sr ? (POLLIN | POLLRDHUP) : 0) | (sw ? POLLOUT : 0);
nbfd++;
}
}
}
}
/* now let's wait for events */
if (!exp)
wait_time = MAX_DELAY_MS;
@ -135,7 +141,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
unsigned int n;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
if (!(e & ( POLLOUT | POLLIN | POLLERR | POLLHUP | POLLRDHUP )))
continue;
@ -161,15 +167,28 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* always remap RDHUP to HUP as they're used similarly */
if (e & POLLRDHUP) {
cur_poller.flags |= HAP_POLL_F_RDHUP;
HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
fd_update_events(fd, n);
}
}
static int init_poll_per_thread()
{
poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
if (poll_events == NULL)
return 0;
return 1;
}
static void deinit_poll_per_thread()
{
free(poll_events);
}
/*
* Initialization of the poll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@ -183,14 +202,15 @@ REGPRM1 static int _do_init(struct poller *p)
p->private = NULL;
fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
if (poll_events == NULL)
if (global.nbthread > 1) {
hap_register_per_thread_init(init_poll_per_thread);
hap_register_per_thread_deinit(deinit_poll_per_thread);
}
else if (!init_poll_per_thread())
goto fail_pe;
if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_srevt;
if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_swevt;

View File

@ -24,14 +24,17 @@
#include <proto/fd.h>
/* private data */
static fd_set *fd_evts[2];
static fd_set *tmp_evts[2];
static THREAD_LOCAL fd_set *tmp_evts[2];
/* Immediately remove the entry upon close() */
REGPRM1 static void __fd_clo(int fd)
{
SPIN_LOCK(POLL_LOCK, &poll_lock);
FD_CLR(fd, fd_evts[DIR_RD]);
FD_CLR(fd, fd_evts[DIR_WR]);
SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
@ -43,27 +46,30 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
int fd, i;
struct timeval delta;
int delta_ms;
int readnotnull, writenotnull;
int fds;
int updt_idx, en, eo;
char count;
int readnotnull, writenotnull;
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].updated = 0;
fdtab[fd].new = 0;
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed, update the lists */
fdtab[fd].state = en;
SPIN_LOCK(POLL_LOCK, &poll_lock);
if ((eo & ~en) & FD_EV_POLLED_R)
FD_CLR(fd, fd_evts[DIR_RD]);
else if ((en & ~eo) & FD_EV_POLLED_R)
@ -73,10 +79,28 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
FD_CLR(fd, fd_evts[DIR_WR]);
else if ((en & ~eo) & FD_EV_POLLED_W)
FD_SET(fd, fd_evts[DIR_WR]);
SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
/* let's restore fdset state */
readnotnull = 0; writenotnull = 0;
for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
}
#if 0
/* just a verification code, needs to be removed for performance */
for (i=0; i<maxfd; i++) {
if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
abort();
if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
abort();
}
#endif
delta_ms = 0;
delta.tv_sec = 0;
delta.tv_usec = 0;
@ -94,30 +118,13 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
delta.tv_usec = (delta_ms % 1000) * 1000;
}
/* let's restore fdset state */
readnotnull = 0; writenotnull = 0;
for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
}
// /* just a verification code, needs to be removed for performance */
// for (i=0; i<maxfd; i++) {
// if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
// abort();
// if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
// abort();
//
// }
gettimeofday(&before_poll, NULL);
status = select(maxfd,
readnotnull ? tmp_evts[DIR_RD] : NULL,
writenotnull ? tmp_evts[DIR_WR] : NULL,
NULL,
&delta);
tv_update_date(delta_ms, status);
measure_idle();
@ -148,6 +155,28 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
}
static int init_select_per_thread()
{
int fd_set_bytes;
fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail;
if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail;
return 1;
fail:
free(tmp_evts[DIR_RD]);
free(tmp_evts[DIR_WR]);
return 0;
}
static void deinit_select_per_thread()
{
free(tmp_evts[DIR_WR]);
free(tmp_evts[DIR_RD]);
}
/*
* Initialization of the select() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@ -155,7 +184,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
*/
REGPRM1 static int _do_init(struct poller *p)
{
__label__ fail_swevt, fail_srevt, fail_wevt, fail_revt;
__label__ fail_swevt, fail_srevt, fail_revt;
int fd_set_bytes;
p->private = NULL;
@ -164,16 +193,15 @@ REGPRM1 static int _do_init(struct poller *p)
goto fail_revt;
fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
if (global.nbthread > 1) {
hap_register_per_thread_init(init_select_per_thread);
hap_register_per_thread_deinit(deinit_select_per_thread);
}
else if (!init_select_per_thread())
goto fail_revt;
if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_wevt;
if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_srevt;
if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_swevt;
@ -183,7 +211,6 @@ REGPRM1 static int _do_init(struct poller *p)
free(fd_evts[DIR_RD]);
fail_srevt:
free(tmp_evts[DIR_WR]);
fail_wevt:
free(tmp_evts[DIR_RD]);
fail_revt:
p->pref = 0;

View File

@ -155,6 +155,7 @@
#include <types/global.h>
#include <proto/fd.h>
#include <proto/log.h>
#include <proto/port_range.h>
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
@ -168,15 +169,23 @@ struct poller cur_poller;
int nbpollers = 0;
unsigned int *fd_cache = NULL; // FD events cache
unsigned int *fd_updt = NULL; // FD updates list
int fd_cache_num = 0; // number of events in the cache
int fd_nbupdt = 0; // number of updates in the list
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
#ifdef USE_THREAD
HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
*/
static void fd_dodelete(int fd, int do_close)
{
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].linger_risk) {
/* this is generally set when connecting to servers */
setsockopt(fd, SOL_SOCKET, SO_LINGER,
@ -195,9 +204,12 @@ static void fd_dodelete(int fd, int do_close)
fdtab[fd].new = 0;
if (do_close)
close(fd);
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
while ((maxfd-1 >= 0) && !fdtab[maxfd-1].owner)
maxfd--;
SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
@ -225,10 +237,19 @@ void fd_process_cached_events()
{
int fd, entry, e;
if (!fd_cache_num)
return;
RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
for (entry = 0; entry < fd_cache_num; ) {
fd = fd_cache[entry];
e = fdtab[fd].state;
if (SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock))
goto next;
RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
e = fdtab[fd].state;
fdtab[fd].ev &= FD_POLL_STICKY;
if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@ -237,18 +258,25 @@ void fd_process_cached_events()
if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
fdtab[fd].ev |= FD_POLL_OUT;
if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].iocb(fd);
else
}
else {
fd_release_cache_entry(fd);
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
/* If the fd was removed from the cache, it has been
* replaced by the next one that we don't want to skip !
*/
if (entry < fd_cache_num && fd_cache[entry] != fd)
continue;
next:
entry++;
}
RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* disable the specified poller */
@ -261,6 +289,21 @@ void disable_poller(const char *poller_name)
pollers[p].pref = 0;
}
/* Initialize the pollers per thread */
static int init_pollers_per_thread()
{
if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
return 0;
return 1;
}
/* Deinitialize the pollers per thread */
static void deinit_pollers_per_thread()
{
free(fd_updt);
fd_updt = NULL;
}
/*
* Initialize the pollers till the best one is found.
* If none works, returns 0, otherwise 1.
@ -279,9 +322,21 @@ int init_pollers()
if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
goto fail_cache;
if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
if (global.nbthread > 1) {
hap_register_per_thread_init(init_pollers_per_thread);
hap_register_per_thread_deinit(deinit_pollers_per_thread);
}
else if (!init_pollers_per_thread())
goto fail_updt;
for (p = 0; p < global.maxsock; p++)
SPIN_INIT(&fdtab[p].lock);
//memset(fd_cache, -1, global.maxsock);
SPIN_INIT(&fdtab_lock);
RWLOCK_INIT(&fdcache_lock);
SPIN_INIT(&poll_lock);
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
@ -316,16 +371,24 @@ void deinit_pollers() {
struct poller *bp;
int p;
for (p = 0; p < global.maxsock; p++)
SPIN_DESTROY(&fdtab[p].lock);
for (p = 0; p < nbpollers; p++) {
bp = &pollers[p];
if (bp && bp->pref)
bp->term(bp);
}
free(fd_updt); fd_updt = NULL;
free(fd_cache); fd_cache = NULL;
free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
SPIN_DESTROY(&fdtab_lock);
RWLOCK_DESTROY(&fdcache_lock);
SPIN_DESTROY(&poll_lock);
}
/*

View File

@ -2202,7 +2202,35 @@ void deinit(void)
deinit_pollers();
} /* end deinit() */
void mworker_pipe_handler(int fd)
{
char c;
while (read(fd, &c, 1) == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN) {
fd_cant_recv(fd);
return;
}
break;
}
deinit();
exit(EXIT_FAILURE);
return;
}
void mworker_pipe_register(int pipefd[2])
{
close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
fdtab[mworker_pipe[0]].owner = mworker_pipe;
fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
fd_insert(mworker_pipe[0]);
fd_want_recv(mworker_pipe[0]);
}
static void sync_poll_loop()
{
@ -2278,6 +2306,10 @@ static void *run_thread_poll_loop(void *data)
}
}
if (global.mode & MODE_MWORKER)
mworker_pipe_register(mworker_pipe);
protocol_enable_all();
THREAD_SYNC_ENABLE();
run_poll_loop();
@ -2320,37 +2352,6 @@ static struct task *manage_global_listener_queue(struct task *t)
return t;
}
void mworker_pipe_handler(int fd)
{
char c;
while (read(fd, &c, 1) == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN) {
fd_cant_recv(fd);
return;
}
break;
}
deinit();
exit(EXIT_FAILURE);
return;
}
void mworker_pipe_register(int pipefd[2])
{
close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
fdtab[mworker_pipe[0]].owner = mworker_pipe;
fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
fd_insert(mworker_pipe[0]);
fd_want_recv(mworker_pipe[0]);
}
int main(int argc, char **argv)
{
int err, retry;
@ -2798,11 +2799,6 @@ int main(int argc, char **argv)
}
global.mode &= ~MODE_STARTING;
if (global.mode & MODE_MWORKER)
mworker_pipe_register(mworker_pipe);
protocol_enable_all();
/*
* That's it : the central polling loop. Run until we stop.
*/
@ -2827,6 +2823,12 @@ int main(int argc, char **argv)
}
else {
tid = 0;
if (global.mode & MODE_MWORKER)
mworker_pipe_register(mworker_pipe);
protocol_enable_all();
run_poll_loop();
}