MEDIUM: fd/poller: turn polled_mask to group-local IDs

This changes the signification of each bit in the polled_mask so that
now each bit represents a local thread ID for the current group instead
of a global thread ID. As such, all tests now apply to ltid_bit instead
of tid_bit.

No particular check was made to verify that the FD's tgid matches the
current one because there should be no case where this is not true. A
check was added in epoll's __fd_clo() to confirm it never differs unless
expected (soft stop under thread isolation, or master in starting mode
going to exec mode), but that doesn't prevent from doing the job: it
only consists in checking in the group's threads those that are still
polling this FD and to remove them.

Some atomic loads were added at the various locations, and most repetitive
references to polled_mask[fd].xx were turned to a local copy instead making
the code much more clear.
This commit is contained in:
Willy Tarreau 2022-07-06 10:37:31 +02:00
parent 0dc1cc93b6
commit 63022128a5
5 changed files with 97 additions and 74 deletions

View File

@ -43,12 +43,25 @@ static int epoll_fd[MAX_THREADS] __read_mostly; // per-thread epoll_fd
static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].state & FD_CLONED)) {
unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
unsigned long m = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv) | _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
int tgrp = fd_tgid(fd);
struct epoll_event ev;
int i;
for (i = global.nbthread - 1; i >= 0; i--)
if (m & (1UL << i))
if (!m)
return;
/* since FDs may only be shared per group and are only closed
* once entirely reset, it should never happen that we have to
* close an FD for another group, unless we're stopping from the
* wrong thread or during startup, which is what we're checking
* for. Regardless, it is not a problem to do so.
*/
if (unlikely(!(global.mode & MODE_STARTING)))
CHECK_IF(tgid != tgrp && !thread_isolated());
for (i = ha_tgroup_info[tgrp-1].base; i < ha_tgroup_info[tgrp-1].base + ha_tgroup_info[tgrp-1].count; i++)
if (m & ha_thread_info[i].ltid_bit)
epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
}
}
@ -57,18 +70,21 @@ static void _update_fd(int fd)
{
int en, opcode;
struct epoll_event ev = { };
ulong pr, ps;
en = fdtab[fd].state;
pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* Try to force EPOLLET on FDs that support it */
if (fdtab[fd].state & FD_ET_POSSIBLE) {
/* already done ? */
if (polled_mask[fd].poll_recv & polled_mask[fd].poll_send & tid_bit)
if (pr & ps & ti->ltid_bit)
return;
/* enable ET polling in both directions */
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
opcode = EPOLL_CTL_ADD;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
goto done;
@ -79,38 +95,35 @@ static void _update_fd(int fd)
* needlessly unsubscribe then re-subscribe it.
*/
if (!(en & FD_EV_READY_R) &&
((en & FD_EV_ACTIVE_W) ||
((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit)))
((en & FD_EV_ACTIVE_W) || ((ps | pr) & ti->ltid_bit)))
en |= FD_EV_ACTIVE_R;
if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
if ((ps | pr) & ti->ltid_bit) {
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
if (((en & FD_EV_ACTIVE_R) != 0) ==
((polled_mask[fd].poll_recv & tid_bit) != 0) &&
((en & FD_EV_ACTIVE_W) != 0) ==
((polled_mask[fd].poll_send & tid_bit) != 0))
if (((en & FD_EV_ACTIVE_R) != 0) == ((pr & ti->ltid_bit) != 0) &&
((en & FD_EV_ACTIVE_W) != 0) == ((ps & ti->ltid_bit) != 0))
return;
if (en & FD_EV_ACTIVE_R) {
if (!(polled_mask[fd].poll_recv & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
if (!(pr & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
if (!(polled_mask[fd].poll_send & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
if (!(ps & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
/* fd status changed */
opcode = EPOLL_CTL_MOD;
@ -120,9 +133,9 @@ static void _update_fd(int fd)
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
if (en & FD_EV_ACTIVE_R)
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
if (en & FD_EV_ACTIVE_W)
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
else {
return;

View File

@ -65,38 +65,40 @@ static void _update_fd(int fd)
{
int en;
int events;
ulong pr, ps;
en = fdtab[fd].state;
pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
if (!(polled_mask[fd].poll_recv & tid_bit) &&
!(polled_mask[fd].poll_send & tid_bit)) {
if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
events = 0;
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
events = evports_state_to_events(en);
if (en & FD_EV_ACTIVE_R) {
if (!(polled_mask[fd].poll_recv & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
if (!(pr & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
if (!(polled_mask[fd].poll_send & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
if (!(ps & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}

View File

@ -36,46 +36,48 @@ static int _update_fd(int fd, int start)
{
int en;
int changes = start;
ulong pr, ps;
en = fdtab[fd].state;
pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
if (!(polled_mask[fd].poll_recv & tid_bit) &&
!(polled_mask[fd].poll_send & tid_bit)) {
if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return changes;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
if (en & FD_EV_ACTIVE_R) {
if (!(polled_mask[fd].poll_recv & tid_bit)) {
if (!(pr & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
}
else if (polled_mask[fd].poll_recv & tid_bit) {
else if (pr & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
if (!(polled_mask[fd].poll_send & tid_bit)) {
if (!(ps & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
}
else if (polled_mask[fd].poll_send & tid_bit) {
else if (ps & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}

View File

@ -47,15 +47,18 @@ static void __fd_clo(int fd)
static void _update_fd(int fd, int *max_add_fd)
{
int en;
ulong pr, ps;
en = fdtab[fd].state;
pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
@ -69,22 +72,22 @@ static void _update_fd(int fd, int *max_add_fd)
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
if (!(polled_mask[fd].poll_recv & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
if (!(pr & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
}else {
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_WR]);
if (!(polled_mask[fd].poll_send & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
if (!(ps & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)

View File

@ -38,15 +38,18 @@ static void __fd_clo(int fd)
static void _update_fd(int fd, int *max_add_fd)
{
int en;
ulong pr, ps;
en = fdtab[fd].state;
pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
@ -60,22 +63,22 @@ static void _update_fd(int fd, int *max_add_fd)
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
if (polled_mask[fd].poll_recv & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
if (pr & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
if (!(polled_mask[fd].poll_recv & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
if (!(pr & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
if (polled_mask[fd].poll_send & tid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
if (ps & ti->ltid_bit)
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_WR]);
if (!(polled_mask[fd].poll_send & tid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
if (!(ps & ti->ltid_bit))
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)