BUG/MEDIUM: kqueue/threads: use one kqueue_fd per thread
This is the same principle as the previous patch (BUG/MEDIUM: epoll/threads: use one epoll_fd per thread) except that this time it's for kqueue. We don't want all threads to wake up because of activity on a single other thread that the other ones are not interested in.

Just like with the previous patch, this one shows that the polling state doesn't need to be changed here and that some simplifications are now possible. This patch only implements the minimum required for a stable backport.

This should be backported to 1.8.
parent d9e7e36c6e
commit 7a2364d474
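To make the principle concrete, here is a minimal standalone sketch of the per-thread kqueue pattern this patch introduces. It is illustrative only, not HAProxy code: MAX_THREADS, worker_init(), worker_watch_read() and worker_poll() are made-up names. Each thread creates its own queue and registers only the fds it owns, so a kevent() call sleeping in one thread can only be woken by events that thread asked for.

/* Minimal sketch: one kqueue per thread (illustrative, not HAProxy code) */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>

#define MAX_THREADS 64

static int kqueue_fd[MAX_THREADS];   /* one private queue per thread */

/* called once by each worker with its own thread id */
static int worker_init(int tid)
{
	kqueue_fd[tid] = kqueue();   /* not shared: no cross-thread wakeups */
	return kqueue_fd[tid] >= 0;
}

/* register a read event on this thread's queue only */
static int worker_watch_read(int tid, int fd)
{
	struct kevent ev;

	EV_SET(&ev, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	return kevent(kqueue_fd[tid], &ev, 1, NULL, 0, NULL) == 0;
}

/* one polling round: blocks only on events this thread registered */
static int worker_poll(int tid, struct kevent *out, int max)
{
	struct timespec ts = { .tv_sec = 1, .tv_nsec = 0 };

	return kevent(kqueue_fd[tid], NULL, 0, out, max, &ts);
}

HAProxy's real poller differs (it batches EV_SET() changes into an array and tracks per-thread registration in polled_mask), but the isolation property is the same: kevent() on kqueue_fd[tid] never reports another thread's activity.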
src/ev_kqueue.c

@@ -29,7 +29,7 @@
 /* private data */
-static int kqueue_fd;
+static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 
 /*
@@ -61,35 +61,34 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 		fdtab[fd].state = en;
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
-		if ((eo ^ en) & FD_EV_POLLED_RW) {
-			/* poll status changed */
-			if ((eo ^ en) & FD_EV_POLLED_R) {
-				/* read poll status changed */
-				if (en & FD_EV_POLLED_R) {
-					EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-					changes++;
-				}
-				else {
-					EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-					changes++;
-				}
+		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+			if (!(fdtab[fd].polled_mask & tid_bit)) {
+				/* fd was not watched, it's still not */
+				continue;
 			}
+			/* fd totally removed from poll list */
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+		}
+		else {
+			/* OK fd has to be monitored, it was either added or changed */
 
-			if ((eo ^ en) & FD_EV_POLLED_W) {
-				/* write poll status changed */
-				if (en & FD_EV_POLLED_W) {
-					EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-					changes++;
-				}
-				else {
-					EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-					changes++;
-				}
-			}
+			if (en & FD_EV_POLLED_R)
+				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+			else if (fdtab[fd].polled_mask & tid_bit)
+				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+			if (en & FD_EV_POLLED_W)
+				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+			else if (fdtab[fd].polled_mask & tid_bit)
+				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
 		}
 	}
 	if (changes)
-		kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
+		kevent(kqueue_fd[tid], kev, changes, NULL, 0, NULL);
 	fd_nbupdt = 0;
 
 	delta_ms = 0;
@@ -113,7 +112,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 
 	fd = MIN(maxfd, global.tune.maxpollevents);
 	gettimeofday(&before_poll, NULL);
-	status = kevent(kqueue_fd, // int kq
+	status = kevent(kqueue_fd[tid], // int kq
 			NULL,      // const struct kevent *changelist
 			0,         // int nchanges
 			kev,       // struct kevent *eventlist
@@ -155,11 +154,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 
 static int init_kqueue_per_thread()
 {
+	int fd;
+
 	/* we can have up to two events per fd (*/
 	kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
 	if (kev == NULL)
-		return 0;
+		goto fail_alloc;
+
+	if (tid) {
+		kqueue_fd[tid] = kqueue();
+		if (kqueue_fd[tid] < 0)
+			goto fail_fd;
+	}
+
+	/* we may have to unregister some events initially registered on the
+	 * original fd when it was alone, and/or to register events on the new
+	 * fd for this thread. Let's just mark them as updated, the poller will
+	 * do the rest.
+	 */
+	for (fd = 0; fd < maxfd; fd++)
+		updt_fd_polling(fd);
+
 	return 1;
+ fail_fd:
+	free(kev);
+ fail_alloc:
+	return 0;
 }
 
 static void deinit_kqueue_per_thread()
@@ -177,8 +197,8 @@ REGPRM1 static int _do_init(struct poller *p)
 {
 	p->private = NULL;
 
-	kqueue_fd = kqueue();
-	if (kqueue_fd < 0)
+	kqueue_fd[tid] = kqueue();
+	if (kqueue_fd[tid] < 0)
 		goto fail_fd;
 
 	hap_register_per_thread_init(init_kqueue_per_thread);
@@ -196,9 +216,9 @@ REGPRM1 static int _do_init(struct poller *p)
  */
REGPRM1 static void _do_term(struct poller *p)
 {
-	if (kqueue_fd >= 0) {
-		close(kqueue_fd);
-		kqueue_fd = -1;
+	if (kqueue_fd[tid] >= 0) {
+		close(kqueue_fd[tid]);
+		kqueue_fd[tid] = -1;
 	}
 
 	p->private = NULL;
@@ -227,8 +247,8 @@ REGPRM1 static int _do_test(struct poller *p)
  */
REGPRM1 static int _do_fork(struct poller *p)
 {
-	kqueue_fd = kqueue();
-	if (kqueue_fd < 0)
+	kqueue_fd[tid] = kqueue();
+	if (kqueue_fd[tid] < 0)
 		return 0;
 	return 1;
 }
@@ -242,11 +262,14 @@ __attribute__((constructor))
static void _do_register(void)
 {
 	struct poller *p;
+	int i;
 
 	if (nbpollers >= MAX_POLLERS)
 		return;
 
-	kqueue_fd = -1;
+	for (i = 0; i < MAX_THREADS; i++)
+		kqueue_fd[i] = -1;
 
 	p = &pollers[nbpollers++];
 
 	p->name = "kqueue";
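A closing note on the _do_fork() hunk: per kqueue(2), a kqueue is not inherited by a child created with fork(), so the old descriptor is unusable in the child and must be re-created (and its events re-registered) before polling again. A tiny illustration, with a hypothetical reopen_kqueue_after_fork() helper:

/* Illustrative only: a forked child cannot reuse the parent's kqueue,
 * so a fresh one is opened, as _do_fork() above does for kqueue_fd[tid].
 */
#include <sys/types.h>
#include <sys/event.h>

static int kq = -1;

static int reopen_kqueue_after_fork(void)
{
	kq = kqueue();   /* new empty queue; re-register all events after this */
	return kq >= 0;
}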