mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-14 15:34:35 +00:00
[MEDIUM] listeners: add a global listener management task
This global task is used to periodically check for end of resource shortage and to try to enable queued listeners again. This is important in case some temporary system-wide shortage is encountered, so that we don't have to wait for an existing connection to be released before checking the queue again. For situations where listeners are queued due to the global maxconn being reached, the task is woken up at least every second. For situations where a system resource shortage is detected (memory, sockets, ...) the task is woken up at least every 100 ms. That way, recovery from severe events can still be achieved under acceptable conditions.
This commit is contained in:
parent
237250cc0d
commit
e9b2602ac5
@ -124,6 +124,7 @@ extern int stopping; /* non zero means stopping in progress */
|
||||
extern char hostname[MAX_HOSTNAME_LEN];
|
||||
extern char localpeer[MAX_HOSTNAME_LEN];
|
||||
extern struct list global_listener_queue; /* list of the temporarily limited listeners */
|
||||
extern struct task *global_listener_queue_task;
|
||||
|
||||
#endif /* _TYPES_GLOBAL_H */
|
||||
|
||||
|
@ -161,6 +161,8 @@ char localpeer[MAX_HOSTNAME_LEN];
|
||||
|
||||
/* list of the temporarily limited listeners because of lack of resource */
|
||||
struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue);
|
||||
struct task *global_listener_queue_task;
|
||||
static struct task *manage_global_listener_queue(struct task *t);
|
||||
|
||||
/*********************************************************************/
|
||||
/* general purpose functions ***************************************/
|
||||
@ -546,6 +548,16 @@ void init(int argc, char **argv)
|
||||
exit(0);
|
||||
}
|
||||
|
||||
global_listener_queue_task = task_new();
|
||||
if (!global_listener_queue_task) {
|
||||
Alert("Out of memory when initializing global task\n");
|
||||
exit(1);
|
||||
}
|
||||
/* very simple initialization, users will queue the task if needed */
|
||||
global_listener_queue_task->context = NULL; /* not even a context! */
|
||||
global_listener_queue_task->process = manage_global_listener_queue;
|
||||
global_listener_queue_task->expire = TICK_ETERNITY;
|
||||
|
||||
/* now we know the buffer size, we can initialize the buffers */
|
||||
init_buffer();
|
||||
|
||||
@ -957,6 +969,7 @@ void deinit(void)
|
||||
free(global.desc); global.desc = NULL;
|
||||
free(fdtab); fdtab = NULL;
|
||||
free(oldpids); oldpids = NULL;
|
||||
free(global_listener_queue_task); global_listener_queue_task = NULL;
|
||||
|
||||
list_for_each_entry_safe(wl, wlb, &cfg_cfgfiles, list) {
|
||||
LIST_DEL(&wl->list);
|
||||
@ -1018,6 +1031,39 @@ void run_poll_loop()
|
||||
}
|
||||
}
|
||||
|
||||
/* This is the global management task for listeners. It enables listeners waiting
|
||||
* for global resources when there are enough free resource, or at least once in
|
||||
* a while. It is designed to be called as a task.
|
||||
*/
|
||||
static struct task *manage_global_listener_queue(struct task *t)
|
||||
{
|
||||
int next = TICK_ETERNITY;
|
||||
fprintf(stderr, "coucou!\n");
|
||||
/* queue is empty, nothing to do */
|
||||
if (LIST_ISEMPTY(&global_listener_queue))
|
||||
goto out;
|
||||
|
||||
/* If there are still too many concurrent connections, let's wait for
|
||||
* some of them to go away. We don't need to re-arm the timer because
|
||||
* each of them will scan the queue anyway.
|
||||
*/
|
||||
if (unlikely(actconn >= global.maxconn))
|
||||
goto out;
|
||||
|
||||
/* We should periodically try to enable listeners waiting for a global
|
||||
* resource here, because it is possible, though very unlikely, that
|
||||
* they have been blocked by a temporary lack of global resource such
|
||||
* as a file descriptor or memory and that the temporary condition has
|
||||
* disappeared.
|
||||
*/
|
||||
if (!LIST_ISEMPTY(&global_listener_queue))
|
||||
dequeue_all_listeners(&global_listener_queue);
|
||||
|
||||
out:
|
||||
t->expire = next;
|
||||
task_queue(t);
|
||||
return t;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
|
@ -1212,12 +1212,18 @@ int stream_sock_accept(int fd)
|
||||
max_accept = max;
|
||||
}
|
||||
|
||||
/* Note: if we fail to allocate a connection because of configured
|
||||
* limits, we'll schedule a new attempt worst 1 second later in the
|
||||
* worst case. If we fail due to system limits or temporary resource
|
||||
* shortage, we try again 100ms later in the worst case.
|
||||
*/
|
||||
while (max_accept--) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t laddr = sizeof(addr);
|
||||
|
||||
if (unlikely(actconn >= global.maxconn)) {
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1239,6 +1245,7 @@ int stream_sock_accept(int fd)
|
||||
"Proxy %s reached system FD limit at %d. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case EMFILE:
|
||||
if (p)
|
||||
@ -1246,6 +1253,7 @@ int stream_sock_accept(int fd)
|
||||
"Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case ENOBUFS:
|
||||
case ENOMEM:
|
||||
@ -1254,6 +1262,7 @@ int stream_sock_accept(int fd)
|
||||
"Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
default:
|
||||
return 0;
|
||||
@ -1266,6 +1275,7 @@ int stream_sock_accept(int fd)
|
||||
p->id);
|
||||
close(cfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1293,6 +1303,7 @@ int stream_sock_accept(int fd)
|
||||
continue;
|
||||
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user