[MEDIUM] listeners: add a global listener management task

This global task is used to periodically check for end of resource shortage
and to try to enable queued listeners again. This is important in case some
temporary system-wide shortage is encountered, so that we don't have to wait
for an existing connection to be released before checking the queue again.

For situations where listeners are queued due to the global maxconn being
reached, the task is woken up at least every second. For situations where
a system resource shortage is detected (memory, sockets, ...) the task is
woken up at least every 100 ms. That way, recovery from severe events can
still be achieved under acceptable conditions.
This commit is contained in:
Willy Tarreau 2011-08-01 20:57:55 +02:00
parent 237250cc0d
commit e9b2602ac5
3 changed files with 58 additions and 0 deletions

View File

@ -124,6 +124,7 @@ extern int stopping; /* non zero means stopping in progress */
extern char hostname[MAX_HOSTNAME_LEN];
extern char localpeer[MAX_HOSTNAME_LEN];
extern struct list global_listener_queue; /* list of the temporarily limited listeners */
extern struct task *global_listener_queue_task;
#endif /* _TYPES_GLOBAL_H */

View File

@ -161,6 +161,8 @@ char localpeer[MAX_HOSTNAME_LEN];
/* list of the temporarily limited listeners because of lack of resource */
struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue);
struct task *global_listener_queue_task;
static struct task *manage_global_listener_queue(struct task *t);
/*********************************************************************/
/* general purpose functions ***************************************/
@ -546,6 +548,16 @@ void init(int argc, char **argv)
exit(0);
}
global_listener_queue_task = task_new();
if (!global_listener_queue_task) {
Alert("Out of memory when initializing global task\n");
exit(1);
}
/* very simple initialization, users will queue the task if needed */
global_listener_queue_task->context = NULL; /* not even a context! */
global_listener_queue_task->process = manage_global_listener_queue;
global_listener_queue_task->expire = TICK_ETERNITY;
/* now we know the buffer size, we can initialize the buffers */
init_buffer();
@ -957,6 +969,7 @@ void deinit(void)
free(global.desc); global.desc = NULL;
free(fdtab); fdtab = NULL;
free(oldpids); oldpids = NULL;
free(global_listener_queue_task); global_listener_queue_task = NULL;
list_for_each_entry_safe(wl, wlb, &cfg_cfgfiles, list) {
LIST_DEL(&wl->list);
@ -1018,6 +1031,39 @@ void run_poll_loop()
}
}
/* This is the global management task for listeners. It enables listeners waiting
* for global resources when there are enough free resource, or at least once in
* a while. It is designed to be called as a task.
*/
/* This is the global management task for listeners. It enables listeners
 * waiting for global resources when enough of them have been freed, or at
 * least once in a while, so that a temporary system-wide shortage (FDs,
 * memory, ...) does not leave listeners queued forever. It is designed to
 * be called as a task; it re-queues itself with the expiration date set
 * by its callers via task_schedule() and always returns its own task.
 *
 * Returns: t, re-queued with t->expire updated.
 */
static struct task *manage_global_listener_queue(struct task *t)
{
	int next = TICK_ETERNITY;

	/* queue is empty, nothing to do */
	if (LIST_ISEMPTY(&global_listener_queue))
		goto out;

	/* If there are still too many concurrent connections, let's wait for
	 * some of them to go away. We don't need to re-arm the timer because
	 * each of them will scan the queue anyway.
	 */
	if (unlikely(actconn >= global.maxconn))
		goto out;

	/* We should periodically try to enable listeners waiting for a global
	 * resource here, because it is possible, though very unlikely, that
	 * they have been blocked by a temporary lack of global resource such
	 * as a file descriptor or memory and that the temporary condition has
	 * disappeared.
	 */
	dequeue_all_listeners(&global_listener_queue);

 out:
	t->expire = next;
	task_queue(t);
	return t;
}
int main(int argc, char **argv)
{

View File

@ -1212,12 +1212,18 @@ int stream_sock_accept(int fd)
max_accept = max;
}
/* Note: if we fail to allocate a connection because of configured
 * limits, we'll schedule a new attempt at worst 1 second later. If
 * we fail due to system limits or temporary resource shortage, we
 * try again at worst 100ms later.
 */
while (max_accept--) {
struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr);
if (unlikely(actconn >= global.maxconn)) {
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
return 0;
}
@ -1239,6 +1245,7 @@ int stream_sock_accept(int fd)
"Proxy %s reached system FD limit at %d. Please check system tunables.\n",
p->id, maxfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
return 0;
case EMFILE:
if (p)
@ -1246,6 +1253,7 @@ int stream_sock_accept(int fd)
"Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
p->id, maxfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
return 0;
case ENOBUFS:
case ENOMEM:
@ -1254,6 +1262,7 @@ int stream_sock_accept(int fd)
"Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
p->id, maxfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
return 0;
default:
return 0;
@ -1266,6 +1275,7 @@ int stream_sock_accept(int fd)
p->id);
close(cfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
return 0;
}
@ -1293,6 +1303,7 @@ int stream_sock_accept(int fd)
continue;
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
return 0;
}