1
0
mirror of http://git.haproxy.org/git/haproxy.git/ synced 2025-04-20 22:15:40 +00:00

BUG/MEDIUM: kqueue/threads: use one kqueue_fd per thread

This is the same principle as the previous patch (BUG/MEDIUM:
epoll/threads: use one epoll_fd per thread) except that this time it's
for kqueue. We don't want all threads to wake up because of activity on
a single other thread that the other ones are not interested in.

Just like with previous patch, this one shows that the polling state
doesn't need to be changed here and that some simplifications are now
possible. This patch only implements the minimum required for a stable
backport.

This should be backported to 1.8.
This commit is contained in:
Willy Tarreau 2018-01-19 08:56:14 +01:00
parent d9e7e36c6e
commit 7a2364d474

View File

@ -29,7 +29,7 @@
/* private data */
static int kqueue_fd;
static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
static THREAD_LOCAL struct kevent *kev = NULL;
/*
@ -61,35 +61,34 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
fdtab[fd].state = en;
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
if ((eo ^ en) & FD_EV_POLLED_R) {
/* read poll status changed */
if (en & FD_EV_POLLED_R) {
EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
changes++;
}
else {
EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
changes++;
}
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
if (!(fdtab[fd].polled_mask & tid_bit)) {
/* fd was not watched, it's still not */
continue;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
if ((eo ^ en) & FD_EV_POLLED_W) {
/* write poll status changed */
if (en & FD_EV_POLLED_W) {
EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
changes++;
}
else {
EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
changes++;
}
}
if (en & FD_EV_POLLED_R)
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
else if (fdtab[fd].polled_mask & tid_bit)
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
if (en & FD_EV_POLLED_W)
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
else if (fdtab[fd].polled_mask & tid_bit)
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
}
}
if (changes)
kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
kevent(kqueue_fd[tid], kev, changes, NULL, 0, NULL);
fd_nbupdt = 0;
delta_ms = 0;
@ -113,7 +112,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
fd = MIN(maxfd, global.tune.maxpollevents);
gettimeofday(&before_poll, NULL);
status = kevent(kqueue_fd, // int kq
status = kevent(kqueue_fd[tid], // int kq
NULL, // const struct kevent *changelist
0, // int nchanges
kev, // struct kevent *eventlist
@ -155,11 +154,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
static int init_kqueue_per_thread()
{
int fd;
/* we can have up to two events per fd (*/
kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
if (kev == NULL)
return 0;
goto fail_alloc;
if (tid) {
kqueue_fd[tid] = kqueue();
if (kqueue_fd[tid] < 0)
goto fail_fd;
}
/* we may have to unregister some events initially registered on the
* original fd when it was alone, and/or to register events on the new
* fd for this thread. Let's just mark them as updated, the poller will
* do the rest.
*/
for (fd = 0; fd < maxfd; fd++)
updt_fd_polling(fd);
return 1;
fail_fd:
free(kev);
fail_alloc:
return 0;
}
static void deinit_kqueue_per_thread()
@ -177,8 +197,8 @@ REGPRM1 static int _do_init(struct poller *p)
{
p->private = NULL;
kqueue_fd = kqueue();
if (kqueue_fd < 0)
kqueue_fd[tid] = kqueue();
if (kqueue_fd[tid] < 0)
goto fail_fd;
hap_register_per_thread_init(init_kqueue_per_thread);
@ -196,9 +216,9 @@ REGPRM1 static int _do_init(struct poller *p)
*/
REGPRM1 static void _do_term(struct poller *p)
{
if (kqueue_fd >= 0) {
close(kqueue_fd);
kqueue_fd = -1;
if (kqueue_fd[tid] >= 0) {
close(kqueue_fd[tid]);
kqueue_fd[tid] = -1;
}
p->private = NULL;
@ -227,8 +247,8 @@ REGPRM1 static int _do_test(struct poller *p)
*/
REGPRM1 static int _do_fork(struct poller *p)
{
kqueue_fd = kqueue();
if (kqueue_fd < 0)
kqueue_fd[tid] = kqueue();
if (kqueue_fd[tid] < 0)
return 0;
return 1;
}
@ -242,11 +262,14 @@ __attribute__((constructor))
static void _do_register(void)
{
struct poller *p;
int i;
if (nbpollers >= MAX_POLLERS)
return;
kqueue_fd = -1;
for (i = 0; i < MAX_THREADS; i++)
kqueue_fd[i] = -1;
p = &pollers[nbpollers++];
p->name = "kqueue";