MAJOR: fd: get rid of the DWCAS when setting the running_mask
Right now we're using a DWCAS to atomically set the running_mask while being constrained by the thread_mask. This DWCAS is annoying because we may seriously need it later when adding support for thread groups, for checking that the running_mask applies to the correct group.

It turns out that the DWCAS is not strictly necessary because we never need it to set the thread_mask based on the running_mask, only the other way around. And in fact, the running_mask is always cleared alone, and the thread_mask is changed alone as well. The running_mask is only relevant to indicate a takeover when the thread_mask matches it. Any bit set in running_mask and not present in thread_mask indicates a transition in progress.

As such, it is possible to re-arrange this by using a regular CAS around a consistency check between running_mask and thread_mask in fd_update_events(), and by making a CAS on running_mask followed by an atomic store on the thread_mask in fd_takeover(). The only other case is fd_delete(), but that one already sets the running_mask before clearing the thread_mask, which is compatible with the consistency check above.

This change has happily survived 10 billion takeovers on a 16-thread machine at 800k requests/s. The fd-migration doc was updated to reflect this change.
parent b1f29bc625
commit f69fea64e0
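Before the diff itself, a minimal illustration of the invariant the message relies on (the helper below is ours, not part of the patch): a bit set in running_mask but absent from thread_mask flags a transition in progress, so a reader keeps re-loading the pair until the two agree.

/* illustration only, not from the patch: the reader-side consistency
 * test on the two masks. The pair is only usable when every running
 * bit is also an allowed thread bit. */
static inline int masks_consistent(unsigned long run, unsigned long thr)
{
	return !(run & ~thr); /* no running bit outside thread_mask */
}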
doc/internals/fd-migration.txt
@@ -103,3 +103,36 @@ ones, with their respective sequences:
   - mux->wake() is called
   - if the connection previously was in the list, it's reinserted under the
     idle_conns_lock.
+
+With the DWCAS removal between running_mask & thread_mask:
+
+fd_takeover:
+   1 if (!CAS(&running_mask, 0, tid_bit))
+   2    return fail;
+   3 atomic_store(&thread_mask, tid_bit);
+   4 atomic_and(&running_mask, ~tid_bit);
+
+poller:
+   1 do {
+   2    /* read consistent running_mask & thread_mask */
+   3    do {
+   4       run = atomic_load(&running_mask);
+   5       thr = atomic_load(&thread_mask);
+   6    } while (run & ~thr);
+   7
+   8    if (!(thr & tid_bit)) {
+   9       /* takeover has started */
+  10       goto disable_fd;
+  11    }
+  12 } while (!CAS(&running_mask, run, run | tid_bit));
+
+fd_delete:
+   1 atomic_or(&running_mask, tid_bit);
+   2 atomic_store(&thread_mask, 0);
+   3 atomic_and(&running_mask, ~tid_bit);
+
+The loop in poller:3-6 is used to make sure the thread_mask we read matches
+the last updated running_mask. If nobody can give up on fd_takeover(), it
+might even be possible to spin on thread_mask only. Late pollers will not
+set running anymore with this.
+
@ -151,10 +151,6 @@ struct fdlist {
|
||||
|
||||
/* info about one given fd. Note: only align on cache lines when using threads;
|
||||
* 32-bit small archs can put everything in 32-bytes when threads are disabled.
|
||||
*
|
||||
* NOTE: DO NOT REORDER THIS STRUCT AT ALL! Some code parts rely on exact field
|
||||
* ordering, for example fd_takeover() and fd_set_running() want running_mask
|
||||
* immediately followed by thread_mask to perform a double-word-CAS on them.
|
||||
*/
|
||||
struct fdtab {
|
||||
unsigned long running_mask; /* mask of thread IDs currently using the fd */
|
||||
|
src/fd.c: 74 lines changed
@@ -372,49 +372,36 @@ void fd_delete(int fd)
  */
 int fd_takeover(int fd, void *expected_owner)
 {
-	int ret = -1;
-
-#ifndef HA_HAVE_CAS_DW
-	if (_HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit) == tid_bit) {
-		HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock);
-		if (fdtab[fd].owner == expected_owner) {
-			fdtab[fd].thread_mask = tid_bit;
-			ret = 0;
-		}
-		HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock);
-	}
-#else
-	unsigned long old_masks[2];
-	unsigned long new_masks[2];
-
-	new_masks[0] = new_masks[1] = tid_bit;
-
-	old_masks[0] = _HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit);
-	old_masks[1] = fdtab[fd].thread_mask;
+	unsigned long old;
 
 	/* protect ourself against a delete then an insert for the same fd,
 	 * if it happens, then the owner will no longer be the expected
 	 * connection.
 	 */
-	if (fdtab[fd].owner == expected_owner) {
-		while (old_masks[0] == tid_bit && old_masks[1]) {
-			if (_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks)) {
-				ret = 0;
-				break;
-			}
-		}
-	}
-#endif /* HW_HAVE_CAS_DW */
+	if (fdtab[fd].owner != expected_owner)
+		return -1;
 
-	_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+	/* we must be alone to work on this idle FD. If not, it means that its
+	 * poller is currently waking up and is about to use it, likely to
+	 * close it on shut/error, but maybe also to process any unexpectedly
+	 * pending data.
+	 */
+	old = 0;
+	if (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &old, tid_bit))
+		return -1;
+
+	/* success, from now on it's ours */
+	HA_ATOMIC_STORE(&fdtab[fd].thread_mask, tid_bit);
 
 	/* Make sure the FD doesn't have the active bit. It is possible that
 	 * the fd is polled by the thread that used to own it, the new thread
 	 * is supposed to call subscribe() later, to activate polling.
 	 */
-	if (likely(ret == 0))
-		fd_stop_recv(fd);
-	return ret;
+	fd_stop_recv(fd);
+
+	/* we're done with it */
+	HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+	return 0;
 }
 
 void updt_fd_polling(const int fd)
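From the caller's perspective the new fd_takeover() is all-or-nothing. A hedged sketch of a caller stealing an idle connection from another thread; only fd_takeover() and conn->handle.fd come from HAProxy, the helper itself is hypothetical:

/* hypothetical helper, for illustration only: migrate an idle
 * connection's fd to the calling thread before reusing it */
static int try_steal_connection(struct connection *conn)
{
	if (fd_takeover(conn->handle.fd, conn) != 0)
		return -1; /* owner's poller is busy on the fd: give up */

	/* the fd and its thread_mask now belong to this thread; polling
	 * was disabled by fd_stop_recv(), so the new owner is expected
	 * to subscribe() again before using the fd */
	return 0;
}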
@@ -456,16 +443,29 @@ int fd_update_events(int fd, uint evts)
 	unsigned long locked;
 	uint old, new;
 	uint new_flags, must_stop;
+	ulong rmask, tmask;
 
 	ti->flags &= ~TI_FL_STUCK; // this thread is still running
 
 	/* do nothing if the FD was taken over under us */
-	if (fd_set_running(fd) == -1) {
-		activity[tid].poll_skip_fd++;
-		return FD_UPDT_MIGRATED;
-	}
+	do {
+		/* make sure we read a synchronous copy of rmask and tmask
+		 * (tmask is only up to date if it reflects all of rmask's
+		 * bits).
+		 */
+		do {
+			rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
+			tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+		} while (rmask & ~tmask);
 
-	locked = (fdtab[fd].thread_mask != tid_bit);
+		if (!(tmask & tid_bit)) {
+			/* a takeover has started */
+			activity[tid].poll_skip_fd++;
+			return FD_UPDT_MIGRATED;
+		}
+	} while (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &rmask, rmask | tid_bit));
+
+	locked = (tmask != tid_bit);
 
 	/* OK now we are guaranteed that our thread_mask was present and
 	 * that we're allowed to update the FD.
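On the poller side, the FD_UPDT_MIGRATED return value is all that callers need to check. A loose sketch of an epoll-style event loop (the event decoding and all variable names are placeholders, not taken from the patch):

/* placeholder event loop, for illustration only */
for (count = 0; count < status; count++) {
	int fd = epoll_events[count].data.fd;
	uint evts = decode_events(epoll_events[count].events); /* hypothetical */

	if (fd_update_events(fd, evts) == FD_UPDT_MIGRATED)
		continue; /* the fd was taken over under us: its new owner handles it */
}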