diff --git a/doc/internals/fd-migration.txt b/doc/internals/fd-migration.txt index 5635ec8da..010431d28 100644 --- a/doc/internals/fd-migration.txt +++ b/doc/internals/fd-migration.txt @@ -103,3 +103,36 @@ ones, with their respective sequences: - mux->wake() is called - if the connection previously was in the list, it's reinserted under the idle_conns_lock. + + +With the DWCAS removal between running_mask & thread_mask: + +fd_takeover: + 1 if (!CAS(&running_mask, 0, tid_bit)) + 2 return fail; + 3 atomic_store(&thread_mask, tid_bit); + 4 atomic_and(&running_mask, ~tid_bit); + +poller: + 1 do { + 2 /* read consistent running_mask & thread_mask */ + 3 do { + 4 run = atomic_load(&running_mask); + 5 thr = atomic_load(&thread_mask); + 6 } while (run & ~thr); + 7 + 8 if (!(thr & tid_bit)) { + 9 /* takeover has started */ + 10 goto disable_fd; + 11 } + 12 } while (!CAS(&running_mask, run, run | tid_bit)); + +fd_delete: + 1 atomic_or(&running_mask, tid_bit); + 2 atomic_store(&thread_mask, 0); + 3 atomic_and(&running_mask, ~tid_bit); + +The loop in poller:3-6 is used to make sure the thread_mask we read matches +the last updated running_mask. If nobody can give up on fd_takeover(), it +might even be possible to spin on thread_mask only. Late pollers will not +set running anymore with this. diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h index 1e318254e..433a3f3ae 100644 --- a/include/haproxy/fd-t.h +++ b/include/haproxy/fd-t.h @@ -151,10 +151,6 @@ struct fdlist { /* info about one given fd. Note: only align on cache lines when using threads; * 32-bit small archs can put everything in 32-bytes when threads are disabled. - * - * NOTE: DO NOT REORDER THIS STRUCT AT ALL! Some code parts rely on exact field - * ordering, for example fd_takeover() and fd_set_running() want running_mask - * immediately followed by thread_mask to perform a double-word-CAS on them. */ struct fdtab { unsigned long running_mask; /* mask of thread IDs currently using the fd */ diff --git a/src/fd.c b/src/fd.c index 3ac7e558f..138a8fd16 100644 --- a/src/fd.c +++ b/src/fd.c @@ -372,49 +372,36 @@ void fd_delete(int fd) */ int fd_takeover(int fd, void *expected_owner) { - int ret = -1; - -#ifndef HA_HAVE_CAS_DW - if (_HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit) == tid_bit) { - HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock); - if (fdtab[fd].owner == expected_owner) { - fdtab[fd].thread_mask = tid_bit; - ret = 0; - } - HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock); - } -#else - unsigned long old_masks[2]; - unsigned long new_masks[2]; - - new_masks[0] = new_masks[1] = tid_bit; - - old_masks[0] = _HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit); - old_masks[1] = fdtab[fd].thread_mask; + unsigned long old; /* protect ourself against a delete then an insert for the same fd, * if it happens, then the owner will no longer be the expected * connection. */ - if (fdtab[fd].owner == expected_owner) { - while (old_masks[0] == tid_bit && old_masks[1]) { - if (_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks)) { - ret = 0; - break; - } - } - } -#endif /* HW_HAVE_CAS_DW */ + if (fdtab[fd].owner != expected_owner) + return -1; - _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + /* we must be alone to work on this idle FD. If not, it means that its + * poller is currently waking up and is about to use it, likely to + * close it on shut/error, but maybe also to process any unexpectedly + * pending data. + */ + old = 0; + if (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &old, tid_bit)) + return -1; + + /* success, from now on it's ours */ + HA_ATOMIC_STORE(&fdtab[fd].thread_mask, tid_bit); /* Make sure the FD doesn't have the active bit. It is possible that * the fd is polled by the thread that used to own it, the new thread * is supposed to call subscribe() later, to activate polling. */ - if (likely(ret == 0)) - fd_stop_recv(fd); - return ret; + fd_stop_recv(fd); + + /* we're done with it */ + HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + return 0; } void updt_fd_polling(const int fd) @@ -456,16 +443,29 @@ int fd_update_events(int fd, uint evts) unsigned long locked; uint old, new; uint new_flags, must_stop; + ulong rmask, tmask; ti->flags &= ~TI_FL_STUCK; // this thread is still running /* do nothing if the FD was taken over under us */ - if (fd_set_running(fd) == -1) { - activity[tid].poll_skip_fd++; - return FD_UPDT_MIGRATED; - } + do { + /* make sure we read a synchronous copy of rmask and tmask + * (tmask is only up to date if it reflects all of rmask's + * bits). + */ + do { + rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask); + tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask); + } while (rmask & ~tmask); - locked = (fdtab[fd].thread_mask != tid_bit); + if (!(tmask & tid_bit)) { + /* a takeover has started */ + activity[tid].poll_skip_fd++; + return FD_UPDT_MIGRATED; + } + } while (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &rmask, rmask | tid_bit)); + + locked = (tmask != tid_bit); /* OK now we are guaranteed that our thread_mask was present and * that we're allowed to update the FD.