mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-25 14:12:13 +00:00
MINOR: fd: Implement fd_takeover().
Implement a new function, fd_takeover(), that lets you become the thread responsible for the fd. On architectures that do not have a double-width CAS, use a global rwlock. fd_set_running() was also changed to be able to compete with fd_takeover(), either using a dooble-width CAS on both running_mask and thread_mask, or by claiming a reader on the global rwlock. This extra operation should not have any measurable impact on modern architectures where threading is relevant.
This commit is contained in:
parent
dc2f2753e9
commit
8851664293
@ -60,6 +60,16 @@ void fd_delete(int fd);
|
|||||||
*/
|
*/
|
||||||
void fd_remove(int fd);
|
void fd_remove(int fd);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Take over a FD belonging to another thread.
|
||||||
|
* Returns 0 on success, and -1 on failure.
|
||||||
|
*/
|
||||||
|
int fd_takeover(int fd, void *expected_owner);
|
||||||
|
|
||||||
|
#ifndef HA_HAVE_CAS_DW
|
||||||
|
__decl_hathreads(HA_RWLOCK_T fd_mig_lock);
|
||||||
|
#endif
|
||||||
|
|
||||||
ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl);
|
ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl);
|
||||||
|
|
||||||
/* close all FDs starting from <start> */
|
/* close all FDs starting from <start> */
|
||||||
@ -299,9 +309,35 @@ static inline void fd_want_send(int fd)
|
|||||||
updt_fd_polling(fd);
|
updt_fd_polling(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void fd_set_running(int fd)
|
/* Set the fd as currently running on the current thread.
|
||||||
|
* Retuns 0 if all goes well, or -1 if we no longer own the fd, and should
|
||||||
|
* do nothing with it.
|
||||||
|
*/
|
||||||
|
static inline int fd_set_running(int fd)
|
||||||
{
|
{
|
||||||
|
#ifndef HA_HAVE_CAS_DW
|
||||||
|
HA_RWLOCK_RDLOCK(OTHER_LOCK, &fd_mig_lock);
|
||||||
|
if (!(fdtab[fd].thread_mask & tid_bit)) {
|
||||||
|
HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
_HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
|
_HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
|
||||||
|
HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock);
|
||||||
|
return 0;
|
||||||
|
#else
|
||||||
|
unsigned long old_masks[2];
|
||||||
|
unsigned long new_masks[2];
|
||||||
|
old_masks[0] = fdtab[fd].running_mask;
|
||||||
|
old_masks[1] = fdtab[fd].thread_mask;
|
||||||
|
do {
|
||||||
|
if (!(old_masks[1] & tid_bit))
|
||||||
|
return -1;
|
||||||
|
new_masks[0] = fdtab[fd].running_mask | tid_bit;
|
||||||
|
new_masks[1] = old_masks[1];
|
||||||
|
|
||||||
|
} while (!(HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks)));
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void fd_set_running_excl(int fd)
|
static inline void fd_set_running_excl(int fd)
|
||||||
@ -371,7 +407,8 @@ static inline void fd_update_events(int fd, unsigned char evts)
|
|||||||
fd_may_send(fd);
|
fd_may_send(fd);
|
||||||
|
|
||||||
if (fdtab[fd].iocb && fd_active(fd)) {
|
if (fdtab[fd].iocb && fd_active(fd)) {
|
||||||
fd_set_running(fd);
|
if (fd_set_running(fd) == -1)
|
||||||
|
return;
|
||||||
fdtab[fd].iocb(fd);
|
fdtab[fd].iocb(fd);
|
||||||
fd_clr_running(fd);
|
fd_clr_running(fd);
|
||||||
}
|
}
|
||||||
|
67
src/fd.c
67
src/fd.c
@ -302,6 +302,13 @@ static void fd_dodelete(int fd, int do_close)
|
|||||||
{
|
{
|
||||||
int locked = fdtab[fd].running_mask != tid_bit;
|
int locked = fdtab[fd].running_mask != tid_bit;
|
||||||
|
|
||||||
|
/* We're just trying to protect against a concurrent fd_insert()
|
||||||
|
* here, not against fd_takeother(), because either we're called
|
||||||
|
* directly from the iocb(), and we're already locked, or we're
|
||||||
|
* called from the mux tasklet, but then the mux is responsible for
|
||||||
|
* making sure the tasklet does nothing, and the connection is never
|
||||||
|
* destroyed.
|
||||||
|
*/
|
||||||
if (locked)
|
if (locked)
|
||||||
fd_set_running_excl(fd);
|
fd_set_running_excl(fd);
|
||||||
|
|
||||||
@ -328,6 +335,66 @@ static void fd_dodelete(int fd, int do_close)
|
|||||||
fd_clr_running(fd);
|
fd_clr_running(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef HA_HAVE_CAS_DW
|
||||||
|
__decl_hathreads(__delc_rwlock(fd_mig_lock));
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Take over a FD belonging to another thread.
|
||||||
|
* unexpected_conn is the expected owner of the fd.
|
||||||
|
* Returns 0 on success, and -1 on failure.
|
||||||
|
*/
|
||||||
|
int fd_takeover(int fd, void *expected_owner)
|
||||||
|
{
|
||||||
|
#ifndef HA_HAVE_CAS_DW
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock);
|
||||||
|
_HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
|
||||||
|
if (fdtab[fd].running_mask != tid_bit || fdtab[fd].owner != expected_owner) {
|
||||||
|
ret = -1;
|
||||||
|
_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
fdtab[fd].thread_mask = tid_bit;
|
||||||
|
_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
|
ret = 0;
|
||||||
|
end:
|
||||||
|
HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock);
|
||||||
|
/* Make sure the FD doesn't have the active bit. It is possible that
|
||||||
|
* the fd is polled by the thread that used to own it, the new thread
|
||||||
|
* is supposed to call subscribe() later, to activate polling.
|
||||||
|
*/
|
||||||
|
fd_stop_recv(fd);
|
||||||
|
return ret;
|
||||||
|
#else
|
||||||
|
unsigned long old_masks[2];
|
||||||
|
unsigned long new_masks[2];
|
||||||
|
|
||||||
|
old_masks[0] = tid_bit;
|
||||||
|
old_masks[1] = fdtab[fd].thread_mask;
|
||||||
|
new_masks[0] = new_masks[1] = tid_bit;
|
||||||
|
/* protect ourself against a delete then an insert for the same fd,
|
||||||
|
* if it happens, then the owner will no longer be the expected
|
||||||
|
* connection.
|
||||||
|
*/
|
||||||
|
_HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
|
||||||
|
if (fdtab[fd].owner != expected_owner) {
|
||||||
|
_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
if (old_masks[0] != tid_bit || !old_masks[1]) {
|
||||||
|
_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} while (!(_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks,
|
||||||
|
&new_masks)));
|
||||||
|
_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
|
return 0;
|
||||||
|
#endif /* HW_HAVE_CAS_DW */
|
||||||
|
}
|
||||||
|
|
||||||
/* Deletes an FD from the fdsets.
|
/* Deletes an FD from the fdsets.
|
||||||
* The file descriptor is also closed.
|
* The file descriptor is also closed.
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user