diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 3e2a350b6..a2c047e2c 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -138,6 +138,10 @@ int thread_need_sync(void);
 
 enum lock_label {
 	THREAD_SYNC_LOCK = 0,
+	FDTAB_LOCK,
+	FDCACHE_LOCK,
+	FD_LOCK,
+	POLL_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -221,7 +225,8 @@ struct ha_rwlock {
 
 static inline void show_lock_stats()
 {
-	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "POOL"};
+	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
+					   "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
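Each new enum entry doubles as an index into the per-label lock statistics that show_lock_stats() prints. A minimal, self-contained sketch of how label-indexed spinlock wrappers of this kind can be built on pthreads — the real SPIN_LOCK()/SPIN_UNLOCK() macros live in common/hathreads.h and collect far richer timing data; the my_* names below are hypothetical:

    #include <pthread.h>
    #include <stdio.h>

    enum lock_label { THREAD_SYNC_LOCK = 0, FDTAB_LOCK, FDCACHE_LOCK,
                      FD_LOCK, POLL_LOCK, POOL_LOCK, LOCK_LABELS };

    static unsigned long nb_taken[LOCK_LABELS]; /* times each label was locked */

    /* the counter update is racy, which is acceptable for a rough sketch */
    #define my_spin_lock(lbl, l)   (nb_taken[lbl]++, pthread_spin_lock(l))
    #define my_spin_unlock(lbl, l) pthread_spin_unlock(l)

    static void my_show_lock_stats(void)
    {
            static const char *labels[LOCK_LABELS] =
                    { "THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", "POOL" };
            int lbl;

            for (lbl = 0; lbl < LOCK_LABELS; lbl++)
                    printf("%s: taken %lu times\n", labels[lbl], nb_taken[lbl]);
    }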
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 56e20827c..e47d8fd5f 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -28,13 +28,22 @@
 #include <common/config.h>
 
 #include <types/fd.h>
+#include <common/hathreads.h>
 
 /* public variables */
+
 extern unsigned int *fd_cache;      // FD events cache
-extern unsigned int *fd_updt;       // FD updates list
 extern int fd_cache_num;            // number of events in the cache
-extern int fd_nbupdt;               // number of updates in the list
+
+extern THREAD_LOCAL int *fd_updt;   // FD updates list
+extern THREAD_LOCAL int fd_nbupdt;  // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock;      /* global lock to protect fdtab array */
+HA_RWLOCK_T   fdcache_lock;    /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock;       /* global lock to protect poll info */
+#endif
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
@@ -104,11 +113,14 @@ static inline void updt_fd_polling(const int fd)
  */
 static inline void fd_alloc_cache_entry(const int fd)
 {
+	RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
 	if (fdtab[fd].cache)
-		return;
+		goto end;
 	fd_cache_num++;
 	fdtab[fd].cache = fd_cache_num;
 	fd_cache[fd_cache_num-1] = fd;
+  end:
+	RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* Removes entry used by fd from the FD cache and replaces it with the
@@ -119,9 +131,10 @@ static inline void fd_release_cache_entry(int fd)
 {
 	unsigned int pos;
 
+	RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
 	pos = fdtab[fd].cache;
 	if (!pos)
-		return;
+		goto end;
 	fdtab[fd].cache = 0;
 	fd_cache_num--;
 	if (likely(pos <= fd_cache_num)) {
@@ -130,6 +143,8 @@ static inline void fd_release_cache_entry(int fd)
 		fd_cache[pos - 1] = fd;
 		fdtab[fd].cache = pos;
 	}
+  end:
+	RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* Computes the new polled status based on the active and ready statuses, for
@@ -252,46 +267,56 @@ static inline int fd_active(const int fd)
 /* Disable processing recv events on fd */
 static inline void fd_stop_recv(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing send events on fd */
 static inline void fd_stop_send(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_send_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing of events on fd for both directions. */
 static inline void fd_stop_both(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD cannot receive anymore without polling (EAGAIN detected).
  */
 static inline void fd_cant_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD can receive again without polling. */
 static inline void fd_may_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_recv_ready(fd)) {
 		fdtab[fd].state |= FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -301,54 +326,66 @@ static inline void fd_may_recv(const int fd)
  */
 static inline void fd_done_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD cannot send anymore without polling (EAGAIN detected). */
 static inline void fd_cant_send(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_send_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD can send again without polling. */
 static inline void fd_may_send(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_send_ready(fd)) {
 		fdtab[fd].state |= FD_EV_READY_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD to try to receive */
 static inline void fd_want_recv(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_recv_active(fd)) {
 		fdtab[fd].state |= FD_EV_ACTIVE_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD to try to send */
 static inline void fd_want_send(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_send_active(fd)) {
 		fdtab[fd].state |= FD_EV_ACTIVE_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Update events seen for FD and its state if needed. This should be called
  * by the poller to set FD_POLL_* flags.
  */
 static inline void fd_update_events(int fd, int evts)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev &= FD_POLL_STICKY;
 	fdtab[fd].ev |= evts;
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
 		fd_may_recv(fd);
@@ -360,13 +397,19 @@ static inline void fd_update_events(int fd, int evts)
 
 /* Prepares for being polled */
 static inline void fd_insert(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev = 0;
 	fdtab[fd].new = 1;
 	fdtab[fd].updated = 0;
 	fdtab[fd].linger_risk = 0;
 	fdtab[fd].cloned = 0;
+	fdtab[fd].cache = 0;
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+
+	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
 	if (fd + 1 > maxfd)
 		maxfd = fd + 1;
+	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
 }
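Note the pattern applied throughout this header: every early "return" inside a locked section becomes "goto end" so that the single unlock at the bottom is never bypassed. A generic, self-contained illustration of that idiom (pthread mutex used for simplicity; names hypothetical):

    #include <pthread.h>

    static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
    static int nb_cached;

    void cache_add(int already_present)
    {
            pthread_mutex_lock(&cache_lock);
            if (already_present)
                    goto end;   /* early exit still flows through the unlock */
            nb_cached++;
            /* ... insert the entry ... */
     end:
            pthread_mutex_unlock(&cache_lock);
    }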
diff --git a/include/types/fd.h b/include/types/fd.h
index 2bd7c07f2..7042dab11 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -23,6 +23,7 @@
 #define _TYPES_FD_H
 
 #include <common/config.h>
+#include <common/hathreads.h>
 #include <types/port_range.h>
 
 /* Direction for each FD event update */
@@ -93,6 +94,9 @@ enum fd_states {
 struct fdtab {
 	void (*iocb)(int fd);        /* I/O handler */
 	void *owner;                 /* the connection or listener associated with this fd, NULL if closed */
+#ifdef USE_THREAD
+	HA_SPINLOCK_T lock;
+#endif
 	unsigned int  cache;         /* position+1 in the FD cache. 0=not in cache. */
 	unsigned char state;         /* FD state for read and write directions (2*3 bits) */
 	unsigned char ev;            /* event seen in return of poll() : FD_POLL_* */
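The spinlock member only exists when USE_THREAD is defined, so single-threaded builds keep the old struct layout and pay no memory or locking cost. A sketch of that conditional-compilation pattern (macro name hypothetical, pthread backend assumed):

    #ifdef USE_THREAD
    #include <pthread.h>
    #define MY_SPINLOCK_T pthread_spinlock_t
    #endif

    struct item {
            void *owner;
    #ifdef USE_THREAD
            MY_SPINLOCK_T lock;  /* absent from non-threaded builds */
    #endif
            unsigned int  cache;
            unsigned char state;
    };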
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 9e72802dc..0b815b45b 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -28,13 +28,13 @@
 
 /* private data */
-static struct epoll_event *epoll_events;
+static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
 static int epoll_fd;
 
 /* This structure may be used for any purpose. Warning! do not use it in
  * recursive functions !
  */
-static struct epoll_event ev;
+static THREAD_LOCAL struct epoll_event ev;
 
 #ifndef EPOLLRDHUP
 /* EPOLLRDHUP was defined late in libc, and it appeared in kernel 2.6.17 */
@@ -49,9 +49,8 @@ static struct epoll_event ev;
  */
 REGPRM1 static void __fd_clo(int fd)
 {
-	if (unlikely(fdtab[fd].cloned)) {
+	if (unlikely(fdtab[fd].cloned))
 		epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
-	}
 }
 
 /*
@@ -68,18 +67,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 		fd = fd_updt[updt_idx];
-		fdtab[fd].updated = 0;
-		fdtab[fd].new = 0;
 
 		if (!fdtab[fd].owner)
 			continue;
 
+		SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+		fdtab[fd].updated = 0;
+		fdtab[fd].new = 0;
+
 		eo = fdtab[fd].state;
 		en = fd_compute_new_polled_status(eo);
+		fdtab[fd].state = en;
+		SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 		if ((eo ^ en) & FD_EV_POLLED_RW) {
 			/* poll status changed */
-			fdtab[fd].state = en;
 
 			if ((en & FD_EV_POLLED_RW) == 0) {
 				/* fd removed from poll list */
@@ -103,6 +105,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 					ev.events |= EPOLLOUT;
 
 				ev.data.fd = fd;
+
 				epoll_ctl(epoll_fd, opcode, fd, &ev);
 			}
 		}
@@ -154,7 +157,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 
 		/* always remap RDHUP to HUP as they're used similarly */
 		if (e & EPOLLRDHUP) {
-			cur_poller.flags |= HAP_POLL_F_RDHUP;
+			HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
 			n |= FD_POLL_HUP;
 		}
 		fd_update_events(fd, n);
@@ -162,6 +165,19 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	/* the caller will take care of cached events */
 }
 
+static int init_epoll_per_thread()
+{
+	epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
+	if (epoll_events == NULL)
+		return 0;
+	return 1;
+}
+
+static void deinit_epoll_per_thread()
+{
+	free(epoll_events);
+}
+
 /*
  * Initialization of the epoll() poller.
 * Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -175,10 +191,11 @@ REGPRM1 static int _do_init(struct poller *p)
 	if (epoll_fd < 0)
 		goto fail_fd;
 
-	epoll_events = (struct epoll_event*)
-		calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
-
-	if (epoll_events == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_epoll_per_thread);
+		hap_register_per_thread_deinit(deinit_epoll_per_thread);
+	}
+	else if (!init_epoll_per_thread())
 		goto fail_ee;
 
 	return 1;
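Per-thread buffers such as epoll_events are now THREAD_LOCAL and allocated through callbacks that every worker thread runs when it starts and stops. HAProxy's registry is hap_register_per_thread_init()/_deinit() in haproxy.c; the fixed-size table below is only a sketch of the mechanism, with hypothetical names:

    #include <stdlib.h>

    #define MAX_HOOKS 8

    static int  (*init_hooks[MAX_HOOKS])(void);
    static void (*deinit_hooks[MAX_HOOKS])(void);
    static int nb_init_hooks, nb_deinit_hooks;

    void register_per_thread_init(int (*fct)(void))
    {
            init_hooks[nb_init_hooks++] = fct;
    }

    void register_per_thread_deinit(void (*fct)(void))
    {
            deinit_hooks[nb_deinit_hooks++] = fct;
    }

    /* run by every worker thread before it enters its poll loop */
    int run_per_thread_init(void)
    {
            int i;

            for (i = 0; i < nb_init_hooks; i++)
                    if (!init_hooks[i]())
                            return 0; /* an allocation failed */
            return 1;
    }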
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 02723cca6..326d61607 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -30,7 +30,7 @@
 
 /* private data */
 static int kqueue_fd;
-static struct kevent *kev = NULL;
+static THREAD_LOCAL struct kevent *kev = NULL;
 
 /*
  * kqueue() poller
@@ -46,19 +46,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	/* first, scan the update list to find changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 		fd = fd_updt[updt_idx];
-		fdtab[fd].updated = 0;
-		fdtab[fd].new = 0;
 
 		if (!fdtab[fd].owner)
 			continue;
 
+		SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+		fdtab[fd].updated = 0;
+		fdtab[fd].new = 0;
+
 		eo = fdtab[fd].state;
 		en = fd_compute_new_polled_status(eo);
+		fdtab[fd].state = en;
+		SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 		if ((eo ^ en) & FD_EV_POLLED_RW) {
 			/* poll status changed */
-			fdtab[fd].state = en;
-
 			if ((eo ^ en) & FD_EV_POLLED_R) {
 				/* read poll status changed */
 				if (en & FD_EV_POLLED_R) {
@@ -139,6 +141,21 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 		}
 	}
 }
+
+static int init_kqueue_per_thread()
+{
+	/* we can have up to two events per fd (read and write) */
+	kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
+	if (kev == NULL)
+		return 0;
+	return 1;
+}
+
+static void deinit_kqueue_per_thread()
+{
+	free(kev);
+}
+
 /*
  * Initialization of the kqueue() poller.
 * Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -152,11 +169,13 @@ REGPRM1 static int _do_init(struct poller *p)
 	if (kqueue_fd < 0)
 		goto fail_fd;
 
-	/* we can have up to two events per fd (*/
-	kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
-	if (kev == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_kqueue_per_thread);
+		hap_register_per_thread_deinit(deinit_kqueue_per_thread);
+	}
+	else if (!init_kqueue_per_thread())
 		goto fail_kev;
-
+
 	return 1;
 
 fail_kev:
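The "2 * global.maxsock" sizing comes from kqueue's model: one fd can carry an EVFILT_READ and an EVFILT_WRITE change record at the same time. A self-contained sketch of filling both slots for one fd (BSD/macOS API; the function name is hypothetical):

    #include <sys/types.h>
    #include <sys/event.h>
    #include <sys/time.h>

    int queue_both_directions(int kq, struct kevent *kev, int fd)
    {
            int n = 0;

            EV_SET(&kev[n++], fd, EVFILT_READ,  EV_ADD, 0, 0, NULL);
            EV_SET(&kev[n++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
            /* submit the change list only, without fetching events */
            return kevent(kq, kev, n, NULL, 0, NULL);
    }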
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 1cb8d2d62..e16968bcf 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -35,8 +35,8 @@ static unsigned int *fd_evts[2];
 
 /* private data */
-static struct pollfd *poll_events = NULL;
-
+static THREAD_LOCAL int nbfd = 0;
+static THREAD_LOCAL struct pollfd *poll_events = NULL;
 
 static inline void hap_fd_set(int fd, unsigned int *evts)
 {
@@ -50,8 +50,10 @@ static inline void hap_fd_clr(int fd, unsigned int *evts)
 
 REGPRM1 static void __fd_clo(int fd)
 {
+	SPIN_LOCK(POLL_LOCK, &poll_lock);
 	hap_fd_clr(fd, fd_evts[DIR_RD]);
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
+	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
 }
 
 /*
@@ -60,7 +62,7 @@ REGPRM1 static void __fd_clo(int fd)
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
 	int status;
-	int fd, nbfd;
+	int fd;
 	int wait_time;
 	int updt_idx, en, eo;
 	int fds, count;
@@ -70,19 +72,22 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	/* first, scan the update list to find changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 		fd = fd_updt[updt_idx];
-		fdtab[fd].updated = 0;
-		fdtab[fd].new = 0;
 
 		if (!fdtab[fd].owner)
 			continue;
 
+		SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+		fdtab[fd].updated = 0;
+		fdtab[fd].new = 0;
+
 		eo = fdtab[fd].state;
 		en = fd_compute_new_polled_status(eo);
+		fdtab[fd].state = en;
+		SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 		if ((eo ^ en) & FD_EV_POLLED_RW) {
 			/* poll status changed, update the lists */
-			fdtab[fd].state = en;
-
+			SPIN_LOCK(POLL_LOCK, &poll_lock);
 			if ((eo & ~en) & FD_EV_POLLED_R)
 				hap_fd_clr(fd, fd_evts[DIR_RD]);
 			else if ((en & ~eo) & FD_EV_POLLED_R)
@@ -92,6 +97,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 				hap_fd_clr(fd, fd_evts[DIR_WR]);
 			else if ((en & ~eo) & FD_EV_POLLED_W)
 				hap_fd_set(fd, fd_evts[DIR_WR]);
+			SPIN_UNLOCK(POLL_LOCK, &poll_lock);
 		}
 	}
 	fd_nbupdt = 0;
@@ -100,7 +106,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	for (fds = 0; (fds * 8*sizeof(**fd_evts)) < maxfd; fds++) {
 		rn = fd_evts[DIR_RD][fds];
 		wn = fd_evts[DIR_WR][fds];
-
+
 		if (!(rn|wn))
 			continue;
 
@@ -112,9 +118,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 				poll_events[nbfd].events = (sr ? (POLLIN | POLLRDHUP) : 0) | (sw ? POLLOUT : 0);
 				nbfd++;
 			}
-		}
+		}
 	}
-
+
 	/* now let's wait for events */
 	if (!exp)
 		wait_time = MAX_DELAY_MS;
@@ -135,7 +141,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 		unsigned int n;
 		int e = poll_events[count].revents;
 		fd = poll_events[count].fd;
-
+
 		if (!(e & ( POLLOUT | POLLIN | POLLERR | POLLHUP | POLLRDHUP )))
 			continue;
 
@@ -161,15 +167,28 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 
 		/* always remap RDHUP to HUP as they're used similarly */
 		if (e & POLLRDHUP) {
-			cur_poller.flags |= HAP_POLL_F_RDHUP;
+			HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
 			n |= FD_POLL_HUP;
 		}
-
 		fd_update_events(fd, n);
 	}
 }
 
+
+static int init_poll_per_thread()
+{
+	poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
+	if (poll_events == NULL)
+		return 0;
+	return 1;
+}
+
+static void deinit_poll_per_thread()
+{
+	free(poll_events);
+}
+
 /*
  * Initialization of the poll() poller.
 * Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -183,14 +202,15 @@ REGPRM1 static int _do_init(struct poller *p)
 	p->private = NULL;
 
 	fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
 
-	poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
-
-	if (poll_events == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_poll_per_thread);
+		hap_register_per_thread_deinit(deinit_poll_per_thread);
+	}
+	else if (!init_poll_per_thread())
 		goto fail_pe;
-
+
 	if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
 		goto fail_srevt;
-
 	if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
 		goto fail_swevt;
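cur_poller.flags is shared by all threads, so the plain "flags |= HAP_POLL_F_RDHUP" read-modify-write becomes a race once several pollers run concurrently; HA_ATOMIC_OR turns it into a single atomic operation. A minimal equivalent using the GCC/Clang builtin (the macro name is hypothetical):

    #define MY_ATOMIC_OR(ptr, bits) __sync_fetch_and_or((ptr), (bits))

    static unsigned int poller_flags;

    void note_rdhup_support(void)
    {
            /* safe even when several threads detect POLLRDHUP at once */
            MY_ATOMIC_OR(&poller_flags, 0x01 /* e.g. HAP_POLL_F_RDHUP */);
    }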
diff --git a/src/ev_select.c b/src/ev_select.c
index cf80ac856..97d42861f 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -24,14 +24,17 @@
 
 #include <proto/fd.h>
 
+/* private data */
 static fd_set *fd_evts[2];
-static fd_set *tmp_evts[2];
+static THREAD_LOCAL fd_set *tmp_evts[2];
 
 /* Immediately remove the entry upon close() */
 REGPRM1 static void __fd_clo(int fd)
 {
+	SPIN_LOCK(POLL_LOCK, &poll_lock);
 	FD_CLR(fd, fd_evts[DIR_RD]);
 	FD_CLR(fd, fd_evts[DIR_WR]);
+	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
 }
 
 /*
@@ -43,27 +46,30 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int fd, i;
 	struct timeval delta;
 	int delta_ms;
-	int readnotnull, writenotnull;
 	int fds;
 	int updt_idx, en, eo;
 	char count;
-
+	int readnotnull, writenotnull;
+
 	/* first, scan the update list to find changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 		fd = fd_updt[updt_idx];
-		fdtab[fd].updated = 0;
-		fdtab[fd].new = 0;
 
 		if (!fdtab[fd].owner)
 			continue;
 
+		SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+		fdtab[fd].updated = 0;
+		fdtab[fd].new = 0;
+
 		eo = fdtab[fd].state;
 		en = fd_compute_new_polled_status(eo);
+		fdtab[fd].state = en;
+		SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 		if ((eo ^ en) & FD_EV_POLLED_RW) {
 			/* poll status changed, update the lists */
-			fdtab[fd].state = en;
-
+			SPIN_LOCK(POLL_LOCK, &poll_lock);
 			if ((eo & ~en) & FD_EV_POLLED_R)
 				FD_CLR(fd, fd_evts[DIR_RD]);
 			else if ((en & ~eo) & FD_EV_POLLED_R)
@@ -73,10 +79,28 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 				FD_CLR(fd, fd_evts[DIR_WR]);
 			else if ((en & ~eo) & FD_EV_POLLED_W)
 				FD_SET(fd, fd_evts[DIR_WR]);
+			SPIN_UNLOCK(POLL_LOCK, &poll_lock);
 		}
 	}
 	fd_nbupdt = 0;
 
+	/* let's restore fdset state */
+	readnotnull = 0; writenotnull = 0;
+	for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
+		readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
+		writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
+	}
+
+#if 0
+	/* just a verification code, needs to be removed for performance */
+	for (i=0; i<maxfd; i++) {
[...]
 	p->private = NULL;
@@ -164,16 +193,15 @@ REGPRM1 static int _do_init(struct poller *p)
 		goto fail_revt;
 
 	fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
-
-	if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_select_per_thread);
+		hap_register_per_thread_deinit(deinit_select_per_thread);
+	}
+	else if (!init_select_per_thread())
 		goto fail_revt;
-
-	if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
-		goto fail_wevt;
 
 	if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
 		goto fail_srevt;
-
 	if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
 		goto fail_swevt;
 
@@ -183,7 +211,6 @@ REGPRM1 static int _do_init(struct poller *p)
 	free(fd_evts[DIR_RD]);
  fail_srevt:
 	free(tmp_evts[DIR_WR]);
- fail_wevt:
 	free(tmp_evts[DIR_RD]);
 fail_revt:
 	p->pref = 0;
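The restore loop copies the shared fd_sets into the thread-local tmp_evts word by word while recording whether any bit is set at all, so a fully empty set can be handed to select() as NULL. The same logic isolated into a helper (it assumes, as the patch itself does, that an fd_set can be addressed as an array of int; the function name is hypothetical):

    #include <sys/select.h>

    static int copy_and_test(fd_set *dst, const fd_set *src, int maxfd)
    {
            int i, notnull = 0;
            int words = (maxfd + FD_SETSIZE - 1) / (8 * sizeof(int));

            for (i = 0; i < words; i++)
                    notnull |= ((((int *)dst)[i] = ((const int *)src)[i]) != 0);
            return notnull;
    }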
diff --git a/src/fd.c b/src/fd.c
index 8dab4977a..6c53a3b28 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -155,6 +155,7 @@
 
 #include <proto/fd.h>
 #include <proto/log.h>
+#include <common/hathreads.h>
 #include <proto/port_range.h>
 
 struct fdtab *fdtab = NULL;     /* array of all the file descriptors */
@@ -168,15 +169,23 @@ struct poller cur_poller;
 int nbpollers = 0;
 
 unsigned int *fd_cache = NULL; // FD events cache
-unsigned int *fd_updt = NULL;  // FD updates list
 int fd_cache_num = 0;          // number of events in the cache
-int fd_nbupdt = 0;             // number of updates in the list
+
+THREAD_LOCAL int *fd_updt = NULL; // FD updates list
+THREAD_LOCAL int fd_nbupdt = 0;   // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock;      /* global lock to protect fdtab array */
+HA_RWLOCK_T   fdcache_lock;    /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock;       /* global lock to protect poll info */
+#endif
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
  */
 static void fd_dodelete(int fd, int do_close)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fdtab[fd].linger_risk) {
 		/* this is generally set when connecting to servers */
 		setsockopt(fd, SOL_SOCKET, SO_LINGER,
@@ -195,9 +204,12 @@ static void fd_dodelete(int fd, int do_close)
 	fdtab[fd].new = 0;
 	if (do_close)
 		close(fd);
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
+	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
 	while ((maxfd-1 >= 0) && !fdtab[maxfd-1].owner)
 		maxfd--;
+	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
 }
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
@@ -225,10 +237,19 @@ void fd_process_cached_events()
 {
 	int fd, entry, e;
 
+	if (!fd_cache_num)
+		return;
+
+	RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
 	for (entry = 0; entry < fd_cache_num; ) {
 		fd = fd_cache[entry];
-		e = fdtab[fd].state;
 
+		if (SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock))
+			goto next;
+
+		RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+
+		e = fdtab[fd].state;
 		fdtab[fd].ev &= FD_POLL_STICKY;
 
 		if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@@ -237,18 +258,25 @@ void fd_process_cached_events()
 		if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
 			fdtab[fd].ev |= FD_POLL_OUT;
 
-		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
+		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
+			SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 			fdtab[fd].iocb(fd);
-		else
+		}
+		else {
 			fd_release_cache_entry(fd);
+			SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+		}
 
+		RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
 		/* If the fd was removed from the cache, it has been
 		 * replaced by the next one that we don't want to skip !
 		 */
 		if (entry < fd_cache_num && fd_cache[entry] != fd)
 			continue;
+	  next:
 		entry++;
 	}
+	RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* disable the specified poller */
@@ -261,6 +289,21 @@ void disable_poller(const char *poller_name)
 			pollers[p].pref = 0;
 }
 
+/* Initialize the pollers per thread */
+static int init_pollers_per_thread()
+{
+	if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+		return 0;
+	return 1;
+}
+
+/* Deinitialize the pollers per thread */
+static void deinit_pollers_per_thread()
+{
+	free(fd_updt);
+	fd_updt = NULL;
+}
+
 /*
  * Initialize the pollers till the best one is found.
 * If none works, returns 0, otherwise 1.
@@ -279,9 +322,21 @@ int init_pollers()
 	if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
 		goto fail_cache;
 
-	if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_pollers_per_thread);
+		hap_register_per_thread_deinit(deinit_pollers_per_thread);
+	}
+	else if (!init_pollers_per_thread())
 		goto fail_updt;
 
+	for (p = 0; p < global.maxsock; p++)
+		SPIN_INIT(&fdtab[p].lock);
+
+	//memset(fd_cache, -1, global.maxsock);
+
+	SPIN_INIT(&fdtab_lock);
+	RWLOCK_INIT(&fdcache_lock);
+	SPIN_INIT(&poll_lock);
 	do {
 		bp = NULL;
 		for (p = 0; p < nbpollers; p++)
@@ -316,16 +371,24 @@ void deinit_pollers() {
 
 	struct poller *bp;
 	int p;
 
+	for (p = 0; p < global.maxsock; p++)
+		SPIN_DESTROY(&fdtab[p].lock);
+
 	for (p = 0; p < nbpollers; p++) {
 		bp = &pollers[p];
 
 		if (bp && bp->pref)
 			bp->term(bp);
 	}
+
 	free(fd_updt);  fd_updt  = NULL;
 	free(fd_cache); fd_cache = NULL;
 	free(fdinfo);   fdinfo   = NULL;
 	free(fdtab);    fdtab    = NULL;
+
+	SPIN_DESTROY(&fdtab_lock);
+	RWLOCK_DESTROY(&fdcache_lock);
+	SPIN_DESTROY(&poll_lock);
 }
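fd_process_cached_events() takes each fd's lock with SPIN_TRYLOCK: if another thread already owns the fd, the entry is skipped for this round rather than blocking while the cache read-lock is held, which avoids lock-ordering trouble between fdcache_lock and the per-fd locks. The skip pattern in isolation, on pthread mutexes with hypothetical names:

    #include <pthread.h>

    struct entry {
            pthread_mutex_t lock;
            int ready;
    };

    void process_entries(struct entry *tab, int n)
    {
            int i;

            for (i = 0; i < n; i++) {
                    if (pthread_mutex_trylock(&tab[i].lock) != 0)
                            continue; /* busy elsewhere: revisit next pass */
                    if (tab[i].ready) {
                            /* ... run the I/O callback ... */
                    }
                    pthread_mutex_unlock(&tab[i].lock);
            }
    }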
diff --git a/src/haproxy.c b/src/haproxy.c
index 07331898f..7f485148b 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2202,7 +2202,35 @@ void deinit(void)
 
 	deinit_pollers();
 } /* end deinit() */
 
+void mworker_pipe_handler(int fd)
+{
+	char c;
+	while (read(fd, &c, 1) == -1) {
+		if (errno == EINTR)
+			continue;
+		if (errno == EAGAIN) {
+			fd_cant_recv(fd);
+			return;
+		}
+		break;
+	}
+
+	deinit();
+	exit(EXIT_FAILURE);
+	return;
+}
+
+void mworker_pipe_register(int pipefd[2])
+{
+	close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
+
+	fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
+	fdtab[mworker_pipe[0]].owner = mworker_pipe;
+	fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
+	fd_insert(mworker_pipe[0]);
+	fd_want_recv(mworker_pipe[0]);
+}
 
 static void sync_poll_loop()
 {
@@ -2278,6 +2306,10 @@ static void *run_thread_poll_loop(void *data)
 		}
 	}
 
+	if (global.mode & MODE_MWORKER)
+		mworker_pipe_register(mworker_pipe);
+
+	protocol_enable_all();
 	THREAD_SYNC_ENABLE();
 	run_poll_loop();
@@ -2320,37 +2352,6 @@ static struct task *manage_global_listener_queue(struct task *t)
 	return t;
 }
 
-void mworker_pipe_handler(int fd)
-{
-	char c;
-
-	while (read(fd, &c, 1) == -1) {
-		if (errno == EINTR)
-			continue;
-		if (errno == EAGAIN) {
-			fd_cant_recv(fd);
-			return;
-		}
-		break;
-	}
-
-	deinit();
-	exit(EXIT_FAILURE);
-	return;
-}
-
-void mworker_pipe_register(int pipefd[2])
-{
-	close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
-
-	fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
-	fdtab[mworker_pipe[0]].owner = mworker_pipe;
-	fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
-	fd_insert(mworker_pipe[0]);
-	fd_want_recv(mworker_pipe[0]);
- }
-
-
 int main(int argc, char **argv)
 {
 	int err, retry;
@@ -2798,11 +2799,6 @@ int main(int argc, char **argv)
 	}
 
 	global.mode &= ~MODE_STARTING;
-
-	if (global.mode & MODE_MWORKER)
-		mworker_pipe_register(mworker_pipe);
-
-	protocol_enable_all();
 	/*
 	 * That's it : the central polling loop. Run until we stop.
 	 */
@@ -2827,6 +2823,12 @@ int main(int argc, char **argv)
 	}
 	else {
 		tid = 0;
+
+		if (global.mode & MODE_MWORKER)
+			mworker_pipe_register(mworker_pipe);
+
+		protocol_enable_all();
+
 		run_poll_loop();
 	}
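The net effect of the haproxy.c reshuffle is an ordering guarantee: mworker_pipe_register() and protocol_enable_all() now run from the poll-loop startup paths, that is, only after each thread (or the single-threaded process) has allocated its THREAD_LOCAL poller data, since fd_insert() touches structures that the per-thread init hooks create. A schematic of that startup order, with stubbed-out hypothetical names:

    #include <stddef.h>

    static int  run_per_thread_init(void)   { return 1; } /* alloc fd_updt etc. */
    static void register_shared_fds(void)   { }           /* fd_insert() users  */
    static void poll_forever(void)          { }
    static void run_per_thread_deinit(void) { }

    void *thread_main(void *arg)
    {
            (void)arg;
            if (!run_per_thread_init())   /* THREAD_LOCAL buffers first... */
                    return NULL;
            register_shared_fds();        /* ...then it is safe to insert fds */
            poll_forever();
            run_per_thread_deinit();
            return NULL;
    }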