MEDIUM: fd: Introduce a running mask, and use it instead of the spinlock.

In the struct fdtab, introduce a new mask, running_mask. Each thread should
add its bit before using the fd.
Use the running_mask instead of a lock in fd_insert()/fd_delete(): we just
spin for as long as the mask is non-zero, to be sure we access the data
exclusively.
fd_set_running_excl() spins until the mask is 0, fd_set_running() just
adds the thread bit, and fd_clr_running() removes it.
This commit is contained in:
Olivier Houchard 2020-02-27 17:26:13 +01:00 committed by Olivier Houchard
parent 2ea1b49832
commit a7bf573520
4 changed files with 30 additions and 15 deletions

View File

@ -529,7 +529,6 @@ static inline unsigned long thread_isolated()
/* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
enum lock_label {
FD_LOCK,
TASK_RQ_LOCK,
TASK_WQ_LOCK,
POOL_LOCK,
@ -647,7 +646,6 @@ struct ha_rwlock {
static inline const char *lock_label(enum lock_label label)
{
switch (label) {
case FD_LOCK: return "FD";
case TASK_RQ_LOCK: return "TASK_RQ";
case TASK_WQ_LOCK: return "TASK_WQ";
case POOL_LOCK: return "POOL";

View File

@ -299,6 +299,23 @@ static inline void fd_want_send(int fd)
updt_fd_polling(fd);
}
/* Declares the calling thread as a user of <fd> by atomically setting its
 * tid_bit in the fd's running_mask. Must be paired with fd_clr_running()
 * once the thread is done with the fd.
 */
static inline void fd_set_running(int fd)
{
	_HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
}
/* Grabs exclusive access to <fd>: spins until no thread at all has a bit in
 * the running_mask, then atomically installs the caller's tid_bit. Must be
 * paired with fd_clr_running().
 * NOTE(review): must not be called while the caller's own bit is already
 * set, otherwise it will spin forever.
 */
static inline void fd_set_running_excl(int fd)
{
	unsigned long old_mask = 0;

	/* On failure, the compare-and-swap stores the observed value into
	 * <old_mask>, so it must be reset to zero before retrying: otherwise
	 * a second iteration could "succeed" against a non-zero mask and
	 * steal the fd while another thread is still using it.
	 */
	while (!_HA_ATOMIC_CAS(&fdtab[fd].running_mask, &old_mask, tid_bit))
		old_mask = 0;
}
/* Removes the calling thread from the users of <fd> by atomically clearing
 * its tid_bit from the fd's running_mask. Counterpart of fd_set_running()
 * and fd_set_running_excl().
 */
static inline void fd_clr_running(int fd)
{
	_HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
}
/* Update events seen for FD <fd> and its state if needed. This should be
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
@ -353,8 +370,11 @@ static inline void fd_update_events(int fd, unsigned char evts)
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
fd_may_send(fd);
if (fdtab[fd].iocb && fd_active(fd))
if (fdtab[fd].iocb && fd_active(fd)) {
fd_set_running(fd);
fdtab[fd].iocb(fd);
fd_clr_running(fd);
}
/* we had to stop this FD and it still must be stopped after the I/O
* cb's changes, so let's program an update for this.
@ -372,10 +392,10 @@ static inline void fd_update_events(int fd, unsigned char evts)
/* Prepares <fd> for being polled */
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
{
unsigned long locked = atleast2(thread_mask);
int locked = fdtab[fd].running_mask != tid_bit;
if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_set_running_excl(fd);
fdtab[fd].owner = owner;
fdtab[fd].iocb = iocb;
fdtab[fd].ev = 0;
@ -386,7 +406,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
* still knows this FD from a possible previous round.
*/
if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fd_clr_running(fd);
/* the two directions are ready until proven otherwise */
fd_may_both(fd);
_HA_ATOMIC_ADD(&ha_used_fds, 1);

View File

@ -118,8 +118,8 @@ struct fdlist {
/* info about one given fd */
struct fdtab {
__decl_hathreads(HA_SPINLOCK_T lock);
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
unsigned long running_mask; /* mask of thread IDs currently using the fd */
unsigned long thread_mask; /* mask of thread IDs authorized to process the fd */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
struct fdlist_entry update; /* Entry in the global update list */
void (*iocb)(int fd); /* I/O handler */

View File

@ -300,10 +300,11 @@ done:
*/
static void fd_dodelete(int fd, int do_close)
{
unsigned long locked = atleast2(fdtab[fd].thread_mask);
int locked = fdtab[fd].running_mask != tid_bit;
if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_set_running_excl(fd);
if (fdtab[fd].linger_risk) {
/* this is generally set when connecting to servers */
setsockopt(fd, SOL_SOCKET, SO_LINGER,
@ -324,7 +325,7 @@ static void fd_dodelete(int fd, int do_close)
_HA_ATOMIC_SUB(&ha_used_fds, 1);
}
if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fd_clr_running(fd);
}
/* Deletes an FD from the fdsets.
@ -602,7 +603,6 @@ int init_pollers()
update_list.first = update_list.last = -1;
for (p = 0; p < global.maxsock; p++) {
HA_SPIN_INIT(&fdtab[p].lock);
/* Mark the fd as out of the fd cache */
fdtab[p].update.next = -3;
}
@ -639,9 +639,6 @@ void deinit_pollers() {
struct poller *bp;
int p;
for (p = 0; p < global.maxsock; p++)
HA_SPIN_DESTROY(&fdtab[p].lock);
for (p = 0; p < nbpollers; p++) {
bp = &pollers[p];