CONTRIB: tcploop: implement fork()

Fork is a very convenient way to deal with independant yet properly
timed connections. It's particularly useful here for accept(), and
ensures that any accepted FD will automatically be released. The
principle is that when we hit a fork command, the parent restarts
evaluating the actions from the beginning and the child continues
to evaluate the next actions. Listen and connect are skipped if the
connection is already established. Fork() is amazingly cheap on
Linux, 21k forked connections per second are handled on a single
core, and 38k on two cores.

For now it's not possible to have two different code paths so in order
to have both a listener and a connector, two distinct commands are
still needed.
This commit is contained in:
Willy Tarreau 2016-11-12 13:25:53 +01:00
parent 84393aa863
commit 95a6b786fc

View File

@ -5,6 +5,7 @@
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@ -36,6 +37,8 @@ const struct linger nolinger = { .l_onoff = 1, .l_linger = 0 };
#define TRASH_SIZE 65536
static char trash[TRASH_SIZE];
volatile int nbproc = 0;
/* display the message and exit with the code */
__attribute__((noreturn)) void die(int code, const char *format, ...)
{
@ -65,6 +68,13 @@ struct err_msg *alloc_err_msg(int size)
return err;
}
void sig_handler(int sig)
{
if (sig == SIGCHLD) {
while (waitpid(-1, NULL, WNOHANG) > 0)
__sync_sub_and_fetch(&nbproc, 1);
}
}
/* converts str in the form [[<ipv4>|<ipv6>|<hostname>]:]port to struct sockaddr_storage.
* Returns < 0 with err set in case of error.
@ -492,18 +502,48 @@ int tcp_pause(int sock, const char *arg)
return 0;
}
/* forks another process while respecting the limit imposed in argument (1 by
* default). Will wait for another process to exit before creating a new one.
* Returns the value of the fork() syscall, ie 0 for the child, non-zero for
* the parent, -1 for an error.
*/
int tcp_fork(int sock, const char *arg)
{
int max = 1;
int ret;
if (arg[1]) {
max = atoi(arg + 1);
if (max <= 0) {
fprintf(stderr, "max process must be > 0 or unset (was %d)\n", max);
return -1;
}
}
while (nbproc >= max)
poll(NULL, 0, 1000);
ret = fork();
if (ret > 0)
__sync_add_and_fetch(&nbproc, 1);
return ret;
}
int main(int argc, char **argv)
{
struct sockaddr_storage ss;
struct err_msg err;
const char *arg0;
int arg;
int ret;
int sock;
arg0 = argv[0];
if (argc < 2)
usage(1, arg0);
signal(SIGCHLD, sig_handler);
if (addr_to_ss(argv[1], &ss, &err) < 0)
die(1, "%s\n", err.msg);
@ -614,6 +654,17 @@ int main(int argc, char **argv)
shutdown(sock, SHUT_WR);
break;
case 'N':
ret = tcp_fork(sock, argv[arg]);
if (ret < 0)
die(1, "Fatal: fork() failed.\n");
if (ret > 0) {
/* loop back to first arg */
arg = 1;
continue;
}
/* OK we're in the child, let's continue */
break;
default:
usage(1, arg0);
}