MEDIUM: mworker: replace the master pipe by socketpairs

In order to communicate with the workers, the master pipe has been
replaced by a socketpair() per worker.

The goal is to use these sockets as stats sockets and be able to access
them from the master.

When reloading, the master serialize the information of the workers and
put them in a environment variable. Once the master has been reexecuted
it unserialize that information and it is capable of closing the FDs of
the leaving children.
This commit is contained in:
William Lallemand 2018-09-11 10:06:26 +02:00 committed by Willy Tarreau
parent f9cc07c25b
commit bc19305e53

View File

@ -206,7 +206,20 @@ int *children = NULL; /* store PIDs of children in master workers mode */
static volatile sig_atomic_t caught_signal = 0;
static char **next_argv = NULL;
int mworker_pipe[2];
struct list proc_list = LIST_HEAD_INIT(proc_list);
int master = 0; /* 1 if in master, 0 if in child */
struct mworker_proc {
int pid;
int ipc_fd[2]; /* 0 is master side, 1 is worker side */
int relative_pid;
struct list list;
};
struct mworker_proc *proc_self;
/* list of the temporarily limited listeners because of lack of resource */
struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue);
@ -515,6 +528,65 @@ static void mworker_kill(int sig)
}
}
/*
* serialize the proc list and put it in the environment
*/
static void mworker_proc_list_to_env()
{
char *msg = NULL;
struct mworker_proc *child;
list_for_each_entry(child, &proc_list, list) {
if (msg)
memprintf(&msg, "%s|type=worker;fd=%d;pid=%d;rpid=%d", msg, child->ipc_fd[0], child->pid, child->relative_pid);
else
memprintf(&msg, "type=worker;fd=%d;pid=%d;rpid=%d", child->ipc_fd[0], child->pid, child->relative_pid);
}
if (msg)
setenv("HAPROXY_CHILDREN", msg, 1);
}
/*
* unserialize the proc list from the environment
*/
static void mworker_env_to_proc_list()
{
char *msg, *token = NULL, *s1;
msg = getenv("HAPROXY_CHILDREN");
if (!msg)
return;
while ((token = strtok_r(msg, "|", &s1))) {
struct mworker_proc *child;
char *subtoken = NULL;
char *s2;
msg = NULL;
child = calloc(1, sizeof(*child));
while ((subtoken = strtok_r(token, ";", &s2))) {
token = NULL;
if (strncmp(subtoken, "fd=", 3) == 0) {
child->ipc_fd[0] = atoi(subtoken+3);
} else if (strncmp(subtoken, "pid=", 4) == 0) {
child->pid = atoi(subtoken+4);
} else if (strncmp(subtoken, "rpid=", 5) == 0) {
child->relative_pid = atoi(subtoken+5);
}
}
if (child->pid)
LIST_ADDQ(&proc_list, &child->list);
else
free(child);
}
unsetenv("HAPROXY_CHILDREN");
}
/*
* Upon a reload, the master worker needs to close all listeners FDs but the mworker_pipe
* fd, and the FD provided by fd@
@ -628,6 +700,8 @@ static void mworker_reload()
#endif
setenv("HAPROXY_MWORKER_REEXEC", "1", 1);
mworker_proc_list_to_env(); /* put the children description in the env */
/* compute length */
while (next_argv[next_argc])
next_argc++;
@ -710,6 +784,7 @@ static void mworker_catch_sigchld(struct sig_handler *sh)
{
int exitpid = -1;
int status = 0;
struct mworker_proc *child, *it;
restart_wait:
@ -724,6 +799,16 @@ restart_wait:
else
status = 255;
list_for_each_entry_safe(child, it, &proc_list, list) {
if (child->pid != exitpid)
continue;
LIST_DEL(&child->list);
close(child->ipc_fd[0]);
free(child);
break;
}
if (!children) {
ha_warning("Worker %d exited with code %d\n", exitpid, status);
} else {
@ -760,6 +845,10 @@ static void mworker_loop()
sd_notifyf(0, "READY=1\nMAINPID=%lu", (unsigned long)getpid());
#endif
master = 1;
mworker_env_to_proc_list(); /* get the info of the children in the env */
signal_register_fct(SIGTERM, mworker_catch_sigterm, SIGTERM);
signal_register_fct(SIGUSR1, mworker_catch_sigterm, SIGUSR1);
signal_register_fct(SIGINT, mworker_catch_sigterm, SIGINT);
@ -770,6 +859,9 @@ static void mworker_loop()
mworker_unblock_signals();
mworker_cleanlisteners();
mworker_catch_sigchld(NULL); /* ensure we clean the children in case
some SIGCHLD were lost */
tid = 0;
global.nbthread = 1;
@ -2391,13 +2483,13 @@ void mworker_pipe_handler(int fd)
/* should only be called once per process */
void mworker_pipe_register()
{
if (fdtab[mworker_pipe[0]].owner)
if (fdtab[proc_self->ipc_fd[1]].owner)
/* already initialized */
return;
fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
fd_insert(mworker_pipe[0], mworker_pipe, mworker_pipe_handler, MAX_THREADS_MASK);
fd_want_recv(mworker_pipe[0]);
fcntl(proc_self->ipc_fd[1], F_SETFL, O_NONBLOCK);
fd_insert(proc_self->ipc_fd[1], proc_self->ipc_fd, mworker_pipe_handler, MAX_THREADS_MASK);
fd_want_recv(proc_self->ipc_fd[1]);
}
/* Runs the polling loop */
@ -2466,7 +2558,7 @@ static void *run_thread_poll_loop(void *data)
}
}
if (global.mode & MODE_MWORKER) {
if ((global.mode & MODE_MWORKER) && master == 0) {
HA_SPIN_LOCK(START_LOCK, &start_lock);
mworker_pipe_register();
HA_SPIN_UNLOCK(START_LOCK, &start_lock);
@ -2786,33 +2878,6 @@ int main(int argc, char **argv)
setsid();
}
if (global.mode & MODE_MWORKER) {
if ((getenv("HAPROXY_MWORKER_REEXEC") == NULL)) {
char *msg = NULL;
/* master pipe to ensure the master is still alive */
ret = pipe(mworker_pipe);
if (ret < 0) {
ha_alert("[%s.main()] Cannot create master pipe.\n", argv[0]);
exit(EXIT_FAILURE);
} else {
memprintf(&msg, "%d", mworker_pipe[0]);
setenv("HAPROXY_MWORKER_PIPE_RD", msg, 1);
memprintf(&msg, "%d", mworker_pipe[1]);
setenv("HAPROXY_MWORKER_PIPE_WR", msg, 1);
free(msg);
}
} else {
char* rd = getenv("HAPROXY_MWORKER_PIPE_RD");
char* wr = getenv("HAPROXY_MWORKER_PIPE_WR");
if (!rd || !wr) {
ha_alert("[%s.main()] Cannot get master pipe FDs.\n", argv[0]);
atexit_flag = 0;// dont reexecute master process
exit(EXIT_FAILURE);
}
mworker_pipe[0] = atoi(rd);
mworker_pipe[1] = atoi(wr);
}
}
/* if in master-worker mode, write the PID of the father */
if (global.mode & MODE_MWORKER) {
@ -2824,6 +2889,24 @@ int main(int argc, char **argv)
/* the father launches the required number of processes */
for (proc = 0; proc < global.nbproc; proc++) {
if (global.mode & MODE_MWORKER) {
proc_self = malloc(sizeof(*proc_self));
if (!proc_self) {
ha_alert("[%s.main()] Cannot allocate process structures.\n", argv[0]);
exit(1);
}
/* master pipe to ensure the master is still alive */
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, proc_self->ipc_fd);
if (ret < 0) {
ha_alert("[%s.main()] Cannot create master pipe.\n", argv[0]);
exit(EXIT_FAILURE);
} else {
proc_self->relative_pid = relative_pid;
LIST_ADDQ(&proc_list, &proc_self->list);
}
}
ret = fork();
if (ret < 0) {
ha_alert("[%s.main()] Cannot fork.\n", argv[0]);
@ -2838,6 +2921,12 @@ int main(int argc, char **argv)
snprintf(pidstr, sizeof(pidstr), "%d\n", ret);
shut_your_big_mouth_gcc(write(pidfd, pidstr, strlen(pidstr)));
}
if (global.mode & MODE_MWORKER) {
proc_self->pid = ret;
close(proc_self->ipc_fd[1]); /* close client side */
proc_self->ipc_fd[1] = -1;
}
relative_pid++; /* each child will get a different one */
pid_bit <<= 1;
}
@ -2898,9 +2987,24 @@ int main(int argc, char **argv)
/* child must never use the atexit function */
atexit_flag = 0;
/* close the write end of the master pipe in the children */
if (global.mode & MODE_MWORKER)
close(mworker_pipe[1]);
/* close useless master sockets */
if (global.mode & MODE_MWORKER) {
struct mworker_proc *child, *it;
master = 0;
/* free proc struct of other processes */
list_for_each_entry_safe(child, it, &proc_list, list) {
if (child->ipc_fd[0] > -1) {
close(child->ipc_fd[0]);
child->ipc_fd[0] = -1;
}
if (child == proc_self)
continue;
close(child->ipc_fd[1]);
LIST_DEL(&child->list);
free(child);
}
}
if (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) {
devnullfd = open("/dev/null", O_RDWR, 0);