MAJOR: listener: use the multi-queue for multi-thread listeners

The idea is to redistribute an incoming connection to one of the
threads a bind_conf is bound to when there is more than one. We do this
using a random improved by the p2c algorithm : a random() call returns
two different thread numbers. We then compare their respective connection
count and the length of their accept queues, and pick the least loaded
one. We even use this deferred accept mechanism if the target thread
ends up being the local thread, because this maintains fairness between
all connections and tests show that it's about 1% faster this way,
likely due to cache locality. If the target thread's accept queue is
full, the connection is accepted synchronously by the current thread.
This commit is contained in:
Willy Tarreau 2019-01-27 15:37:19 +01:00
parent 1efafce61f
commit e0e9c48ab2

View File

@ -816,6 +816,52 @@ void listener_accept(int fd)
*/
next_conn = 0;
#if defined(USE_THREAD)
count = l->bind_conf->thr_count;
if (count > 1) {
struct accept_queue_ring *ring;
int r, t1, t2, q1, q2;
/* pick two small distinct random values and drop lower bits */
r = (random() >> 8) % ((count - 1) * count);
t2 = r / count; // 0..thr_count-2
t1 = r % count; // 0..thr_count-1
t2 += t1 + 1; // necessarily different from t1
if (t2 >= count)
t2 -= count;
t1 = bind_map_thread_id(l->bind_conf, t1);
t2 = bind_map_thread_id(l->bind_conf, t2);
q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
if (q1 >= ACCEPT_QUEUE_SIZE)
q1 -= ACCEPT_QUEUE_SIZE;
q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
if (q2 >= ACCEPT_QUEUE_SIZE)
q2 -= ACCEPT_QUEUE_SIZE;
/* make t1 the lowest loaded thread */
if (q1 >= ACCEPT_QUEUE_SIZE || l->thr_conn[t1] + q1 > l->thr_conn[t2] + q2)
t1 = t2;
/* We use deferred accepts even if it's the local thread because
* tests show that it's the best performing model, likely due to
* better cache locality when processing this loop.
*/
ring = &accept_queue_rings[t1];
if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
task_wakeup(ring->task, TASK_WOKEN_IO);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
* FIXME: we should update some stats here.
*/
}
#endif // USE_THREAD
HA_ATOMIC_ADD(&l->thr_conn[tid], 1);
ret = l->accept(l, cfd, &addr);
if (unlikely(ret <= 0)) {