BUG/MEDIUM: listener: make sure the listener never accepts too many conns

We were not checking p->feconn nor the global actconn soon enough. In
older versions this could result in a frontend accepting more connections
than allowed by its maxconn or the global maxconn, exactly N-1 extra
connections where N is the number of threads, provided each of these
threads were running a different listener. But with the lock removal,
it became worse, the excess could be the listener's maxconn multiplied
by the number of threads. Among the nasty side effect was that LI_FULL
could be removed while the limit was still over and in some cases the
polling on the socket was no re-enabled.

This commit takes care of updating and checking p->feconn and the global
actconn *before* processing the connection, so that the listener can be
turned off before accepting the socket if needed. This requires to move
some of the bookkeeping operations form session to listen, which totally
makes sense in this context.

Now the limits are properly respected, even if a listener's maxconn is
over a frontend's. This only applies on top of the listener lock removal
series and doesn't have to be backported.
This commit is contained in:
Willy Tarreau 2019-02-27 19:32:32 +01:00
parent 01abd02508
commit 82c9789ac4
2 changed files with 67 additions and 22 deletions

View File

@ -578,6 +578,8 @@ void listener_accept(int fd)
struct proxy *p;
int max_accept;
int next_conn = 0;
int next_feconn = 0;
int next_actconn = 0;
int expire;
int cfd;
int ret;
@ -648,12 +650,15 @@ void listener_accept(int fd)
* worst case. If we fail due to system limits or temporary resource
* shortage, we try again 100ms later in the worst case.
*/
for (; max_accept-- > 0; next_conn = 0) {
for (; max_accept-- > 0; next_conn = next_feconn = next_actconn = 0) {
struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr);
unsigned int count;
/* pre-increase the number of connections without going too far */
/* pre-increase the number of connections without going too far.
* We process the listener, then the proxy, then the process.
* We know which ones to unroll based on the next_xxx value.
*/
do {
count = l->nbconn;
if (count >= l->maxconn) {
@ -671,15 +676,42 @@ void listener_accept(int fd)
listener_full(l);
}
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
goto end;
if (p) {
do {
count = p->feconn;
if (count >= p->maxconn) {
/* the frontend was marked full or another
* thread is going to do it.
*/
next_feconn = 0;
goto end;
}
next_feconn = count + 1;
} while (!HA_ATOMIC_CAS(&p->feconn, &count, next_feconn));
if (unlikely(next_feconn == p->maxconn)) {
/* we just filled it */
limit_listener(l, &p->listener_queue);
}
}
if (unlikely(p && p->feconn >= p->maxconn)) {
limit_listener(l, &p->listener_queue);
goto end;
if (!(l->options & LI_O_UNLIMITED)) {
do {
count = actconn;
if (count >= global.maxconn) {
/* the process was marked full or another
* thread is going to do it.
*/
next_actconn = 0;
goto end;
}
next_actconn = count + 1;
} while (!HA_ATOMIC_CAS(&actconn, &count, next_actconn));
if (unlikely(next_actconn == global.maxconn)) {
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
}
}
/* with sockpair@ we don't want to do an accept */
@ -723,6 +755,10 @@ void listener_accept(int fd)
case EINTR:
case ECONNABORTED:
HA_ATOMIC_SUB(&l->nbconn, 1);
if (p)
HA_ATOMIC_SUB(&p->feconn, 1);
if (!(l->options & LI_O_UNLIMITED))
HA_ATOMIC_SUB(&actconn, 1);
continue;
case ENFILE:
if (p)
@ -757,18 +793,20 @@ void listener_accept(int fd)
if (l->counters)
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
if (p)
HA_ATOMIC_UPDATE_MAX(&p->fe_counters.conn_max, next_feconn);
proxy_inc_fe_conn_ctr(l, p);
if (!(l->options & LI_O_UNLIMITED)) {
count = update_freq_ctr(&global.conn_per_sec, 1);
HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
HA_ATOMIC_ADD(&actconn, 1);
}
if (unlikely(cfd >= global.maxsock)) {
send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id);
if (!(l->options & LI_O_UNLIMITED))
HA_ATOMIC_SUB(&actconn, 1);
close(cfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
@ -776,11 +814,13 @@ void listener_accept(int fd)
}
/* past this point, l->accept() will automatically decrement
* l->nbconn and actconn once done. Setting next_conn=0 allows
* the error path not to rollback on nbconn. It's more convenient
* than duplicating all exit labels.
* l->nbconn, feconn and actconn once done. Setting next_*conn=0
* allows the error path not to rollback on nbconn. It's more
* convenient than duplicating all exit labels.
*/
next_conn = 0;
next_feconn = 0;
next_actconn = 0;
#if defined(USE_THREAD)
count = l->bind_conf->thr_count;
@ -875,7 +915,14 @@ void listener_accept(int fd)
if (next_conn)
HA_ATOMIC_SUB(&l->nbconn, 1);
if (l->nbconn < l->maxconn && l->state == LI_FULL) {
if (p && next_feconn)
HA_ATOMIC_SUB(&p->feconn, 1);
if (next_actconn)
HA_ATOMIC_SUB(&actconn, 1);
if ((l->state == LI_FULL && l->nbconn < l->maxconn) ||
(l->state == LI_LIMITED && ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn)))) {
/* at least one thread has to this when quitting */
resume_listener(l);
@ -899,9 +946,12 @@ void listener_release(struct listener *l)
if (!(l->options & LI_O_UNLIMITED))
HA_ATOMIC_SUB(&actconn, 1);
if (fe)
HA_ATOMIC_SUB(&fe->feconn, 1);
HA_ATOMIC_SUB(&l->nbconn, 1);
HA_ATOMIC_SUB(&l->thr_conn[tid], 1);
if (l->state == LI_FULL)
if (l->state == LI_FULL || l->state == LI_LIMITED)
resume_listener(l);
/* Dequeues all of the listeners waiting for a resource */

View File

@ -55,10 +55,6 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
vars_init(&sess->vars, SCOPE_SESS);
sess->task = NULL;
sess->t_handshake = -1; /* handshake not done yet */
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
HA_ATOMIC_ADD(&fe->feconn, 1));
if (li)
proxy_inc_fe_conn_ctr(li, fe);
HA_ATOMIC_ADD(&totalconn, 1);
HA_ATOMIC_ADD(&jobs, 1);
LIST_INIT(&sess->srv_list);
@ -72,7 +68,6 @@ void session_free(struct session *sess)
struct connection *conn, *conn_back;
struct sess_srv_list *srv_list, *srv_list_back;
HA_ATOMIC_SUB(&sess->fe->feconn, 1);
if (sess->listener)
listener_release(sess->listener);
session_store_counters(sess);