diff --git a/src/listener.c b/src/listener.c
index 397b3bed54..d83a2079f7 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -715,6 +715,7 @@ void listener_accept(struct listener *l)
 	int next_conn = 0;
 	int next_feconn = 0;
 	int next_actconn = 0;
+	int queued = 0; // <0 if RQ threshold exceeded
 	int expire;
 	int ret;
 
@@ -782,7 +783,7 @@ void listener_accept(struct listener *l)
 	 * worst case. If we fail due to system limits or temporary resource
 	 * shortage, we try again 100ms later in the worst case.
 	 */
-	for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) {
+	for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--, queued++) {
 		unsigned int count;
 		int status;
 		__decl_thread(unsigned long mask);
@@ -820,6 +821,12 @@ void listener_accept(struct listener *l)
 		}
 
 		if (!(l->options & LI_O_UNLIMITED)) {
+			if (tasks_run_queue + queued >= global.tune.runqueue_depth) {
+				/* load too high already */
+				next_actconn = 0;
+				goto limit_rqueue;
+			}
+
 			do {
 				count = actconn;
 				if (unlikely(count >= global.maxconn)) {
@@ -1049,14 +1056,16 @@ void listener_accept(struct listener *l)
 		_HA_ATOMIC_SUB(&actconn, 1);
 
 	if ((l->state == LI_FULL && (!l->maxconn || l->nbconn < l->maxconn)) ||
-	    (l->state == LI_LIMITED &&
+	    (l->state == LI_LIMITED && queued >= 0 &&
 	     ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn) &&
 	      (!tick_isset(global_listener_queue_task->expire) ||
 	       tick_is_expired(global_listener_queue_task->expire, now_ms))))) {
 		/* at least one thread has to this when quitting */
 		resume_listener(l);
 
-		/* Dequeues all of the listeners waiting for a resource */
+		/* Dequeues all of the listeners waiting for a resource in case
+		 * we've just aborted an operation that made others stop.
+		 */
 		dequeue_all_listeners();
 
 		if (p && !MT_LIST_ISEMPTY(&p->listener_queue) &&
@@ -1093,6 +1102,19 @@ void listener_accept(struct listener *l)
 	if (p->task && tick_isset(expire))
 		task_schedule(p->task, expire);
 	goto end;
+
+ limit_rqueue:
+	/* The run queue is too high. We're temporarily limiting this listener,
+	 * and queuing the global task to be woken up right now, so that it's
+	 * picked from the wait queue after run queues are OK again. If the
+	 * load is just a short spike, there will be no delay. If the load is
+	 * maintained, the global task will wait a little bit more.
+	 */
+	queued = -1;
+	limit_listener(l, &global_listener_queue);
+	task_schedule(global_listener_queue_task, now_ms);
+	goto end;
+
 }
 
 /* Notify the listener that a connection initiated from it was released. This
@@ -1102,9 +1124,10 @@ void listener_accept(struct listener *l)
 void listener_release(struct listener *l)
 {
 	struct proxy *fe = l->bind_conf->frontend;
+	int last = 0;
 
 	if (!(l->options & LI_O_UNLIMITED))
-		_HA_ATOMIC_SUB(&actconn, 1);
+		last = _HA_ATOMIC_SUB(&actconn, 1) == 0;
 	if (fe)
 		_HA_ATOMIC_SUB(&fe->feconn, 1);
 	_HA_ATOMIC_SUB(&l->nbconn, 1);
@@ -1113,12 +1136,23 @@ void listener_release(struct listener *l)
 	if (l->state == LI_FULL || l->state == LI_LIMITED)
 		resume_listener(l);
 
-	/* Dequeues all of the listeners waiting for a resource */
-	dequeue_all_listeners();
-
 	if (!MT_LIST_ISEMPTY(&fe->listener_queue) &&
 	    (!fe->fe_sps_lim || freq_ctr_remain(&fe->fe_sess_per_sec, fe->fe_sps_lim, 0) > 0))
 		dequeue_proxy_listeners(fe);
+
+	/* Dequeue all of the listeners waiting for a resource, or requeue
+	 * the current listener into the global queue if the load remains too
+	 * high. In this case we make sure to reconsider it in no more than one
+	 * millisecond, which also makes sure that the global task will let all
+	 * current tasks flush before executing.
+	 */
+	if (tasks_run_queue < global.tune.runqueue_depth || last) {
+		dequeue_all_listeners();
+	} else {
+		if (l->state != LI_LIMITED)
+			limit_listener(l, &global_listener_queue);
+		task_schedule(global_listener_queue_task, tick_add(now_ms, 1));
+	}
 }
 
 /* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
@@ -1159,6 +1193,15 @@ static struct task *manage_global_listener_queue(struct task *t, void *context,
 	if (unlikely(actconn >= global.maxconn))
 		goto out;
 
+	/* If the run queue is still crowded, we woke up too early, let's wait
+	 * a bit more (1ms delay) so that we don't wake up more than 1k times
+	 * per second.
+	 */
+	if (tasks_run_queue >= global.tune.runqueue_depth) {
+		t->expire = tick_add(now_ms, 1);
+		goto wait_more;
+	}
+
 	/* We should periodically try to enable listeners waiting for a global
 	 * resource here, because it is possible, though very unlikely, that
 	 * they have been blocked by a temporary lack of global resource such
@@ -1169,6 +1212,7 @@ static struct task *manage_global_listener_queue(struct task *t, void *context,
 
  out:
 	t->expire = TICK_ETERNITY;
+ wait_more:
 	task_queue(t);
 	return t;
 }
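Note (not part of the patch): the heart of the change is the accept-time check against the scheduler's run queue. The standalone sketch below only illustrates that decision; tasks_run_queue and tune.runqueue-depth are replaced by plain variables, and may_accept() as well as the depth of 200 are invented for the example, whereas the real code uses the scheduler's atomic counters and parks the listener with limit_listener()/task_schedule() instead of merely returning false.

#include <stdio.h>
#include <stdbool.h>

/* Simplified stand-ins for the scheduler state used by the patch; the real
 * tasks_run_queue and global.tune.runqueue_depth are maintained by HAProxy's
 * scheduler and configuration parser.
 */
static unsigned int tasks_run_queue;
static unsigned int runqueue_depth = 200;	/* assumed tune.runqueue-depth */

/* Mirrors the test added to listener_accept(): 'queued' counts connections
 * already accepted during the current accept loop, i.e. tasks about to be
 * woken up, so the sum approximates the run queue depth after this loop.
 */
static bool may_accept(unsigned int queued)
{
	return tasks_run_queue + queued < runqueue_depth;
}

int main(void)
{
	tasks_run_queue = 195;	/* pretend the scheduler is already busy */

	for (unsigned int queued = 0; queued < 10; queued++)
		printf("queued=%u -> %s\n", queued,
		       may_accept(queued) ? "accept" : "throttle");
	return 0;
}

Where the sketch prints "throttle", the patch does not drop anything: listener_accept() jumps to limit_rqueue, queues the listener on global_listener_queue and schedules the global task right away, while listener_release() re-checks the run queue and wakes that task at most one millisecond later, so accepting resumes as soon as the load drops.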