Mirror of http://git.haproxy.org/git/haproxy.git/ (synced 2025-02-28 16:40:37 +00:00)
Revert "BUG/MEDIUM: listener: do not accept connections faster than we can process them"
This reverts commit 62e8aaa1bd.
While it works extremely well to address SSL handshake floods, it prevents
the establishment of new connections during regular traffic above 50-60 Gbps,
because for an unknown reason the queue seems to hold ~1.7 active tasks
per connection all the time, which makes no sense as these ought to be
waiting on subscribed events. It might uncover a deeper issue, but at least
for now a different solution is needed. Cf. issue #822.
The test is trivial to run: start a config with tune.runqueue-depth 10
and inject on 1 GB objects with more than 10 connections, then try to
connect to the stats socket. It only works once, after which the listeners
are no longer dequeued.
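For reference, a minimal configuration along these lines should reproduce it. The bind address, socket path, backend address and timeouts below are illustrative placeholders, not values taken from the original report:

# Hypothetical reproduction config (addresses and paths are placeholders)
global
    tune.runqueue-depth 10
    stats socket /tmp/haproxy.sock mode 600 level admin

defaults
    mode http
    timeout connect 5s
    timeout client  30s
    timeout server  30s

frontend fe
    bind :8080
    default_backend be

backend be
    # any server able to deliver ~1 GB objects
    server s1 127.0.0.1:8000

# Inject on the large object with more than 10 concurrent connections, then:
#   echo "show info" | socat stdio /tmp/haproxy.sock
# With the offending patch applied, only the first stats connection succeeds.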
parent e814321287
commit 02757d02c2
src/listener.c
@@ -715,7 +715,6 @@ void listener_accept(struct listener *l)
 	int next_conn = 0;
 	int next_feconn = 0;
 	int next_actconn = 0;
-	int queued = 0; // <0 if RQ threshold exceeded
 	int expire;
 	int ret;
 
@@ -783,7 +782,7 @@ void listener_accept(struct listener *l)
 	 * worst case. If we fail due to system limits or temporary resource
 	 * shortage, we try again 100ms later in the worst case.
 	 */
-	for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--, queued++) {
+	for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) {
 		unsigned int count;
 		int status;
 		__decl_thread(unsigned long mask);
@@ -821,12 +820,6 @@ void listener_accept(struct listener *l)
 		}
 
 		if (!(l->options & LI_O_UNLIMITED)) {
-			if (tasks_run_queue + queued >= global.tune.runqueue_depth) {
-				/* load too high already */
-				next_actconn = 0;
-				goto limit_rqueue;
-			}
-
 			do {
 				count = actconn;
 				if (unlikely(count >= global.maxconn)) {
@@ -1056,16 +1049,14 @@ void listener_accept(struct listener *l)
 		_HA_ATOMIC_SUB(&actconn, 1);
 
 	if ((l->state == LI_FULL && (!l->maxconn || l->nbconn < l->maxconn)) ||
-	    (l->state == LI_LIMITED && queued >= 0 &&
+	    (l->state == LI_LIMITED &&
 	     ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn) &&
 	      (!tick_isset(global_listener_queue_task->expire) ||
 	       tick_is_expired(global_listener_queue_task->expire, now_ms))))) {
 		/* at least one thread has to this when quitting */
 		resume_listener(l);
 
-		/* Dequeues all of the listeners waiting for a resource in case
-		 * we've just aborted an operation that made others stop.
-		 */
+		/* Dequeues all of the listeners waiting for a resource */
 		dequeue_all_listeners();
 
 		if (p && !MT_LIST_ISEMPTY(&p->listener_queue) &&
@@ -1102,19 +1093,6 @@ void listener_accept(struct listener *l)
 	if (p->task && tick_isset(expire))
 		task_schedule(p->task, expire);
 	goto end;
-
- limit_rqueue:
-	/* The run queue is too high. We're temporarily limiting this listener,
-	 * and queuing the global task to be woken up right now, so that it's
-	 * picked from the wait queue after run queues are OK again. If the
-	 * load is just a short spike, there will be no delay. If the load is
-	 * maintained, the global task will wait a little bit more.
-	 */
-	queued = -1;
-	limit_listener(l, &global_listener_queue);
-	task_schedule(global_listener_queue_task, now_ms);
-	goto end;
-
 }
 
 /* Notify the listener that a connection initiated from it was released. This
@@ -1124,10 +1102,9 @@ void listener_accept(struct listener *l)
 void listener_release(struct listener *l)
 {
 	struct proxy *fe = l->bind_conf->frontend;
-	int last = 0;
 
 	if (!(l->options & LI_O_UNLIMITED))
-		last = _HA_ATOMIC_SUB(&actconn, 1) == 0;
+		_HA_ATOMIC_SUB(&actconn, 1);
 	if (fe)
 		_HA_ATOMIC_SUB(&fe->feconn, 1);
 	_HA_ATOMIC_SUB(&l->nbconn, 1);
@@ -1136,23 +1113,12 @@ void listener_release(struct listener *l)
 	if (l->state == LI_FULL || l->state == LI_LIMITED)
 		resume_listener(l);
 
+	/* Dequeues all of the listeners waiting for a resource */
+	dequeue_all_listeners();
+
 	if (!MT_LIST_ISEMPTY(&fe->listener_queue) &&
 	    (!fe->fe_sps_lim || freq_ctr_remain(&fe->fe_sess_per_sec, fe->fe_sps_lim, 0) > 0))
 		dequeue_proxy_listeners(fe);
-
-	/* Dequeue all of the listeners waiting for the a resource, or requeue
-	 * the current listener into the global queue if the load remains too
-	 * high. In this case we make sure to reconsider it in no more than one
-	 * millisecond, which also makes sure that the global task will let all
-	 * current tasks flush before executing.
-	 */
-	if (tasks_run_queue < global.tune.runqueue_depth || last) {
-		dequeue_all_listeners();
-	} else {
-		if (l->state != LI_LIMITED)
-			limit_listener(l, &global_listener_queue);
-		task_schedule(global_listener_queue_task, tick_add(now_ms, 1));
-	}
 }
 
 /* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
@@ -1193,15 +1159,6 @@ static struct task *manage_global_listener_queue(struct task *t, void *context,
 	if (unlikely(actconn >= global.maxconn))
 		goto out;
 
-	/* If the run queue is still crowded, we woke up too early, let's wait
-	 * a bit more (1ms delay) so that we don't wake up more than 1k times
-	 * per second.
-	 */
-	if (tasks_run_queue >= global.tune.runqueue_depth) {
-		t->expire = tick_add(now_ms, 1);
-		goto wait_more;
-	}
-
 	/* We should periodically try to enable listeners waiting for a global
 	 * resource here, because it is possible, though very unlikely, that
 	 * they have been blocked by a temporary lack of global resource such
@@ -1212,7 +1169,6 @@ static struct task *manage_global_listener_queue(struct task *t, void *context,
 
  out:
 	t->expire = TICK_ETERNITY;
- wait_more:
 	task_queue(t);
 	return t;
 }