diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 1dabf3cc2..a8fdf150a 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -259,7 +259,6 @@ int thread_need_sync(void);
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
 enum lock_label {
 	THREAD_SYNC_LOCK = 0,
-	FDCACHE_LOCK,
 	FD_LOCK,
 	TASK_RQ_LOCK,
 	TASK_WQ_LOCK,
@@ -376,7 +375,6 @@ static inline const char *lock_label(enum lock_label label)
 {
 	switch (label) {
 	case THREAD_SYNC_LOCK: return "THREAD_SYNC";
-	case FDCACHE_LOCK: return "FDCACHE";
 	case FD_LOCK: return "FD";
 	case TASK_RQ_LOCK: return "TASK_RQ";
 	case TASK_WQ_LOCK: return "TASK_WQ";
diff --git a/include/proto/fd.h b/include/proto/fd.h
index a7e70b7fd..595af90f2 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -33,8 +33,9 @@
 
 /* public variables */
 
-extern unsigned int *fd_cache; // FD events cache
-extern int fd_cache_num; // number of events in the cache
+extern volatile struct fdlist fd_cache;
+extern volatile struct fdlist fd_cache_local[MAX_THREADS];
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt; // FD updates list
@@ -105,44 +106,223 @@ static inline void updt_fd_polling(const int fd)
 }
 
+#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
+#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
+
+static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
+{
+	int next;
+	int new;
+	int old;
+	int last;
+
+redo_next:
+	next = _GET_NEXT(fd);
+	/*
+	 * Check that we're not already in the cache, and if not, lock us.
+	 * <= -3 means not in the cache, -2 means locked, -1 means we're
+	 * in the cache, and the last element, >= 0 gives the FD of the next
+	 * in the cache.
+	 */
+	if (next >= -2)
+		goto done;
+	if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
+		goto redo_next;
+	__ha_barrier_store();
+redo_last:
+	/* First, insert in the linked list */
+	last = list->last;
+	old = -1;
+	new = fd;
+	if (unlikely(last == -1)) {
+		/* list is empty, try to add ourselves alone so that list->last=fd */
+
+		_GET_PREV(fd) = last;
+
+		/* Make sure the "prev" store is visible before we update the last entry */
+		__ha_barrier_store();
+		if (unlikely(!HA_ATOMIC_CAS(&list->last, &old, new)))
+			goto redo_last;
+
+		/* list->first was necessarily -1, we're guaranteed to be alone here */
+		list->first = fd;
+
+		/* since we're alone at the end of the list and still locked (-2),
+		 * we know no one tried to add past us. Mark the end of the list.
+		 */
+		_GET_NEXT(fd) = -1;
+		goto done; /* We're done ! */
+	} else {
+		/* non-empty list, add past the tail */
+		do {
+			new = fd;
+			old = -1;
+			_GET_PREV(fd) = last;
+
+			__ha_barrier_store();
+
+			/* adding ourselves past the last element
+			 * The CAS will only succeed if its next is -1,
+			 * which means it's in the cache, and the last element.
+			 */
+			if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
+				break;
+			goto redo_last;
+		} while (1);
+	}
+	/* Then, update the last entry */
+redo_fd_cache:
+	last = list->last;
+	__ha_barrier_load();
+
+	if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
+		goto redo_fd_cache;
+	__ha_barrier_store();
+	_GET_NEXT(fd) = -1;
+	__ha_barrier_store();
+done:
+	return;
+}
+
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
  * This can be done at any time.
  */
 static inline void fd_alloc_cache_entry(const int fd)
 {
-	HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-	if (fdtab[fd].cache)
-		goto end;
-	fd_cache_num++;
-	fd_cache_mask |= fdtab[fd].thread_mask;
-	fdtab[fd].cache = fd_cache_num;
-	fd_cache[fd_cache_num-1] = fd;
-  end:
-	HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+	if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+		fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+	else
+		fd_add_to_fd_list(&fd_cache, fd);
+
 }
+
+static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
+{
+#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
+	volatile struct fdlist_entry cur_list, next_list;
+#endif
+	int old;
+	int new = -2;
+	volatile int prev;
+	volatile int next;
+	int last;
+
+lock_self:
+#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
+	next_list.next = next_list.prev = -2;
+	cur_list.prev = _GET_PREV(fd);
+	cur_list.next = _GET_NEXT(fd);
+	/* First, attempt to lock our own entries */
+	do {
+		/* The FD is not in the FD cache, give up */
+		if (unlikely(cur_list.next <= -3))
+			return;
+		if (unlikely(cur_list.prev == -2 || cur_list.next == -2))
+			goto lock_self;
+	} while (
+#ifdef HA_CAS_IS_8B
+	    unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+#else
+	    unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
+#endif
+	    ;
+	next = cur_list.next;
+	prev = cur_list.prev;
+
+#else
+lock_self_next:
+	next = _GET_NEXT(fd);
+	if (next == -2)
+		goto lock_self_next;
+	if (next <= -3)
+		goto done;
+	if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
+		goto lock_self_next;
+lock_self_prev:
+	prev = _GET_PREV(fd);
+	if (prev == -2)
+		goto lock_self_prev;
+	if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
+		goto lock_self_prev;
+#endif
+	__ha_barrier_store();
+
+	/* Now, lock the entries of our neighbours */
+	if (likely(prev != -1)) {
+redo_prev:
+		old = fd;
+
+		if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
+			if (unlikely(old == -2)) {
+				/* Neighbour already locked, give up and
+				 * retry again once he's done
+				 */
+				_GET_PREV(fd) = prev;
+				__ha_barrier_store();
+				_GET_NEXT(fd) = next;
+				__ha_barrier_store();
+				goto lock_self;
+			}
+			goto redo_prev;
+		}
+	}
+	if (likely(next != -1)) {
+redo_next:
+		old = fd;
+		if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
+			if (unlikely(old == -2)) {
+				/* Neighbour already locked, give up and
+				 * retry again once he's done
+				 */
+				if (prev != -1) {
+					_GET_NEXT(prev) = fd;
+					__ha_barrier_store();
+				}
+				_GET_PREV(fd) = prev;
+				__ha_barrier_store();
+				_GET_NEXT(fd) = next;
+				__ha_barrier_store();
+				goto lock_self;
+			}
+			goto redo_next;
+		}
+	}
+	if (list->first == fd)
+		list->first = next;
+	__ha_barrier_store();
+	last = list->last;
+	while (unlikely(last == fd && (!HA_ATOMIC_CAS(&list->last, &last, prev))))
+		__ha_compiler_barrier();
+	/* Make sure we let other threads know we're no longer in cache,
+	 * before releasing our neighbours.
+	 */
+	__ha_barrier_store();
+	if (likely(prev != -1))
+		_GET_NEXT(prev) = next;
+	__ha_barrier_store();
+	if (likely(next != -1))
+		_GET_PREV(next) = prev;
+	__ha_barrier_store();
+	/* Ok, now we're out of the fd cache */
+	_GET_NEXT(fd) = -(next + 4);
+	__ha_barrier_store();
+done:
+	return;
 }
+#undef _GET_NEXT
+#undef _GET_PREV
+
+
 /* Removes entry used by fd from the FD cache and replaces it with the
- * last one. The fdtab.cache is adjusted to match the back reference if needed.
+ * last one.
  * If the fd has no entry assigned, return immediately.
  */
 static inline void fd_release_cache_entry(int fd)
 {
-	unsigned int pos;
-
-	HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-	pos = fdtab[fd].cache;
-	if (!pos)
-		goto end;
-	fdtab[fd].cache = 0;
-	fd_cache_num--;
-	if (likely(pos <= fd_cache_num)) {
-		/* was not the last entry */
-		fd = fd_cache[fd_cache_num];
-		fd_cache[pos - 1] = fd;
-		fdtab[fd].cache = pos;
-	}
-  end:
-	HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+	if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+		fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+	else
+		fd_rm_from_fd_list(&fd_cache, fd);
 }
 
 /* Computes the new polled status based on the active and ready statuses, for
@@ -402,7 +582,6 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
 	fdtab[fd].update_mask &= ~tid_bit;
 	fdtab[fd].linger_risk = 0;
 	fdtab[fd].cloned = 0;
-	fdtab[fd].cache = 0;
 	fdtab[fd].thread_mask = thread_mask;
 	/* note: do not reset polled_mask here as it indicates which poller
 	 * still knows this FD from a possible previous round.
diff --git a/include/types/fd.h b/include/types/fd.h
index e04ea675f..8edf26bf1 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -90,15 +90,25 @@ enum fd_states {
  */
 #define DEAD_FD_MAGIC 0xFDDEADFD
 
+struct fdlist_entry {
+	volatile int next;
+	volatile int prev;
+} __attribute__ ((aligned(8)));
+
+struct fdlist {
+	volatile int first;
+	volatile int last;
+} __attribute__ ((aligned(8)));
+
 /* info about one given fd */
 struct fdtab {
 	__decl_hathreads(HA_SPINLOCK_T lock);
 	unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
 	unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask; /* mask of thread IDs having an update for fd */
+	struct fdlist_entry fdcache_entry; /* Entry in the fdcache */
 	void (*iocb)(int fd); /* I/O handler */
 	void *owner; /* the connection or listener associated with this fd, NULL if closed */
-	unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
 	unsigned char state; /* FD state for read and write directions (2*3 bits) */
 	unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
 	unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */
diff --git a/src/cli.c b/src/cli.c
index ed8cc5bff..85d35678b 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -811,7 +811,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
			(fdt.ev & FD_POLL_IN) ? 'I' : 'i',
			fdt.linger_risk ? 'L' : 'l',
			fdt.cloned ? 'C' : 'c',
-			fdt.cache,
+			fdt.fdcache_entry.next >= -2 ? 1 : 0,
			fdt.owner,
			fdt.iocb,
			(fdt.iocb == conn_fd_handler) ? "conn_fd_handler" :
diff --git a/src/fd.c b/src/fd.c
index 0995040d6..2cd79fb9f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -167,15 +167,14 @@ struct poller pollers[MAX_POLLERS];
 struct poller cur_poller;
 int nbpollers = 0;
 
-unsigned int *fd_cache = NULL; // FD events cache
-int fd_cache_num = 0; // number of events in the cache
+volatile struct fdlist fd_cache; // FD events cache
+volatile struct fdlist fd_cache_local[MAX_THREADS]; // per-thread FD events cache
+
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
 THREAD_LOCAL int *fd_updt = NULL; // FD updates list
 THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
 
-__decl_hathreads(HA_RWLOCK_T fdcache_lock); /* global lock to protect fd_cache array */
-
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
@@ -221,33 +220,30 @@ void fd_remove(int fd)
 	fd_dodelete(fd, 0);
 }
 
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
+static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
-	int fd, entry, e;
+	int fd, old_fd, e;
 
-	HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-	fd_cache_mask &= ~tid_bit;
-	for (entry = 0; entry < fd_cache_num; ) {
-		fd = fd_cache[entry];
+	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		} else if (fd <= -3)
+			fd = -fd - 4;
+		if (fd == -1)
+			break;
+		old_fd = fd;
+		if (!(fdtab[fd].thread_mask & tid_bit))
+			continue;
+		if (fdtab[fd].fdcache_entry.next < -3)
+			continue;
 
-		if (!(fdtab[fd].thread_mask & tid_bit)) {
-			activity[tid].fd_skip++;
-			goto next;
-		}
-
-		fd_cache_mask |= tid_bit;
+		HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
 		if (HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
 			activity[tid].fd_lock++;
-			goto next;
+			continue;
 		}
 
-		HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
-
 		e = fdtab[fd].state;
 		fdtab[fd].ev &= FD_POLL_STICKY;
 
@@ -265,19 +261,19 @@ void fd_process_cached_events()
 			fd_release_cache_entry(fd);
 			HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 		}
-
-		HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-		/* If the fd was removed from the cache, it has been
-		 * replaced by the next one that we don't want to skip !
-		 */
-		if (entry < fd_cache_num && fd_cache[entry] != fd) {
-			activity[tid].fd_del++;
-			continue;
-		}
-	  next:
-		entry++;
 	}
-	HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+}
+
+/* Scan and process the cached events. This should be called right after
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt a recv().
+ */
+void fd_process_cached_events()
+{
+	HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
+	fdlist_process_cached_events(&fd_cache_local[tid]);
+	fdlist_process_cached_events(&fd_cache);
 }
 
 /* disable the specified poller */
@@ -320,16 +316,19 @@ int init_pollers()
 	if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
 		goto fail_info;
 
-	if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
-		goto fail_cache;
-
+	fd_cache.first = fd_cache.last = -1;
 	hap_register_per_thread_init(init_pollers_per_thread);
 	hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
-	for (p = 0; p < global.maxsock; p++)
+	for (p = 0; p < global.maxsock; p++) {
 		HA_SPIN_INIT(&fdtab[p].lock);
+		/* Mark the fd as out of the fd cache */
+		fdtab[p].fdcache_entry.next = -3;
+		fdtab[p].fdcache_entry.prev = -3;
+	}
+	for (p = 0; p < global.nbthread; p++)
+		fd_cache_local[p].first = fd_cache_local[p].last = -1;
 
-	HA_RWLOCK_INIT(&fdcache_lock);
 	do {
 		bp = NULL;
 		for (p = 0; p < nbpollers; p++)
@@ -372,11 +371,8 @@ void deinit_pollers() {
 		bp->term(bp);
 	}
 
-	free(fd_cache); fd_cache = NULL;
 	free(fdinfo); fdinfo = NULL;
 	free(fdtab); fdtab = NULL;
-
-	HA_RWLOCK_DESTROY(&fdcache_lock);
 }
 
 /*
diff --git a/src/stream.c b/src/stream.c
index 92f9c0a64..e710d0d39 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -2905,7 +2905,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
			conn->flags,
			conn->handle.fd,
			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
			conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 	}
@@ -2938,7 +2938,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
			conn->flags,
			conn->handle.fd,
			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
			conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
			conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 	}
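
Note on the sentinel values used by the patch (not part of the diff itself): fdcache_entry.next encodes four states. A value >= 0 is the FD of the next entry in the cache list, -1 marks the last entry, -2 marks an entry temporarily locked by a thread adding or removing it, and any value <= -3 means the entry is detached; the former "next" value v is then kept as -(v + 4) so a concurrent walker can still recover its successor. The small standalone sketch below uses illustrative helper names (detach_encode, detach_decode, single_thread) that do not exist in HAProxy; it only demonstrates the encoding round-trip and the thread_mask & (thread_mask - 1) test used to route single-thread FDs to fd_cache_local[] instead of the shared fd_cache list.

#include <stdio.h>

/* Sentinel encoding assumed from the patch for fdcache_entry.next:
 *   >= 0 : in a cache list, value is the FD of the next element
 *   -1   : in a cache list, last element
 *   -2   : entry locked by a thread performing an add/remove
 *   <= -3: detached; the former "next" value v is stored as -(v + 4)
 */
static int detach_encode(int next) { return -(next + 4); }
static int detach_decode(int enc)  { return -enc - 4; }

/* An FD is owned by a single thread iff its mask has exactly one bit set;
 * the patch then uses fd_cache_local[my_ffsl(mask) - 1] (my_ffsl() being
 * HAProxy's find-first-set helper) instead of the shared fd_cache list.
 */
static int single_thread(unsigned long thread_mask)
{
	return thread_mask && !(thread_mask & (thread_mask - 1));
}

int main(void)
{
	/* detaching an entry whose next was FD 7 stores -(7 + 4) = -11 */
	printf("encode(7)=%d decode(-11)=%d\n", detach_encode(7), detach_decode(-11));

	/* detaching the tail (next == -1) stores -3, still in the <= -3 range */
	printf("encode(-1)=%d\n", detach_encode(-1));

	/* 0x8 has a single bit set, 0x6 has two */
	printf("single(0x8)=%d single(0x6)=%d\n", single_thread(0x8), single_thread(0x6));
	return 0;
}

This mirrors what fdlist_process_cached_events() does while walking a list: when it reads a value <= -3 it recovers the old successor with -fd - 4 and keeps going, and when it reads the -2 lock marker it restarts from the last valid FD it had seen.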