diff --git a/include/proto/listener.h b/include/proto/listener.h
index 3a6762ccb..8ec41af11 100644
--- a/include/proto/listener.h
+++ b/include/proto/listener.h
@@ -169,6 +169,9 @@ static inline const char *listener_state_str(const struct listener *l)
 }
 
 extern struct xfer_sock_list *xfer_sock_list;
+
+extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64)));
+
 #endif /* _PROTO_LISTENER_H */
 
 /*
diff --git a/include/types/listener.h b/include/types/listener.h
index 8e9db71dc..876d3c372 100644
--- a/include/types/listener.h
+++ b/include/types/listener.h
@@ -272,6 +272,32 @@ struct xfer_sock_list {
 	struct sockaddr_storage addr;
 };
 
+/* This is used to create the accept queue, optimized to be 64 bytes long. */
+struct accept_queue_entry {
+	struct listener *listener;          // 8 bytes
+	int fd __attribute__((aligned(8))); // 4 bytes
+	int addr_len;                       // 4 bytes
+
+	union {
+		sa_family_t family;         // 2 bytes
+		struct sockaddr_in in;      // 16 bytes
+		struct sockaddr_in6 in6;    // 28 bytes
+	} addr;                             // this is normally 28 bytes
+	/* 20-bytes hole here */
+	char pad0[0] __attribute((aligned(64)));
+};
+
+/* The per-thread accept queue ring. Its size must be a power of two minus 1 */
+#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
+
+struct accept_queue_ring {
+	unsigned int head; /* index of the next entry to pop, updated by accept_queue_pop_sc() */
+	unsigned int tail; /* index of the next entry to fill, advanced by accept_queue_push_mp() using a CAS */
+	struct task *task;  /* task of the thread owning this ring */
+	struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
+};
+
+
 #endif /* _TYPES_LISTENER_H */
 
 /*
diff --git a/src/listener.c b/src/listener.c
index fb527ab16..9e8c87818 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -52,6 +52,169 @@ static struct bind_kw_list bind_keywords = {
 
 struct xfer_sock_list *xfer_sock_list = NULL;
 
+#if defined(USE_THREAD)
+
+struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
+
+/* dequeue and process a pending connection from the local accept queue (single
+ * consumer). Returns the accepted fd or -1 if none was found. The listener is
+ * placed into *li. The address is copied into *addr for no more than *addr_len
+ * bytes, and the address length is returned into *addr_len.
+ */
+int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
+{
+	struct accept_queue_entry *e;
+	unsigned int pos, next;
+	struct listener *ptr;
+	int len;
+	int fd;
+
+	pos = ring->head;
+
+	if (pos == ring->tail)
+		return -1; /* head caught up with tail: nothing is queued */
+
+	next = pos + 1;
+	if (next >= ACCEPT_QUEUE_SIZE)
+		next = 0; /* wrap around to the first entry */
+
+	e = &ring->entry[pos];
+
+	/* wait for the producer to update the listener's pointer */
+	while (1) {
+		ptr = e->listener;
+		__ha_barrier_load();
+		if (ptr)
+			break;
+		pl_cpu_relax();
+	}
+
+	fd = e->fd;
+	len = e->addr_len;
+	if (len > *addr_len)
+		len = *addr_len; /* never copy more than the caller's buffer size */
+
+	if (likely(len > 0))
+		memcpy(addr, &e->addr, len);
+
+	/* release the entry: a NULL listener marks it as not committed */
+	e->listener = NULL;
+
+	__ha_barrier_store();
+	ring->head = next;
+
+	*addr_len = len;
+	*li = ptr;
+
+	return fd;
+}
+
+
+/* tries to push a new accepted connection <fd> into ring <ring> for listener
+ * <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
+ * succeeds, or zero if the ring is full. Supports multiple producers.
+ */
+int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
+                         struct listener *li, const void *addr, int addr_len)
+{
+	struct accept_queue_entry *e;
+	unsigned int pos, next;
+
+	pos = ring->tail;
+	do {
+		next = pos + 1;
+		if (next >= ACCEPT_QUEUE_SIZE)
+			next = 0;
+		if (next == ring->head)
+			return 0; // ring full
+	} while (unlikely(!HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+
+
+	e = &ring->entry[pos];
+
+	if (addr_len > sizeof(e->addr))
+		addr_len = sizeof(e->addr); /* truncate to what an entry can store */
+
+	if (addr_len)
+		memcpy(&e->addr, addr, addr_len);
+
+	e->addr_len = addr_len;
+	e->fd = fd;
+
+	__ha_barrier_store();
+	/* now commit the change: accept_queue_pop_sc() spins until listener is non-NULL */
+
+	e->listener = li;
+	return 1;
+}
+
+/* proceed with accepting new connections */
+static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
+{
+	struct accept_queue_ring *ring = context;
+	struct listener *li;
+	struct sockaddr_storage addr;
+	int max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
+	int addr_len;
+	int ret;
+	int fd;
+
+	while (max_accept--) {
+		addr_len = sizeof(addr);
+		fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
+		if (fd < 0)
+			break;
+
+		HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
+		ret = li->accept(li, fd, &addr);
+		if (ret <= 0) {
+			/* connection was terminated by the application */
+			continue;
+		}
+
+		/* increase the per-process number of cumulated sessions, this
+		 * may only be done once l->accept() has accepted the connection.
+		 */
+		if (!(li->options & LI_O_UNLIMITED)) {
+			HA_ATOMIC_UPDATE_MAX(&global.sps_max,
+			                     update_freq_ctr(&global.sess_per_sec, 1));
+			if (li->bind_conf && li->bind_conf->is_ssl) {
+				HA_ATOMIC_UPDATE_MAX(&global.ssl_max,
+				                     update_freq_ctr(&global.ssl_per_sec, 1));
+			}
+		}
+	}
+
+	/* ran out of budget ? Let's come here ASAP */
+	if (max_accept < 0)
+		task_wakeup(t, TASK_WOKEN_IO);
+
+	return t;
+}
+
+/* Initializes the accept-queues. Returns 0 on success, otherwise ERR_* flags */
+static int accept_queue_init()
+{
+	struct task *t;
+	int i;
+
+	for (i = 0; i < global.nbthread; i++) {
+		t = task_new(1UL << i);
+		if (!t) {
+			ha_alert("Out of memory while initializing accept queue for thread %d\n", i);
+			return ERR_FATAL|ERR_ABORT;
+		}
+		t->process = accept_queue_process;
+		t->context = &accept_queue_rings[i];
+		accept_queue_rings[i].task = t;
+	}
+	return 0;
+}
+
+REGISTER_CONFIG_POSTPARSER("multi-threaded accept queue", accept_queue_init);
+
+#endif // USE_THREAD
+
 /* This function adds the specified listener's file descriptor to the polling
  * lists if it is in the LI_LISTEN state. The listener enters LI_READY or
  * LI_FULL state depending on its number of connections. In deamon mode, we