MINOR: listener: implement multi-queue accept for threads

There is one point where we can migrate a connection to another thread
without taking risk, it's when we accept it : the new FD is not yet in
the fd cache and no task was created yet. It's still possible to assign
it a different thread than the one which accepted the connection. The
only requirement for this is to have one accept queue per thread and
their respective processing tasks that have to be woken up each time
an entry is added to the queue.

This is a multiple-producer, single-consumer model. Entries are added
at the queue's tail and the processing task is woken up. The consumer
picks entries at the head and processes them in order. The accept queue
contains the fd, the source address, and the listener. Each entry of
the accept queue was rounded up to 64 bytes (one cache line) to avoid
cache aliasing because tests have shown that otherwise performance
suffers a lot (5%). A test has shown that it's important to have at
least 256 entries for the rings, as at 128 it's still possible to fill
them often at high loads on small thread counts.

The processing task does almost nothing except calling the listener's
accept() function and updating the global session and SSL rate counters
just like listener_accept() does on synchronous calls.

At this point the accept queue is implemented but not used.
This commit is contained in:
Willy Tarreau 2019-01-27 15:37:19 +01:00
parent b2b50a7784
commit 1efafce61f
3 changed files with 192 additions and 0 deletions

View File

@ -169,6 +169,9 @@ static inline const char *listener_state_str(const struct listener *l)
}
extern struct xfer_sock_list *xfer_sock_list;
extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64)));
#endif /* _PROTO_LISTENER_H */
/*

View File

@ -272,6 +272,32 @@ struct xfer_sock_list {
struct sockaddr_storage addr;
};
/* This is used to create the accept queue, optimized to be 64 bytes long. */
struct accept_queue_entry {
struct listener *listener; // 8 bytes
int fd __attribute__((aligned(8))); // 4 bytes
int addr_len; // 4 bytes
union {
sa_family_t family; // 2 bytes
struct sockaddr_in in; // 16 bytes
struct sockaddr_in6 in6; // 28 bytes
} addr; // this is normally 28 bytes
/* 20-bytes hole here */
char pad0[0] __attribute((aligned(64)));
};
/* The per-thread accept queue ring, must be a power of two minus 1 */
#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
struct accept_queue_ring {
unsigned int head;
unsigned int tail;
struct task *task; /* task of the thread owning this ring */
struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
};
#endif /* _TYPES_LISTENER_H */
/*

View File

@ -52,6 +52,169 @@ static struct bind_kw_list bind_keywords = {
struct xfer_sock_list *xfer_sock_list = NULL;
#if defined(USE_THREAD)
struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
/* dequeue and process a pending connection from the local accept queue (single
* consumer). Returns the accepted fd or -1 if none was found. The listener is
* placed into *li. The address is copied into *addr for no more than *addr_len
* bytes, and the address length is returned into *addr_len.
*/
int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
{
struct accept_queue_entry *e;
unsigned int pos, next;
struct listener *ptr;
int len;
int fd;
pos = ring->head;
if (pos == ring->tail)
return -1;
next = pos + 1;
if (next >= ACCEPT_QUEUE_SIZE)
next = 0;
e = &ring->entry[pos];
/* wait for the producer to update the listener's pointer */
while (1) {
ptr = e->listener;
__ha_barrier_load();
if (ptr)
break;
pl_cpu_relax();
}
fd = e->fd;
len = e->addr_len;
if (len > *addr_len)
len = *addr_len;
if (likely(len > 0))
memcpy(addr, &e->addr, len);
/* release the entry */
e->listener = NULL;
__ha_barrier_store();
ring->head = next;
*addr_len = len;
*li = ptr;
return fd;
}
/* tries to push a new accepted connection <fd> into ring <ring> for listener
* <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
* succeeds, or zero if the ring is full. Supports multiple producers.
*/
int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
struct listener *li, const void *addr, int addr_len)
{
struct accept_queue_entry *e;
unsigned int pos, next;
pos = ring->tail;
do {
next = pos + 1;
if (next >= ACCEPT_QUEUE_SIZE)
next = 0;
if (next == ring->head)
return 0; // ring full
} while (unlikely(!HA_ATOMIC_CAS(&ring->tail, &pos, next)));
e = &ring->entry[pos];
if (addr_len > sizeof(e->addr))
addr_len = sizeof(e->addr);
if (addr_len)
memcpy(&e->addr, addr, addr_len);
e->addr_len = addr_len;
e->fd = fd;
__ha_barrier_store();
/* now commit the change */
e->listener = li;
return 1;
}
/* proceed with accepting new connections */
static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
{
struct accept_queue_ring *ring = context;
struct listener *li;
struct sockaddr_storage addr;
int max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
int addr_len;
int ret;
int fd;
while (max_accept--) {
addr_len = sizeof(addr);
fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
if (fd < 0)
break;
HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
ret = li->accept(li, fd, &addr);
if (ret <= 0) {
/* connection was terminated by the application */
continue;
}
/* increase the per-process number of cumulated sessions, this
* may only be done once l->accept() has accepted the connection.
*/
if (!(li->options & LI_O_UNLIMITED)) {
HA_ATOMIC_UPDATE_MAX(&global.sps_max,
update_freq_ctr(&global.sess_per_sec, 1));
if (li->bind_conf && li->bind_conf->is_ssl) {
HA_ATOMIC_UPDATE_MAX(&global.ssl_max,
update_freq_ctr(&global.ssl_per_sec, 1));
}
}
}
/* ran out of budget ? Let's come here ASAP */
if (max_accept < 0)
task_wakeup(t, TASK_WOKEN_IO);
return t;
}
/* Initializes the accept-queues. Returns 0 on success, otherwise ERR_* flags */
static int accept_queue_init()
{
struct task *t;
int i;
for (i = 0; i < global.nbthread; i++) {
t = task_new(1UL << i);
if (!t) {
ha_alert("Out of memory while initializing accept queue for thread %d\n", i);
return ERR_FATAL|ERR_ABORT;
}
t->process = accept_queue_process;
t->context = &accept_queue_rings[i];
accept_queue_rings[i].task = t;
}
return 0;
}
REGISTER_CONFIG_POSTPARSER("multi-threaded accept queue", accept_queue_init);
#endif // USE_THREAD
/* This function adds the specified listener's file descriptor to the polling
* lists if it is in the LI_LISTEN state. The listener enters LI_READY or
* LI_FULL state depending on its number of connections. In deamon mode, we