MAJOR: mworker: move master-worker fork in init()

This refactoring allows to simplify 'master-worker' logic. The master process
with this change will fork a worker very early at the initialization stage,
which allows to perform a configuration parsing only for the worker. In reality
only the worker process needs to parse and to apply the whole configuration.

Master process just polls master CLI sockets, watches worker status, catches
its termination state and handles the signals. With this refactoring there is
no longer need for master to perform re-execution after reading the whole
configuration file to free additional memory. And there is no longer need for
worker to register atexit callbacks, in order to free the memory, when it
fails to apply the new configuration. In contrast, we now need to set
proc_self pointer to the new worker entry in processes list just after
the fork in the worker process context. proc_self is dereferenced in
mworker_sockpair_register_per_thread(), which is called when worker enters in
its polling loop.

Following patches will try to gather more 'worker' and 'master' specific' code
in the dedicated cases of this new fork() switch, or in a separate functions.
This commit is contained in:
Valentine Krasnobaeva 2024-06-28 16:21:30 +02:00 committed by Willy Tarreau
parent 4cbfcc60f4
commit 8dd4efe42f

View File

@ -2045,18 +2045,6 @@ static void init(int argc, char **argv)
if (global.mode & MODE_CHECK_CONDITION) if (global.mode & MODE_CHECK_CONDITION)
do_check_condition(progname); do_check_condition(progname);
/* set the atexit functions when not doing configuration check */
if (!(global.mode & MODE_CHECK) && (getenv("HAPROXY_MWORKER_REEXEC") != NULL)) {
if (global.mode & MODE_MWORKER) {
atexit_flag = 1;
atexit(reexec_on_failure);
} else if (global.mode & MODE_MWORKER_WAIT) {
atexit_flag = 1;
atexit(exit_on_waitmode_failure);
}
}
if (change_dir && chdir(change_dir) < 0) { if (change_dir && chdir(change_dir) < 0) {
ha_alert("Could not change to directory %s : %s\n", change_dir, strerror(errno)); ha_alert("Could not change to directory %s : %s\n", change_dir, strerror(errno));
exit(1); exit(1);
@ -2065,7 +2053,7 @@ static void init(int argc, char **argv)
usermsgs_clr("config"); usermsgs_clr("config");
/* in wait mode, we don't try to read the configuration files */ /* in wait mode, we don't try to read the configuration files */
if (!(global.mode & MODE_MWORKER_WAIT)) { if (!(global.mode & MODE_MWORKER)) {
ret = read_cfg(progname); ret = read_cfg(progname);
/* free memory to store config file content */ /* free memory to store config file content */
list_for_each_entry_safe(cfg, cfg_tmp, &cfg_cfgfiles, list) list_for_each_entry_safe(cfg, cfg_tmp, &cfg_cfgfiles, list)
@ -2157,7 +2145,67 @@ static void init(int argc, char **argv)
DISGUISE(write(pidfd, pidstr, strlen(pidstr))); DISGUISE(write(pidfd, pidstr, strlen(pidstr)));
} }
if (global.mode & (MODE_MWORKER|MODE_MWORKER_WAIT)) /* master-worker mode fork() */
if (global.mode & MODE_MWORKER) {
int worker_pid;
struct mworker_proc *child;
struct ring *tmp_startup_logs = NULL;
/* at this point the worker must have his own startup_logs buffer */
tmp_startup_logs = startup_logs_dup(startup_logs);
worker_pid = fork();
switch (worker_pid) {
case -1:
ha_alert("[%s.main()] Cannot fork.\n", argv[0]);
exit(EXIT_FAILURE);
case 0:
/* in child */
startup_logs_free(startup_logs);
startup_logs = tmp_startup_logs;
/* This one must not be exported, it's internal! */
unsetenv("HAPROXY_MWORKER_REEXEC");
ha_random_jump96(1);
/* proc_self needs to point to the new forked worker in
* worker's context, as it's dereferenced in
* mworker_sockpair_register_per_thread(), called for
* master and for worker.
*/
list_for_each_entry(child, &proc_list, list) {
if (child->options & PROC_O_TYPE_WORKER &&
child->reloads == 0 &&
child->pid == -1) {
proc_self = child;
break;
}
}
break;
default:
/* in parent */
global.mode |= MODE_MWORKER_WAIT;
master = 1;
atexit_flag = 1;
atexit(exit_on_waitmode_failure);
ha_notice("Initializing new worker (%d)\n", worker_pid);
/* find the right mworker_proc */
list_for_each_entry(child, &proc_list, list) {
if (child->reloads == 0 && child->options & PROC_O_TYPE_WORKER && child->pid == -1) {
child->timestamp = date.tv_sec;
child->pid = worker_pid;
child->version = strdup(haproxy_version);
/* at this step the fd is bound for the worker, set it to -1 so
* it could be close in case of errors in mworker_cleanup_proc() */
child->ipc_fd[1] = -1;
break;
}
}
}
}
if (master)
mworker_create_master_cli(); mworker_create_master_cli();
if (!LIST_ISEMPTY(&mworker_cli_conf) && !(arg_mode & MODE_MWORKER)) { if (!LIST_ISEMPTY(&mworker_cli_conf) && !(arg_mode & MODE_MWORKER)) {
@ -2328,7 +2376,7 @@ static void init(int argc, char **argv)
} }
/* set the default maxconn in the master, but let it be rewritable with -n */ /* set the default maxconn in the master, but let it be rewritable with -n */
if (global.mode & MODE_MWORKER_WAIT) if (master)
global.maxconn = MASTER_MAXCONN; global.maxconn = MASTER_MAXCONN;
if (cfg_maxconn > 0) if (cfg_maxconn > 0)
@ -3218,6 +3266,7 @@ static void set_identity(const char *program_name)
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
int err, retry; int err, retry;
int devnullfd = -1;
struct rlimit limit; struct rlimit limit;
int intovf = (unsigned char)argc + 1; /* let the compiler know it's strictly positive */ int intovf = (unsigned char)argc + 1; /* let the compiler know it's strictly positive */
@ -3543,136 +3592,34 @@ int main(int argc, char **argv)
clock_adjust_now_offset(); clock_adjust_now_offset();
ready_date = date; ready_date = date;
/* catch last warnings, which could be produced while adjusting limits
* or preallocating fds
*/
if (warned & WARN_ANY && global.mode & MODE_ZERO_WARNING) {
ha_alert("Some warnings were found and 'zero-warning' is set. Aborting.\n");
exit(1);
}
if (global.mode & (MODE_DAEMON | MODE_MWORKER | MODE_MWORKER_WAIT)) {
int ret = 0;
int in_parent = 0;
int devnullfd = -1;
/* the father launches the required number of processes */
if (!(global.mode & MODE_MWORKER_WAIT)) {
struct ring *tmp_startup_logs = NULL;
if (global.mode & MODE_MWORKER)
mworker_ext_launch_all();
/* at this point the worker must have his own startup_logs buffer */
tmp_startup_logs = startup_logs_dup(startup_logs);
ret = fork();
if (ret < 0) {
ha_alert("[%s.main()] Cannot fork.\n", argv[0]);
protocol_unbind_all();
exit(1); /* there has been an error */
}
else if (ret == 0) { /* child breaks here */
startup_logs_free(startup_logs);
startup_logs = tmp_startup_logs;
/* This one must not be exported, it's internal! */
unsetenv("HAPROXY_MWORKER_REEXEC");
ha_random_jump96(1);
}
else { /* parent here */
in_parent = 1;
if (global.mode & MODE_MWORKER) {
struct mworker_proc *child;
ha_notice("New worker (%d) forked\n", ret);
/* find the right mworker_proc */
list_for_each_entry(child, &proc_list, list) {
if (child->reloads == 0 &&
child->options & PROC_O_TYPE_WORKER &&
child->pid == -1) {
child->timestamp = date.tv_sec;
child->pid = ret;
child->version = strdup(haproxy_version);
/* at this step the fd is bound for the worker, set it to -1 so
* it could be close in case of errors in mworker_cleanup_proc() */
child->ipc_fd[1] = -1;
break;
}
}
}
}
} else {
/* wait mode */
in_parent = 1;
}
/* close the pidfile both in children and father */ /* close the pidfile both in children and father */
if (pidfd >= 0) { if (pidfd >= 0) {
//lseek(pidfd, 0, SEEK_SET); /* debug: emulate eglibc bug */ //lseek(pidfd, 0, SEEK_SET); /* debug: emulate eglibc bug */
close(pidfd); close(pidfd);
} }
/* We won't ever use this anymore */ /* We won't ever use this anymore */
ha_free(&global.pidfile); ha_free(&global.pidfile);
if (in_parent) { if (global.mode & MODE_MWORKER_WAIT) {
if (global.mode & (MODE_MWORKER|MODE_MWORKER_WAIT)) { struct mworker_proc *child, *it;
master = 1;
if ((!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) && if ((!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) &&
(global.mode & MODE_DAEMON)) { (global.mode & MODE_DAEMON)) {
/* detach from the tty, this is required to properly daemonize. */ /* detach from the tty, this is required to properly daemonize. */
if ((getenv("HAPROXY_MWORKER_REEXEC") == NULL)) if ((getenv("HAPROXY_MWORKER_REEXEC") == NULL))
stdio_quiet(-1); stdio_quiet(-1);
global.mode &= ~MODE_VERBOSE; global.mode &= ~MODE_VERBOSE;
global.mode |= MODE_QUIET; /* ensure that we won't say anything from now */ global.mode |= MODE_QUIET; /* ensure that we won't say anything from now */
} }
if (global.mode & MODE_MWORKER_WAIT) {
/* only the wait mode handles the master CLI */
mworker_loop();
} else {
#if defined(USE_SYSTEMD)
if (global.tune.options & GTUNE_USE_SYSTEMD)
sd_notifyf(0, "READY=1\nMAINPID=%lu\nSTATUS=Ready.\n", (unsigned long)getpid());
#endif
/* if not in wait mode, restore initial environment (before parsing the config) and
* reload in wait mode to free the memory. The initial process environment should
* be restored here, preceded by clean_env(), which do the same job as clearenv().
* Otherwise, we will lost HAPROXY_LOAD_SUCCESS, HAPROXY_MWORKER_WAIT_ONLY and
* HAPROXY_MWORKER_REEXEC variables, which will be set further.
*/
if (clean_env() != 0)
exit(EXIT_FAILURE);
if (restore_env() != 0)
exit(EXIT_FAILURE);
setenv("HAPROXY_LOAD_SUCCESS", "1", 1); setenv("HAPROXY_LOAD_SUCCESS", "1", 1);
ha_notice("Loading success.\n"); ha_notice("Loading success.\n");
proc_self->failedreloads = 0; /* reset the number of failure */ proc_self->failedreloads = 0; /* reset the number of failure */
mworker_reexec_waitmode(); mworker_loop();
}
/* should never get there */
exit(EXIT_FAILURE);
}
#if defined(USE_OPENSSL) && !defined(OPENSSL_NO_DH) #if defined(USE_OPENSSL) && !defined(OPENSSL_NO_DH)
ssl_free_dh(); ssl_free_dh();
#endif #endif
exit(0); /* parent must leave */
}
/* child must never use the atexit function */
atexit_flag = 0;
/* close useless master sockets */
if (global.mode & MODE_MWORKER) {
struct mworker_proc *child, *it;
master = 0; master = 0;
/* close useless master sockets */
mworker_cli_proxy_stop(); mworker_cli_proxy_stop();
/* free proc struct of other processes */ /* free proc struct of other processes */
@ -3685,17 +3632,12 @@ int main(int argc, char **argv)
close(child->ipc_fd[0]); close(child->ipc_fd[0]);
child->ipc_fd[0] = -1; child->ipc_fd[0] = -1;
} }
if (child->options & PROC_O_TYPE_WORKER &&
child->reloads == 0 &&
child->pid == -1) {
/* keep this struct if this is our pid */
proc_self = child;
continue;
}
LIST_DELETE(&child->list); LIST_DELETE(&child->list);
mworker_free_child(child); mworker_free_child(child);
child = NULL; child = NULL;
} }
/* master must leave */
exit(0);
} }
if (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) { if (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) {
@ -3736,11 +3678,9 @@ int main(int argc, char **argv)
global.mode &= ~MODE_VERBOSE; global.mode &= ~MODE_VERBOSE;
global.mode |= MODE_QUIET; /* ensure that we won't say anything from now */ global.mode |= MODE_QUIET; /* ensure that we won't say anything from now */
} }
pid = getpid(); /* update child's pid */ pid = getpid(); /* update pid */
if (!(global.mode & MODE_MWORKER)) /* in mworker mode we don't want a new pgid for the children */
setsid(); setsid();
fork_poller(); fork_poller();
}
/* pass through every cli socket, and check if it's bound to /* pass through every cli socket, and check if it's bound to
* the current process and if it exposes listeners sockets. * the current process and if it exposes listeners sockets.
@ -3829,6 +3769,10 @@ int main(int argc, char **argv)
/* when multithreading we need to let only the thread 0 handle the signals */ /* when multithreading we need to let only the thread 0 handle the signals */
haproxy_unblock_signals(); haproxy_unblock_signals();
#if defined(USE_SYSTEMD)
if (global.tune.options & GTUNE_USE_SYSTEMD)
sd_notifyf(0, "READY=1\nMAINPID=%lu\nSTATUS=Ready.\n", (unsigned long)getpid());
#endif
/* Finally, start the poll loop for the first thread */ /* Finally, start the poll loop for the first thread */
run_thread_poll_loop(&ha_thread_info[0]); run_thread_poll_loop(&ha_thread_info[0]);