MAJOR: fd/threads: Make the fdcache mostly lockless.

Create a local, per-thread fd cache for file descriptors that belong to
only one thread, and make the global fd cache mostly lockless, since the
fd cache lock can see a lot of contention.
Olivier Houchard 2018-01-24 18:17:56 +01:00 committed by Willy Tarreau
parent cf975d46bc
commit 4815c8cbfe
6 changed files with 262 additions and 79 deletions
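Editorial note, not part of the patch: the key idea is that a file descriptor whose thread_mask has a single bit set can sit in that thread's private cache list and never touch the shared one, while all other fds go to a global list that is maintained without a lock. A minimal sketch of that routing, assuming a hypothetical cache_list_for() helper (names other than fd_cache/fd_cache_local are made up):

/* Sketch only -- simplified stand-ins for the structures added below. */
#define MAX_THREADS 64

struct fdlist {
	volatile int first;
	volatile int last;
};

volatile struct fdlist fd_cache;                    /* shared, mostly lockless */
volatile struct fdlist fd_cache_local[MAX_THREADS]; /* one private list per thread */

static inline volatile struct fdlist *cache_list_for(unsigned long thread_mask)
{
	/* a single-bit mask means only one thread may ever process this fd */
	if (!(thread_mask & (thread_mask - 1)))
		return &fd_cache_local[__builtin_ffsl((long)thread_mask) - 1];
	return &fd_cache;
}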

View File

@@ -259,7 +259,6 @@ int thread_need_sync(void);
/* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
enum lock_label {
THREAD_SYNC_LOCK = 0,
FDCACHE_LOCK,
FD_LOCK,
TASK_RQ_LOCK,
TASK_WQ_LOCK,
@@ -376,7 +375,6 @@ static inline const char *lock_label(enum lock_label label)
{
switch (label) {
case THREAD_SYNC_LOCK: return "THREAD_SYNC";
case FDCACHE_LOCK: return "FDCACHE";
case FD_LOCK: return "FD";
case TASK_RQ_LOCK: return "TASK_RQ";
case TASK_WQ_LOCK: return "TASK_WQ";

View File

@@ -33,8 +33,9 @@
/* public variables */
extern unsigned int *fd_cache; // FD events cache
extern int fd_cache_num; // number of events in the cache
extern volatile struct fdlist fd_cache;
extern volatile struct fdlist fd_cache_local[MAX_THREADS];
extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
extern THREAD_LOCAL int *fd_updt; // FD updates list
@@ -105,44 +106,223 @@ static inline void updt_fd_polling(const int fd)
}
#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
{
int next;
int new;
int old;
int last;
redo_next:
next = _GET_NEXT(fd);
/*
* Check that we're not already in the cache, and if not, lock us.
* <= -3 means not in the cache, -2 means locked, -1 means we're in
* the cache and are the last element, and >= 0 gives the FD of the
* next element in the cache.
*/
if (next >= -2)
goto done;
if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
goto redo_next;
__ha_barrier_store();
redo_last:
/* First, insert in the linked list */
last = list->last;
old = -1;
new = fd;
if (unlikely(last == -1)) {
/* list is empty, try to add ourselves alone so that list->last=fd */
_GET_PREV(fd) = last;
/* Make sure the "prev" store is visible before we update the last entry */
__ha_barrier_store();
if (unlikely(!HA_ATOMIC_CAS(&list->last, &old, new)))
goto redo_last;
/* list->first was necessarily -1, so we're guaranteed to be alone here */
list->first = fd;
/* since we're alone at the end of the list and still locked (-2),
* we know no one tried to add past us. Mark the end of the list.
*/
_GET_NEXT(fd) = -1;
goto done; /* We're done ! */
} else {
/* non-empty list, add past the tail */
do {
new = fd;
old = -1;
_GET_PREV(fd) = last;
__ha_barrier_store();
/* Adding ourselves past the last element.
* The CAS will only succeed if its next is -1,
* which means it's in the cache and is the last element.
*/
if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
break;
goto redo_last;
} while (1);
}
/* Then, update the last entry */
redo_fd_cache:
last = list->last;
__ha_barrier_load();
if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
goto redo_fd_cache;
__ha_barrier_store();
_GET_NEXT(fd) = -1;
__ha_barrier_store();
done:
return;
}
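The function above is the lock-free insertion path. As an editorial aid (not part of the patch), the sketch below shows the same next/prev encoding with the atomics, barriers and retry loops stripped out, which makes the list shape easier to follow; the sk_* names are hypothetical, and entries are assumed to start with next = -3 ("not in any list"), exactly as init_pollers() does further down.

/* Sketch only: single-threaded append using the patch's encoding of "next":
 *   next <= -3 : entry is not in the list
 *   next == -2 : entry is locked by a writer
 *   next == -1 : entry is in the list and is the last element
 *   next >=  0 : entry is in the list, value is the fd of the next element
 */
struct sk_entry { int next, prev; };
struct sk_list  { int first, last; };   /* both -1 when the list is empty */

static void sk_append(struct sk_list *l, struct sk_entry *tab, int fd)
{
	if (tab[fd].next >= -2)
		return;                 /* already in the list (or being added) */
	tab[fd].prev = l->last;
	tab[fd].next = -1;              /* we become the last element */
	if (l->last == -1)
		l->first = fd;          /* list was empty */
	else
		tab[l->last].next = fd;
	l->last = fd;
}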
/* Allocates a cache entry for a file descriptor if it does not yet have one.
* This can be done at any time.
*/
static inline void fd_alloc_cache_entry(const int fd)
{
HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
if (fdtab[fd].cache)
goto end;
fd_cache_num++;
fd_cache_mask |= fdtab[fd].thread_mask;
fdtab[fd].cache = fd_cache_num;
fd_cache[fd_cache_num-1] = fd;
end:
HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
else
fd_add_to_fd_list(&fd_cache, fd);
}
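The !(m & (m - 1)) test above (and again in fd_release_cache_entry() below) is the usual single-bit check: subtracting one flips the lowest set bit and every bit below it, so the AND clears exactly that bit and only yields zero when no other bit was set. For an fd in use, thread_mask is presumed non-zero, so zero here means exactly one owning thread. A tiny self-contained illustration with arbitrary mask values:

#include <assert.h>

int main(void)
{
	unsigned long one_owner  = 0x08; /* 0b001000: one bit set  */
	unsigned long two_owners = 0x28; /* 0b101000: two bits set */

	/* m - 1 flips the lowest set bit and the bits below it,
	 * so m & (m - 1) clears exactly the lowest set bit.
	 */
	assert((one_owner  & (one_owner  - 1)) == 0); /* single owner */
	assert((two_owners & (two_owners - 1)) != 0); /* shared fd    */
	return 0;
}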
static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
{
#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
volatile struct fdlist_entry cur_list, next_list;
#endif
int old;
int new = -2;
volatile int prev;
volatile int next;
int last;
lock_self:
#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
next_list.next = next_list.prev = -2;
cur_list.prev = _GET_PREV(fd);
cur_list.next = _GET_NEXT(fd);
/* First, attempt to lock our own entries */
do {
/* The FD is not in the FD cache, give up */
if (unlikely(cur_list.next <= -3))
return;
if (unlikely(cur_list.prev == -2 || cur_list.next == -2))
goto lock_self;
} while (
#ifdef HA_CAS_IS_8B
unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
#else
unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
#endif
;
next = cur_list.next;
prev = cur_list.prev;
#else
lock_self_next:
next = _GET_NEXT(fd);
if (next == -2)
goto lock_self_next;
if (next <= -3)
goto done;
if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
goto lock_self_next;
lock_self_prev:
prev = _GET_PREV(fd);
if (prev == -2)
goto lock_self_prev;
if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
goto lock_self_prev;
#endif
__ha_barrier_store();
/* Now, lock the entries of our neighbours */
if (likely(prev != -1)) {
redo_prev:
old = fd;
if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry once it is done
*/
_GET_PREV(fd) = prev;
__ha_barrier_store();
_GET_NEXT(fd) = next;
__ha_barrier_store();
goto lock_self;
}
goto redo_prev;
}
}
if (likely(next != -1)) {
redo_next:
old = fd;
if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry once it is done
*/
if (prev != -1) {
_GET_NEXT(prev) = fd;
__ha_barrier_store();
}
_GET_PREV(fd) = prev;
__ha_barrier_store();
_GET_NEXT(fd) = next;
__ha_barrier_store();
goto lock_self;
}
goto redo_next;
}
}
if (list->first == fd)
list->first = next;
__ha_barrier_store();
last = list->last;
while (unlikely(last == fd && (!HA_ATOMIC_CAS(&list->last, &last, prev))))
__ha_compiler_barrier();
/* Make sure we let other threads know we're no longer in cache,
* before releasing our neighbours.
*/
__ha_barrier_store();
if (likely(prev != -1))
_GET_NEXT(prev) = next;
__ha_barrier_store();
if (likely(next != -1))
_GET_PREV(next) = prev;
__ha_barrier_store();
/* Ok, now we're out of the fd cache */
_GET_NEXT(fd) = -(next + 4);
__ha_barrier_store();
done:
return;
}
#undef _GET_NEXT
#undef _GET_PREV
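One detail of fd_rm_from_fd_list() worth spelling out: instead of resetting next to a fixed "not in the list" value, it stores -(next + 4). That keeps the entry in the <= -3 range while still remembering which fd used to follow it, so a reader racing with the removal can keep walking (this is the -fd - 4 decode in fdlist_process_cached_events() further down). The mapping is its own inverse; a minimal sketch with hypothetical helper names:

/* -(n + 4) maps every valid in-list "next" value (n >= -1) into the
 * "not in the list" range (<= -3), and applying it twice gives n back:
 *   -1 <-> -3,   0 <-> -4,   7 <-> -11, ...
 */
static inline int fd_next_mark_removed(int n) { return -n - 4; }
static inline int fd_next_read_removed(int e) { return -e - 4; }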
/* Removes entry used by fd <fd> from the FD cache and replaces it with the
* last one. The fdtab.cache is adjusted to match the back reference if needed.
* last one.
* If the fd has no entry assigned, return immediately.
*/
static inline void fd_release_cache_entry(int fd)
{
unsigned int pos;
HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
pos = fdtab[fd].cache;
if (!pos)
goto end;
fdtab[fd].cache = 0;
fd_cache_num--;
if (likely(pos <= fd_cache_num)) {
/* was not the last entry */
fd = fd_cache[fd_cache_num];
fd_cache[pos - 1] = fd;
fdtab[fd].cache = pos;
}
end:
HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
else
fd_rm_from_fd_list(&fd_cache, fd);
}
/* Computes the new polled status based on the active and ready statuses, for
@@ -402,7 +582,6 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
fdtab[fd].update_mask &= ~tid_bit;
fdtab[fd].linger_risk = 0;
fdtab[fd].cloned = 0;
fdtab[fd].cache = 0;
fdtab[fd].thread_mask = thread_mask;
/* note: do not reset polled_mask here as it indicates which poller
* still knows this FD from a possible previous round.

View File

@@ -90,15 +90,25 @@ enum fd_states {
*/
#define DEAD_FD_MAGIC 0xFDDEADFD
struct fdlist_entry {
volatile int next;
volatile int prev;
} __attribute__ ((aligned(8)));
struct fdlist {
volatile int first;
volatile int last;
} __attribute__ ((aligned(8)));
/* info about one given fd */
struct fdtab {
__decl_hathreads(HA_SPINLOCK_T lock);
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
struct fdlist_entry fdcache_entry; /* Entry in the fdcache */
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */
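struct fdlist_entry deliberately packs next and prev into one 8-byte-aligned object: on platforms offering an 8-byte or double-word CAS (the HA_CAS_IS_8B / HA_HAVE_CAS_DW paths in fd_rm_from_fd_list() above), both fields can be claimed in a single atomic operation. A sketch of that idea using plain C11 atomics rather than the HAProxy macros (hypothetical names, assuming a lock-free 64-bit CAS is available):

#include <stdatomic.h>
#include <stdint.h>

struct pair { int32_t next, prev; } __attribute__((aligned(8)));

static int try_lock_pair(_Atomic uint64_t *slot, struct pair expected)
{
	union { struct pair p; uint64_t u; } exp = { .p = expected };
	union { struct pair p; uint64_t u; } lck = { .p = { -2, -2 } };

	/* succeeds only if neither next nor prev changed since they were read */
	return atomic_compare_exchange_strong(slot, &exp.u, lck.u);
}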

View File

@@ -811,7 +811,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
(fdt.ev & FD_POLL_IN) ? 'I' : 'i',
fdt.linger_risk ? 'L' : 'l',
fdt.cloned ? 'C' : 'c',
fdt.cache,
fdt.fdcache_entry.next >= -2 ? 1 : 0,
fdt.owner,
fdt.iocb,
(fdt.iocb == conn_fd_handler) ? "conn_fd_handler" :

View File

@@ -167,15 +167,14 @@ struct poller pollers[MAX_POLLERS];
struct poller cur_poller;
int nbpollers = 0;
unsigned int *fd_cache = NULL; // FD events cache
int fd_cache_num = 0; // number of events in the cache
volatile struct fdlist fd_cache ; // FD events cache
volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
__decl_hathreads(HA_RWLOCK_T fdcache_lock); /* global lock to protect fd_cache array */
/* Deletes an FD from the fdsets.
* The file descriptor is also closed.
*/
@@ -221,33 +220,30 @@ void fd_remove(int fd)
fd_dodelete(fd, 0);
}
/* Scan and process the cached events. This should be called right after
* the poller. The loop may cause new entries to be created, for example
* if a listener causes an accept() to initiate a new incoming connection
* wanting to attempt a recv().
*/
void fd_process_cached_events()
static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
{
int fd, entry, e;
int fd, old_fd, e;
HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
fd_cache_mask &= ~tid_bit;
for (entry = 0; entry < fd_cache_num; ) {
fd = fd_cache[entry];
for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
if (fd == -2) {
fd = old_fd;
continue;
} else if (fd <= -3)
fd = -fd - 4;
if (fd == -1)
break;
old_fd = fd;
if (!(fdtab[fd].thread_mask & tid_bit))
continue;
if (fdtab[fd].fdcache_entry.next < -3)
continue;
if (!(fdtab[fd].thread_mask & tid_bit)) {
activity[tid].fd_skip++;
goto next;
}
fd_cache_mask |= tid_bit;
HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
if (HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
activity[tid].fd_lock++;
goto next;
continue;
}
HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
e = fdtab[fd].state;
fdtab[fd].ev &= FD_POLL_STICKY;
@@ -265,19 +261,19 @@ void fd_process_cached_events()
fd_release_cache_entry(fd);
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
/* If the fd was removed from the cache, it has been
* replaced by the next one that we don't want to skip !
*/
if (entry < fd_cache_num && fd_cache[entry] != fd) {
activity[tid].fd_del++;
continue;
}
next:
entry++;
}
HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Scan and process the cached events. This should be called right after
* the poller. The loop may cause new entries to be created, for example
* if a listener causes an accept() to initiate a new incoming connection
* wanting to attempt a recv().
*/
void fd_process_cached_events()
{
HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
fdlist_process_cached_events(&fd_cache_local[tid]);
fdlist_process_cached_events(&fd_cache);
}
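fd_process_cached_events() now clears this thread's bit in fd_cache_mask up front, and fdlist_process_cached_events() sets it again as soon as it meets an entry owned by the thread, so the pollers can still tell whether cached events may be pending without the old FDCACHE_LOCK. A sketch of that bookkeeping using C11 atomics (hypothetical names, not the HA_ATOMIC_* macros):

#include <stdatomic.h>

static _Atomic unsigned long cache_mask;

static void begin_scan(unsigned long tid_bit)    { atomic_fetch_and(&cache_mask, ~tid_bit); }
static void saw_own_entry(unsigned long tid_bit) { atomic_fetch_or(&cache_mask, tid_bit); }
static int  work_pending(unsigned long tid_bit)  { return !!(atomic_load(&cache_mask) & tid_bit); }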
/* disable the specified poller */
@@ -320,16 +316,19 @@ int init_pollers()
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
goto fail_info;
if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
goto fail_cache;
fd_cache.first = fd_cache.last = -1;
hap_register_per_thread_init(init_pollers_per_thread);
hap_register_per_thread_deinit(deinit_pollers_per_thread);
for (p = 0; p < global.maxsock; p++)
for (p = 0; p < global.maxsock; p++) {
HA_SPIN_INIT(&fdtab[p].lock);
/* Mark the fd as out of the fd cache */
fdtab[p].fdcache_entry.next = -3;
fdtab[p].fdcache_entry.next = -3;
}
for (p = 0; p < global.nbthread; p++)
fd_cache_local[p].first = fd_cache_local[p].last = -1;
HA_RWLOCK_INIT(&fdcache_lock);
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
@@ -372,11 +371,8 @@ void deinit_pollers() {
bp->term(bp);
}
free(fd_cache); fd_cache = NULL;
free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
HA_RWLOCK_DESTROY(&fdcache_lock);
}
/*

View File

@@ -2905,7 +2905,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
conn->flags,
conn->handle.fd,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}
@@ -2938,7 +2938,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
conn->flags,
conn->handle.fd,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}