mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-02-20 04:37:04 +00:00
MAJOR: listener: do not hold the listener lock in listener_accept()
This function used to hold the listener's lock as a way to stay safe against concurrent manipulations, but it turns out this is wrong. First, the lock is held during l->accept(), which itself might indirectly call listener_release(), which, if the listener is marked full, could result in __resume_listener() to be called and the lock being taken twice. In practice it doesn't happen right now because the listener's FULL state cannot change while we're doing this. Second, all the code does is now protected against concurrent accesses. It used not to be the case in the early days of threads : the frequency counters are thread-safe. The rate limiting doesn't require extreme precision. Only the nbconn check is not thread safe. Third, the parts called here will have to be called from different threads without holding this lock, and this becomes a bigger issue if we need to keep this one. This patch does 3 things which need to be addressed at once : 1) it moves the lock to the only 2 functions that were not protected since called form listener_accept() : - limit_listener() - listener_full() 2) it makes sure delete_listener() properly checks its state within the lock. 3) it updates the l->nbconn tracking to make sure that it is always properly reported and accounted for. There is a point of particular care around the situation where the listener's maxconn is reached because the listener has to be marked full before accepting the connection, then resumed if the connection finally gets dropped. It is not possible to perform this change without removing the lock due to the deadlock issue explained above. This patch almost doubles the accept rate in multi-thread on a shared port between 8 threads, and multiplies by 4 the connection rate on a tcp-request connection reject rule.
This commit is contained in:
parent
a36b324777
commit
3f0d02bbc2
113
src/listener.c
113
src/listener.c
@ -233,32 +233,30 @@ int resume_listener(struct listener *l)
|
||||
|
||||
/* Marks a ready listener as full so that the stream code tries to re-enable
|
||||
* it upon next close() using resume_listener().
|
||||
*
|
||||
* Note: this function is only called from listener_accept so <l> is already
|
||||
* locked.
|
||||
*/
|
||||
static void listener_full(struct listener *l)
|
||||
{
|
||||
HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
|
||||
if (l->state >= LI_READY) {
|
||||
if (l->state == LI_LIMITED) {
|
||||
HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
|
||||
LIST_DEL(&l->wait_queue);
|
||||
HA_SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
|
||||
}
|
||||
|
||||
fd_stop_recv(l->fd);
|
||||
l->state = LI_FULL;
|
||||
if (l->state != LI_FULL) {
|
||||
fd_stop_recv(l->fd);
|
||||
l->state = LI_FULL;
|
||||
}
|
||||
}
|
||||
HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
|
||||
}
|
||||
|
||||
/* Marks a ready listener as limited so that we only try to re-enable it when
|
||||
* resources are free again. It will be queued into the specified queue.
|
||||
*
|
||||
* Note: this function is only called from listener_accept so <l> is already
|
||||
* locked.
|
||||
*/
|
||||
static void limit_listener(struct listener *l, struct list *list)
|
||||
{
|
||||
HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
|
||||
if (l->state == LI_READY) {
|
||||
HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
|
||||
LIST_ADDQ(list, &l->wait_queue);
|
||||
@ -266,6 +264,7 @@ static void limit_listener(struct listener *l, struct list *list)
|
||||
fd_stop_recv(l->fd);
|
||||
l->state = LI_LIMITED;
|
||||
}
|
||||
HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
|
||||
}
|
||||
|
||||
/* This function adds all of the protocol's listener's file descriptors to the
|
||||
@ -429,15 +428,14 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,
|
||||
*/
|
||||
void delete_listener(struct listener *listener)
|
||||
{
|
||||
if (listener->state != LI_ASSIGNED)
|
||||
return;
|
||||
|
||||
HA_SPIN_LOCK(LISTENER_LOCK, &listener->lock);
|
||||
listener->state = LI_INIT;
|
||||
LIST_DEL(&listener->proto_list);
|
||||
listener->proto->nb_listeners--;
|
||||
HA_ATOMIC_SUB(&jobs, 1);
|
||||
HA_ATOMIC_SUB(&listeners, 1);
|
||||
if (listener->state == LI_ASSIGNED) {
|
||||
listener->state = LI_INIT;
|
||||
LIST_DEL(&listener->proto_list);
|
||||
listener->proto->nb_listeners--;
|
||||
HA_ATOMIC_SUB(&jobs, 1);
|
||||
HA_ATOMIC_SUB(&listeners, 1);
|
||||
}
|
||||
HA_SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
|
||||
}
|
||||
|
||||
@ -450,6 +448,7 @@ void listener_accept(int fd)
|
||||
struct listener *l = fdtab[fd].owner;
|
||||
struct proxy *p;
|
||||
int max_accept;
|
||||
int next_conn = 0;
|
||||
int expire;
|
||||
int cfd;
|
||||
int ret;
|
||||
@ -461,8 +460,6 @@ void listener_accept(int fd)
|
||||
return;
|
||||
p = l->bind_conf->frontend;
|
||||
max_accept = l->maxaccept ? l->maxaccept : 1;
|
||||
if (HA_SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
|
||||
return;
|
||||
|
||||
if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
|
||||
int max = freq_ctr_remain(&global.sess_per_sec, global.sps_lim, 0);
|
||||
@ -522,11 +519,29 @@ void listener_accept(int fd)
|
||||
* worst case. If we fail due to system limits or temporary resource
|
||||
* shortage, we try again 100ms later in the worst case.
|
||||
*/
|
||||
while (l->nbconn < l->maxconn && max_accept--) {
|
||||
for (; max_accept-- > 0; next_conn = 0) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t laddr = sizeof(addr);
|
||||
unsigned int count;
|
||||
|
||||
/* pre-increase the number of connections without going too far */
|
||||
do {
|
||||
count = l->nbconn;
|
||||
if (count >= l->maxconn) {
|
||||
/* the listener was marked full or another
|
||||
* thread is going to do it.
|
||||
*/
|
||||
next_conn = 0;
|
||||
goto end;
|
||||
}
|
||||
next_conn = count + 1;
|
||||
} while (!HA_ATOMIC_CAS(&l->nbconn, &count, next_conn));
|
||||
|
||||
if (next_conn == l->maxconn) {
|
||||
/* we filled it, mark it full */
|
||||
listener_full(l);
|
||||
}
|
||||
|
||||
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
@ -578,6 +593,7 @@ void listener_accept(int fd)
|
||||
goto transient_error;
|
||||
case EINTR:
|
||||
case ECONNABORTED:
|
||||
HA_ATOMIC_SUB(&l->nbconn, 1);
|
||||
continue;
|
||||
case ENFILE:
|
||||
if (p)
|
||||
@ -608,26 +624,34 @@ void listener_accept(int fd)
|
||||
if (unlikely(master == 1))
|
||||
fcntl(cfd, F_SETFD, FD_CLOEXEC);
|
||||
|
||||
if (unlikely(cfd >= global.maxsock)) {
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
|
||||
p->id);
|
||||
close(cfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
goto end;
|
||||
}
|
||||
/* The connection was accepted, it must be counted as such */
|
||||
if (l->counters)
|
||||
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
|
||||
|
||||
/* increase the per-process number of cumulated connections */
|
||||
if (!(l->options & LI_O_UNLIMITED)) {
|
||||
count = update_freq_ctr(&global.conn_per_sec, 1);
|
||||
HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
|
||||
HA_ATOMIC_ADD(&actconn, 1);
|
||||
}
|
||||
|
||||
count = HA_ATOMIC_ADD(&l->nbconn, 1);
|
||||
if (l->counters)
|
||||
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
|
||||
if (unlikely(cfd >= global.maxsock)) {
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
|
||||
p->id);
|
||||
if (!(l->options & LI_O_UNLIMITED))
|
||||
HA_ATOMIC_SUB(&actconn, 1);
|
||||
close(cfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
goto end;
|
||||
}
|
||||
|
||||
/* past this point, l->accept() will automatically decrement
|
||||
* l->nbconn and actconn once done. Setting next_conn=0 allows
|
||||
* the error path not to rollback on nbconn. It's more convenient
|
||||
* than duplicating all exit labels.
|
||||
*/
|
||||
next_conn = 0;
|
||||
|
||||
ret = l->accept(l, cfd, &addr);
|
||||
if (unlikely(ret <= 0)) {
|
||||
@ -642,7 +666,9 @@ void listener_accept(int fd)
|
||||
goto transient_error;
|
||||
}
|
||||
|
||||
/* increase the per-process number of cumulated connections */
|
||||
/* increase the per-process number of cumulated sessions, this
|
||||
* may only be done once l->accept() has accepted the connection.
|
||||
*/
|
||||
if (!(l->options & LI_O_UNLIMITED)) {
|
||||
count = update_freq_ctr(&global.sess_per_sec, 1);
|
||||
HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
|
||||
@ -654,7 +680,7 @@ void listener_accept(int fd)
|
||||
}
|
||||
#endif
|
||||
|
||||
} /* end of while (max_accept--) */
|
||||
} /* end of for (max_accept--) */
|
||||
|
||||
/* we've exhausted max_accept, so there is no need to poll again */
|
||||
stop:
|
||||
@ -669,10 +695,21 @@ void listener_accept(int fd)
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
|
||||
end:
|
||||
if (l->nbconn >= l->maxconn)
|
||||
listener_full(l);
|
||||
if (next_conn)
|
||||
HA_ATOMIC_SUB(&l->nbconn, 1);
|
||||
|
||||
HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
|
||||
if (l->nbconn < l->maxconn && l->state == LI_FULL) {
|
||||
/* at least one thread has to this when quitting */
|
||||
resume_listener(l);
|
||||
|
||||
/* Dequeues all of the listeners waiting for a resource */
|
||||
if (!LIST_ISEMPTY(&global_listener_queue))
|
||||
dequeue_all_listeners(&global_listener_queue);
|
||||
|
||||
if (!LIST_ISEMPTY(&p->listener_queue) &&
|
||||
(!p->fe_sps_lim || freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0) > 0))
|
||||
dequeue_all_listeners(&p->listener_queue);
|
||||
}
|
||||
}
|
||||
|
||||
/* Notify the listener that a connection initiated from it was released. This
|
||||
|
Loading…
Reference in New Issue
Block a user