MEDIUM: listener: use protocol->accept_conn() to accept a connection

Now listener_accept() doesn't have to deal with the incoming FD anymore
(except for a little bit of side band stuff). It directly retrieves a
valid connection from the protocol layer, or receives a well-defined
error code that helps it decide how to proceed. This removes a lot of
hardly maintainable low-level code and opens the function to receive
new protocol stacks.
This commit is contained in:
Willy Tarreau 2020-10-15 10:09:31 +02:00
parent 344b8fcf87
commit 9378bbe0be

View File

@ -10,7 +10,6 @@
* *
*/ */
#define _GNU_SOURCE
#include <ctype.h> #include <ctype.h>
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
@ -29,7 +28,6 @@
#include <haproxy/list.h> #include <haproxy/list.h>
#include <haproxy/listener.h> #include <haproxy/listener.h>
#include <haproxy/log.h> #include <haproxy/log.h>
#include <haproxy/proto_sockpair.h>
#include <haproxy/protocol-t.h> #include <haproxy/protocol-t.h>
#include <haproxy/protocol.h> #include <haproxy/protocol.h>
#include <haproxy/sample.h> #include <haproxy/sample.h>
@ -695,11 +693,7 @@ void listener_accept(int fd)
int next_feconn = 0; int next_feconn = 0;
int next_actconn = 0; int next_actconn = 0;
int expire; int expire;
int cfd;
int ret; int ret;
#ifdef USE_ACCEPT4
static int accept4_broken;
#endif
if (!l) if (!l)
return; return;
@ -768,10 +762,9 @@ void listener_accept(int fd)
* shortage, we try again 100ms later in the worst case. * shortage, we try again 100ms later in the worst case.
*/ */
for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) { for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) {
struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr);
unsigned int count; unsigned int count;
__decl_thread(unsigned long mask); __decl_thread(unsigned long mask);
int status;
/* pre-increase the number of connections without going too far. /* pre-increase the number of connections without going too far.
* We process the listener, then the proxy, then the process. * We process the listener, then the proxy, then the process.
@ -820,100 +813,29 @@ void listener_accept(int fd)
} while (!_HA_ATOMIC_CAS(&actconn, (int *)(&count), next_actconn)); } while (!_HA_ATOMIC_CAS(&actconn, (int *)(&count), next_actconn));
} }
/* with sockpair@ we don't want to do an accept */ cli_conn = l->rx.proto->accept_conn(l, &status);
if (unlikely(l->rx.addr.ss_family == AF_CUST_SOCKPAIR)) { if (!cli_conn) {
if ((cfd = recv_fd_uxst(fd)) != -1) switch (status) {
fcntl(cfd, F_SETFL, O_NONBLOCK); case CO_AC_DONE:
/* just like with UNIX sockets, only the family is filled */ goto end;
addr.ss_family = AF_UNIX;
laddr = sizeof(addr.ss_family);
} else
#ifdef USE_ACCEPT4 case CO_AC_RETRY: /* likely a signal */
/* only call accept4() if it's known to be safe, otherwise
* fallback to the legacy accept() + fcntl().
*/
if (unlikely(accept4_broken ||
((cfd = accept4(fd, (struct sockaddr *)&addr, &laddr, SOCK_NONBLOCK)) == -1 &&
(errno == ENOSYS || errno == EINVAL || errno == EBADF) &&
(accept4_broken = 1))))
#endif
if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) != -1)
fcntl(cfd, F_SETFL, O_NONBLOCK);
if (unlikely(cfd == -1)) {
switch (errno) {
case EAGAIN:
if (fdtab[fd].ev & (FD_POLL_HUP|FD_POLL_ERR)) {
/* the listening socket might have been disabled in a shared
* process and we're a collateral victim. We'll just pause for
* a while in case it comes back. In the mean time, we need to
* clear this sticky flag.
*/
_HA_ATOMIC_AND(&fdtab[fd].ev, ~(FD_POLL_HUP|FD_POLL_ERR));
goto transient_error;
}
goto end; /* nothing more to accept */
case EINVAL:
/* might be trying to accept on a shut fd (eg: soft stop) */
goto transient_error;
case EINTR:
case ECONNABORTED:
_HA_ATOMIC_SUB(&l->nbconn, 1); _HA_ATOMIC_SUB(&l->nbconn, 1);
if (p) if (p)
_HA_ATOMIC_SUB(&p->feconn, 1); _HA_ATOMIC_SUB(&p->feconn, 1);
if (!(l->options & LI_O_UNLIMITED)) if (!(l->options & LI_O_UNLIMITED))
_HA_ATOMIC_SUB(&actconn, 1); _HA_ATOMIC_SUB(&actconn, 1);
continue; continue;
case ENFILE:
if (p) case CO_AC_YIELD:
send_log(p, LOG_EMERG,
"Proxy %s reached system FD limit (maxsock=%d). Please check system tunables.\n",
p->id, global.maxsock);
goto transient_error;
case EMFILE:
if (p)
send_log(p, LOG_EMERG,
"Proxy %s reached process FD limit (maxsock=%d). Please check 'ulimit-n' and restart.\n",
p->id, global.maxsock);
goto transient_error;
case ENOBUFS:
case ENOMEM:
if (p)
send_log(p, LOG_EMERG,
"Proxy %s reached system memory limit (maxsock=%d). Please check system tunables.\n",
p->id, global.maxsock);
goto transient_error;
default:
/* unexpected result, let's give up and let other tasks run */
max_accept = 0; max_accept = 0;
goto end; goto end;
default:
goto transient_error;
} }
} }
/* we don't want to leak the FD upon reload if it's in the master */
if (unlikely(master == 1))
fcntl(cfd, F_SETFD, FD_CLOEXEC);
/* we'll have to at least allocate a connection, assign the listener
* to conn->target, set the source address, and set the fd.
*/
cli_conn = conn_new(&l->obj_type);
if (cli_conn) {
cli_conn->handle.fd = cfd;
cli_conn->flags |= CO_FL_ADDR_FROM_SET;
if (!sockaddr_alloc(&cli_conn->src, &addr, laddr)) {
conn_free(cli_conn);
cli_conn = NULL;
}
}
if (!cli_conn) {
/* no more memory, give up! */
close(cfd);
continue;
}
/* The connection was accepted, it must be counted as such */ /* The connection was accepted, it must be counted as such */
if (l->counters) if (l->counters)
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn); HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
@ -930,12 +852,12 @@ void listener_accept(int fd)
_HA_ATOMIC_ADD(&activity[tid].accepted, 1); _HA_ATOMIC_ADD(&activity[tid].accepted, 1);
if (unlikely(cfd >= global.maxsock)) { if (unlikely(cli_conn->handle.fd >= global.maxsock)) {
send_log(p, LOG_EMERG, send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n", "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id); p->id);
conn_free(cli_conn); conn_free(cli_conn);
close(cfd); close(cli_conn->handle.fd);
expire = tick_add(now_ms, 1000); /* try again in 1 second */ expire = tick_add(now_ms, 1000); /* try again in 1 second */
goto limit_global; goto limit_global;
} }