REORG/MEDIUM: move the default accept function from sockstream to protocols.c
The previous sockstream_accept() function uses nothing from sockstream, and is totally irrelevant to stream interfaces. Move this to the protocols.c file which handles listeners and protocols, and call it listener_accept(). It now makes much more sense that the code dealing with listen() also handles accept() and passes it to upper layers.
This commit is contained in:
parent
26d8c59f0b
commit
bbebbbff83
|
@ -99,6 +99,12 @@ int unbind_all_listeners(struct protocol *proto);
|
|||
*/
|
||||
void delete_listener(struct listener *listener);
|
||||
|
||||
/* This function is called on a read event from a listening socket, corresponding
|
||||
* to an accept. It tries to accept as many connections as possible, and for each
|
||||
* calls the listener's accept handler (generally the frontend's accept handler).
|
||||
*/
|
||||
int listener_accept(int fd);
|
||||
|
||||
/* Registers the protocol <proto> */
|
||||
void protocol_register(struct protocol *proto);
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@
|
|||
|
||||
|
||||
/* main event functions used to move data between sockets and buffers */
|
||||
int stream_sock_accept(int fd);
|
||||
int stream_sock_read(int fd);
|
||||
int stream_sock_write(int fd);
|
||||
void stream_sock_data_finish(struct stream_interface *si);
|
||||
|
|
|
@ -69,7 +69,7 @@ static struct protocol proto_tcpv4 = {
|
|||
.sock_family = AF_INET,
|
||||
.sock_addrlen = sizeof(struct sockaddr_in),
|
||||
.l3_addrlen = 32/8,
|
||||
.accept = &stream_sock_accept,
|
||||
.accept = &listener_accept,
|
||||
.connect = tcp_connect_server,
|
||||
.bind = tcp_bind_listener,
|
||||
.bind_all = tcp_bind_listeners,
|
||||
|
@ -88,7 +88,7 @@ static struct protocol proto_tcpv6 = {
|
|||
.sock_family = AF_INET6,
|
||||
.sock_addrlen = sizeof(struct sockaddr_in6),
|
||||
.l3_addrlen = 128/8,
|
||||
.accept = &stream_sock_accept,
|
||||
.accept = &listener_accept,
|
||||
.connect = tcp_connect_server,
|
||||
.bind = tcp_bind_listener,
|
||||
.bind_all = tcp_bind_listeners,
|
||||
|
|
|
@ -55,7 +55,7 @@ static struct protocol proto_unix = {
|
|||
.sock_family = AF_UNIX,
|
||||
.sock_addrlen = sizeof(struct sockaddr_un),
|
||||
.l3_addrlen = sizeof(((struct sockaddr_un*)0)->sun_path),/* path len */
|
||||
.accept = &stream_sock_accept,
|
||||
.accept = &listener_accept,
|
||||
.bind = uxst_bind_listener,
|
||||
.bind_all = uxst_bind_listeners,
|
||||
.unbind_all = uxst_unbind_listeners,
|
||||
|
|
165
src/protocols.c
165
src/protocols.c
|
@ -10,6 +10,7 @@
|
|||
*
|
||||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
|
||||
|
@ -17,9 +18,15 @@
|
|||
#include <common/errors.h>
|
||||
#include <common/mini-clist.h>
|
||||
#include <common/standard.h>
|
||||
#include <common/time.h>
|
||||
|
||||
#include <types/global.h>
|
||||
|
||||
#include <proto/acl.h>
|
||||
#include <proto/fd.h>
|
||||
#include <proto/freq_ctr.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/task.h>
|
||||
|
||||
/* List head of all registered protocols */
|
||||
static struct list protocols = LIST_HEAD_INIT(protocols);
|
||||
|
@ -230,6 +237,164 @@ void delete_listener(struct listener *listener)
|
|||
listener->proto->nb_listeners--;
|
||||
}
|
||||
|
||||
/* This function is called on a read event from a listening socket, corresponding
|
||||
* to an accept. It tries to accept as many connections as possible, and for each
|
||||
* calls the listener's accept handler (generally the frontend's accept handler).
|
||||
*/
|
||||
int listener_accept(int fd)
|
||||
{
|
||||
struct listener *l = fdtab[fd].owner;
|
||||
struct proxy *p = l->frontend;
|
||||
int max_accept = global.tune.maxaccept;
|
||||
int cfd;
|
||||
int ret;
|
||||
|
||||
if (unlikely(l->nbconn >= l->maxconn)) {
|
||||
listener_full(l);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) {
|
||||
int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0);
|
||||
|
||||
if (unlikely(!max)) {
|
||||
/* frontend accept rate limit was reached */
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (max_accept > max)
|
||||
max_accept = max;
|
||||
}
|
||||
|
||||
if (p && p->fe_sps_lim) {
|
||||
int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
|
||||
|
||||
if (unlikely(!max)) {
|
||||
/* frontend accept rate limit was reached */
|
||||
limit_listener(l, &p->listener_queue);
|
||||
task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (max_accept > max)
|
||||
max_accept = max;
|
||||
}
|
||||
|
||||
/* Note: if we fail to allocate a connection because of configured
|
||||
* limits, we'll schedule a new attempt worst 1 second later in the
|
||||
* worst case. If we fail due to system limits or temporary resource
|
||||
* shortage, we try again 100ms later in the worst case.
|
||||
*/
|
||||
while (max_accept--) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t laddr = sizeof(addr);
|
||||
|
||||
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(p && p->feconn >= p->maxconn)) {
|
||||
limit_listener(l, &p->listener_queue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
cfd = accept(fd, (struct sockaddr *)&addr, &laddr);
|
||||
if (unlikely(cfd == -1)) {
|
||||
switch (errno) {
|
||||
case EAGAIN:
|
||||
case EINTR:
|
||||
case ECONNABORTED:
|
||||
return 0; /* nothing more to accept */
|
||||
case ENFILE:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached system FD limit at %d. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case EMFILE:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case ENOBUFS:
|
||||
case ENOMEM:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (unlikely(cfd >= global.maxsock)) {
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
|
||||
p->id);
|
||||
close(cfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* increase the per-process number of cumulated connections */
|
||||
if (!(l->options & LI_O_UNLIMITED)) {
|
||||
update_freq_ctr(&global.conn_per_sec, 1);
|
||||
if (global.conn_per_sec.curr_ctr > global.cps_max)
|
||||
global.cps_max = global.conn_per_sec.curr_ctr;
|
||||
actconn++;
|
||||
}
|
||||
|
||||
jobs++;
|
||||
totalconn++;
|
||||
l->nbconn++;
|
||||
|
||||
if (l->counters) {
|
||||
if (l->nbconn > l->counters->conn_max)
|
||||
l->counters->conn_max = l->nbconn;
|
||||
}
|
||||
|
||||
ret = l->accept(l, cfd, &addr);
|
||||
if (unlikely(ret <= 0)) {
|
||||
/* The connection was closed by session_accept(). Either
|
||||
* we just have to ignore it (ret == 0) or it's a critical
|
||||
* error due to a resource shortage, and we must stop the
|
||||
* listener (ret < 0).
|
||||
*/
|
||||
if (!(l->options & LI_O_UNLIMITED))
|
||||
actconn--;
|
||||
jobs--;
|
||||
l->nbconn--;
|
||||
if (ret == 0) /* successful termination */
|
||||
continue;
|
||||
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (l->nbconn >= l->maxconn) {
|
||||
listener_full(l);
|
||||
return 0;
|
||||
}
|
||||
|
||||
} /* end of while (p->feconn < p->maxconn) */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Registers the protocol <proto> */
|
||||
void protocol_register(struct protocol *proto)
|
||||
{
|
||||
|
|
|
@ -1138,164 +1138,6 @@ void stream_sock_chk_snd(struct stream_interface *si)
|
|||
}
|
||||
}
|
||||
|
||||
/* This function is called on a read event from a listening socket, corresponding
|
||||
* to an accept. It tries to accept as many connections as possible, and for each
|
||||
* calls the listener's accept handler (generally the frontend's accept handler).
|
||||
*/
|
||||
int stream_sock_accept(int fd)
|
||||
{
|
||||
struct listener *l = fdtab[fd].owner;
|
||||
struct proxy *p = l->frontend;
|
||||
int max_accept = global.tune.maxaccept;
|
||||
int cfd;
|
||||
int ret;
|
||||
|
||||
if (unlikely(l->nbconn >= l->maxconn)) {
|
||||
listener_full(l);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) {
|
||||
int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0);
|
||||
|
||||
if (unlikely(!max)) {
|
||||
/* frontend accept rate limit was reached */
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (max_accept > max)
|
||||
max_accept = max;
|
||||
}
|
||||
|
||||
if (p && p->fe_sps_lim) {
|
||||
int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
|
||||
|
||||
if (unlikely(!max)) {
|
||||
/* frontend accept rate limit was reached */
|
||||
limit_listener(l, &p->listener_queue);
|
||||
task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (max_accept > max)
|
||||
max_accept = max;
|
||||
}
|
||||
|
||||
/* Note: if we fail to allocate a connection because of configured
|
||||
* limits, we'll schedule a new attempt worst 1 second later in the
|
||||
* worst case. If we fail due to system limits or temporary resource
|
||||
* shortage, we try again 100ms later in the worst case.
|
||||
*/
|
||||
while (max_accept--) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t laddr = sizeof(addr);
|
||||
|
||||
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(p && p->feconn >= p->maxconn)) {
|
||||
limit_listener(l, &p->listener_queue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
cfd = accept(fd, (struct sockaddr *)&addr, &laddr);
|
||||
if (unlikely(cfd == -1)) {
|
||||
switch (errno) {
|
||||
case EAGAIN:
|
||||
case EINTR:
|
||||
case ECONNABORTED:
|
||||
return 0; /* nothing more to accept */
|
||||
case ENFILE:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached system FD limit at %d. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case EMFILE:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
case ENOBUFS:
|
||||
case ENOMEM:
|
||||
if (p)
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
|
||||
p->id, maxfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (unlikely(cfd >= global.maxsock)) {
|
||||
send_log(p, LOG_EMERG,
|
||||
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
|
||||
p->id);
|
||||
close(cfd);
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* increase the per-process number of cumulated connections */
|
||||
if (!(l->options & LI_O_UNLIMITED)) {
|
||||
update_freq_ctr(&global.conn_per_sec, 1);
|
||||
if (global.conn_per_sec.curr_ctr > global.cps_max)
|
||||
global.cps_max = global.conn_per_sec.curr_ctr;
|
||||
actconn++;
|
||||
}
|
||||
|
||||
jobs++;
|
||||
totalconn++;
|
||||
l->nbconn++;
|
||||
|
||||
if (l->counters) {
|
||||
if (l->nbconn > l->counters->conn_max)
|
||||
l->counters->conn_max = l->nbconn;
|
||||
}
|
||||
|
||||
ret = l->accept(l, cfd, &addr);
|
||||
if (unlikely(ret <= 0)) {
|
||||
/* The connection was closed by session_accept(). Either
|
||||
* we just have to ignore it (ret == 0) or it's a critical
|
||||
* error due to a resource shortage, and we must stop the
|
||||
* listener (ret < 0).
|
||||
*/
|
||||
if (!(l->options & LI_O_UNLIMITED))
|
||||
actconn--;
|
||||
jobs--;
|
||||
l->nbconn--;
|
||||
if (ret == 0) /* successful termination */
|
||||
continue;
|
||||
|
||||
limit_listener(l, &global_listener_queue);
|
||||
task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (l->nbconn >= l->maxconn) {
|
||||
listener_full(l);
|
||||
return 0;
|
||||
}
|
||||
|
||||
} /* end of while (p->feconn < p->maxconn) */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* stream sock operations */
|
||||
struct sock_ops stream_sock = {
|
||||
.update = stream_sock_data_finish,
|
||||
|
|
Loading…
Reference in New Issue