MEDIUM: fd/poller: turn update_mask to group-local IDs

From now on, the FD's update_mask only refers to local thread IDs. However,
there remains a limitation, in updt_fd_polling(), we temporarily have to
check and set shared FDs against .thread_mask, which still contains global
ones. As such, nbtgroups > 1 may break (but this is not yet supported without
special build options).
This commit is contained in:
Willy Tarreau 2022-07-05 19:21:06 +02:00
parent 63022128a5
commit 6d3c501c08
8 changed files with 24 additions and 24 deletions

View File

@ -131,17 +131,17 @@ static inline void done_update_polling(int fd)
{
unsigned long update_mask;
update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~tid_bit);
while ((update_mask & all_threads_mask)== 0) {
update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
while ((update_mask & tg->threads_enabled) == 0) {
/* If we were the last one that had to update that entry, remove it from the list */
fd_rm_from_fd_list(&update_list[tgid - 1], fd);
update_mask = (volatile unsigned long)fdtab[fd].update_mask;
if ((update_mask & all_threads_mask) != 0) {
update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
if ((update_mask & tg->threads_enabled) != 0) {
/* Maybe it's been re-updated in the meanwhile, and we
* wrongly removed it from the list, if so, re-add it
*/
fd_add_to_fd_list(&update_list[tgid - 1], fd);
update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
/* And then check again, just in case after all it
* should be removed, even if it's very unlikely, given
* the current thread wouldn't have been able to take

View File

@ -169,7 +169,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@ -188,7 +188,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
fd = -fd -4;
if (fd == -1)
break;
if (fdtab[fd].update_mask & tid_bit)
if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;

View File

@ -126,7 +126,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (i = 0; i < fd_nbupdt; i++) {
fd = fd_updt[i];
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (fdtab[fd].owner == NULL) {
activity[tid].poll_drop_fd++;
continue;
@ -145,7 +145,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
fd = -fd -4;
if (fd == -1)
break;
if (fdtab[fd].update_mask & tid_bit)
if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;

View File

@ -102,7 +102,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@ -119,7 +119,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
fd = -fd -4;
if (fd == -1)
break;
if (fdtab[fd].update_mask & tid_bit)
if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;

View File

@ -116,7 +116,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@ -134,12 +134,12 @@ static void _do_poll(struct poller *p, int exp, int wake)
fd = -fd -4;
if (fd == -1)
break;
if (fdtab[fd].update_mask & tid_bit) {
if (fdtab[fd].update_mask & ti->ltid_bit) {
/* Cheat a bit, as the state is global to all pollers
* we don't need every thread to take care of the
* update.
*/
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
done_update_polling(fd);
} else
continue;

View File

@ -108,7 +108,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@ -125,12 +125,12 @@ static void _do_poll(struct poller *p, int exp, int wake)
fd = -fd -4;
if (fd == -1)
break;
if (fdtab[fd].update_mask & tid_bit) {
if (fdtab[fd].update_mask & ti->ltid_bit) {
/* Cheat a bit, as the state is global to all pollers
* we don't need every thread to take care of the
* update.
*/
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
done_update_polling(fd);
} else
continue;

View File

@ -459,14 +459,14 @@ int fd_takeover(int fd, void *expected_owner)
void updt_fd_polling(const int fd)
{
if (all_threads_mask == 1UL || (fdtab[fd].thread_mask & all_threads_mask) == tid_bit) {
if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
return;
fd_updt[fd_nbupdt++] = fd;
} else {
unsigned long update_mask = fdtab[fd].update_mask;
do {
if (update_mask == fdtab[fd].thread_mask)
if (update_mask == fdtab[fd].thread_mask) // FIXME: this works only on thread-groups 1
return;
} while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, fdtab[fd].thread_mask));
@ -525,7 +525,7 @@ int fd_update_events(int fd, uint evts)
activity[tid].poll_skip_fd++;
/* Let the poller know this FD was lost */
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
fd_updt[fd_nbupdt++] = fd;
fd_drop_tgid(fd);
@ -603,10 +603,10 @@ int fd_update_events(int fd, uint evts)
/* we had to stop this FD and it still must be stopped after the I/O
* cb's changes, so let's program an update for this.
*/
if (must_stop && !(fdtab[fd].update_mask & tid_bit)) {
if (must_stop && !(fdtab[fd].update_mask & ti->ltid_bit)) {
if (((must_stop & FD_POLL_IN) && !fd_recv_active(fd)) ||
((must_stop & FD_POLL_OUT) && !fd_send_active(fd)))
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
fd_updt[fd_nbupdt++] = fd;
}

View File

@ -3330,7 +3330,7 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm
conn->flags,
conn_fd(conn),
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}
@ -3368,7 +3368,7 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm
conn->flags,
conn_fd(conn),
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}