mirror of https://github.com/schoebel/mars
marsadm: new option --parallel
This commit is contained in:
parent
f1d7caedfe
commit
c3f9970029
|
@ -31,6 +31,7 @@ umask 0077;
|
||||||
|
|
||||||
# global defaults
|
# global defaults
|
||||||
|
|
||||||
|
my $parallel = -1;
|
||||||
my $threshold = 10 * 1024 * 1024;
|
my $threshold = 10 * 1024 * 1024;
|
||||||
my $window = 60;
|
my $window = 60;
|
||||||
my $verbose = 0;
|
my $verbose = 0;
|
||||||
|
@ -58,6 +59,7 @@ my $marsadm_var_dir = defined($ENV{MARSADM_VRA_DIR}) ?
|
||||||
|
|
||||||
my $error_count = 0;
|
my $error_count = 0;
|
||||||
my $notify = "";
|
my $notify = "";
|
||||||
|
my $child_prefix = "";
|
||||||
my $logger = "/usr/bin/logger";
|
my $logger = "/usr/bin/logger";
|
||||||
|
|
||||||
sub llog {
|
sub llog {
|
||||||
|
@ -71,14 +73,14 @@ sub llog {
|
||||||
sub lprint {
|
sub lprint {
|
||||||
my ($text) = @_;
|
my ($text) = @_;
|
||||||
$OUTPUT_AUTOFLUSH = 1;
|
$OUTPUT_AUTOFLUSH = 1;
|
||||||
print $text;
|
print $child_prefix . $text;
|
||||||
llog($text);
|
llog($text);
|
||||||
}
|
}
|
||||||
|
|
||||||
sub lprint_stderr {
|
sub lprint_stderr {
|
||||||
my ($text) = @_;
|
my ($text) = @_;
|
||||||
$OUTPUT_AUTOFLUSH = 1;
|
$OUTPUT_AUTOFLUSH = 1;
|
||||||
print STDERR $text;
|
print STDERR $child_prefix . $text;
|
||||||
llog($text);
|
llog($text);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3523,6 +3525,7 @@ sub _create_delete {
|
||||||
|
|
||||||
sub _wait_delete {
|
sub _wait_delete {
|
||||||
return if $dry_run;
|
return if $dry_run;
|
||||||
|
lwarn "Do not run this in --parallel mode\n" if $child_prefix;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
my $deleted = get_link("$mars/todo-global/deleted-$real_host");
|
my $deleted = get_link("$mars/todo-global/deleted-$real_host");
|
||||||
$deleted =~ s/^0+//;
|
$deleted =~ s/^0+//;
|
||||||
|
@ -6628,6 +6631,7 @@ my %cmd_table =
|
||||||
"For a complete local shutdown of the resource, use 'marsadm down'.",
|
"For a complete local shutdown of the resource, use 'marsadm down'.",
|
||||||
\&attach_res_phase0,
|
\&attach_res_phase0,
|
||||||
"check preconditions",
|
"check preconditions",
|
||||||
|
"FORK",
|
||||||
\&attach_res_phase0b,
|
\&attach_res_phase0b,
|
||||||
"wait for systemd device release",
|
"wait for systemd device release",
|
||||||
\&attach_res_phase1,
|
\&attach_res_phase1,
|
||||||
|
@ -6721,6 +6725,7 @@ my %cmd_table =
|
||||||
"Shortcut for detach + pause-sync + pause-fetch + pause-replay.",
|
"Shortcut for detach + pause-sync + pause-fetch + pause-replay.",
|
||||||
\&up_res_phase0,
|
\&up_res_phase0,
|
||||||
"check preconditions",
|
"check preconditions",
|
||||||
|
"FORK",
|
||||||
\&attach_res_phase0b,
|
\&attach_res_phase0b,
|
||||||
"wait for systemd device release",
|
"wait for systemd device release",
|
||||||
\&up_res_phase1,
|
\&up_res_phase1,
|
||||||
|
@ -6756,6 +6761,7 @@ my %cmd_table =
|
||||||
],
|
],
|
||||||
\&primary_phase0,
|
\&primary_phase0,
|
||||||
"check preconditions",
|
"check preconditions",
|
||||||
|
"FORK",
|
||||||
\&primary_phase0b,
|
\&primary_phase0b,
|
||||||
"wait for systemd",
|
"wait for systemd",
|
||||||
\&primary_phase1,
|
\&primary_phase1,
|
||||||
|
@ -6787,6 +6793,7 @@ my %cmd_table =
|
||||||
"Therefore, don't unnecessarily give 'secondary'!",
|
"Therefore, don't unnecessarily give 'secondary'!",
|
||||||
\&primary_phase0,
|
\&primary_phase0,
|
||||||
"check preconditions",
|
"check preconditions",
|
||||||
|
"FORK",
|
||||||
\&primary_phase1,
|
\&primary_phase1,
|
||||||
"leave primary state",
|
"leave primary state",
|
||||||
\&primary_phase1b,
|
\&primary_phase1b,
|
||||||
|
@ -6980,6 +6987,17 @@ marsadm [<global_options>] view[-<macroname>] [<resource_names> | all ]
|
||||||
'delete-resource'.
|
'delete-resource'.
|
||||||
--verbose
|
--verbose
|
||||||
Increase speakyness of some commands.
|
Increase speakyness of some commands.
|
||||||
|
--parallel
|
||||||
|
Only reasonable when combined with \"all\".
|
||||||
|
For each resource, fork() a sub-process running independently
|
||||||
|
from other resources. May speed up handover a lot.
|
||||||
|
However, several cluster managers are known to have problems
|
||||||
|
with a high parallelism degree (up to deadlocks).
|
||||||
|
Only use this after thorough testing in combination with your
|
||||||
|
whole operation stack!
|
||||||
|
--parallel=<number>
|
||||||
|
Like --parallel, but limit the parallelism degree to the given
|
||||||
|
number of parallel processes.
|
||||||
--logger=/path/to/usr/bin/logger
|
--logger=/path/to/usr/bin/logger
|
||||||
Use an alternative syslog messenger.
|
Use an alternative syslog messenger.
|
||||||
When empty, disable syslogging.
|
When empty, disable syslogging.
|
||||||
|
@ -7109,6 +7127,9 @@ foreach my $arg (@ARGV) {
|
||||||
} elsif ($arg =~ m/--thresh-logsize\s*=\s*([0-9]+)/) {
|
} elsif ($arg =~ m/--thresh-logsize\s*=\s*([0-9]+)/) {
|
||||||
$thresh_logsize = $1;
|
$thresh_logsize = $1;
|
||||||
next;
|
next;
|
||||||
|
} elsif ($arg =~ m/--parallel(\s*=\s*(-?[0-9]+)?)?/) {
|
||||||
|
$parallel = defined($2) ? $2 : 0;
|
||||||
|
next;
|
||||||
} elsif ($arg =~ s/--verbose\s*=\s*(-?[0-9]+)/$1/) {
|
} elsif ($arg =~ s/--verbose\s*=\s*(-?[0-9]+)/$1/) {
|
||||||
$verbose = $arg;
|
$verbose = $arg;
|
||||||
next;
|
next;
|
||||||
|
@ -7276,16 +7297,22 @@ sub do_one_res {
|
||||||
|
|
||||||
my %skip_res;
|
my %skip_res;
|
||||||
|
|
||||||
|
# Expand the resource argument of a command into a sorted list of names.
#
#   $cmd - name of the marsadm command being executed
#   $res - resource argument as given by the user
#
# When $res is "all" and $cmd does not match
# show|cat|cluster|set-link|delete-file, the list comes from
# get_member_resources().  Otherwise, when $res contains a comma, it is
# split at the commas.  In every other case the list is empty.
# The result is ordered with alphanum_cmp.
sub expand_res_list {
  my ($cmd, $res) = @_;
  my $wants_all = ($res eq "all" && $cmd !~ m/show|cat|cluster|set-link|delete-file/);
  my @names = $wants_all     ? get_member_resources()
            : $res =~ m/,/   ? split(",", $res)
            :                  ();
  return sort alphanum_cmp @names;
}
|
||||||
|
|
||||||
sub do_all_res {
|
sub do_all_res {
|
||||||
my $func = shift;
|
my $func = shift;
|
||||||
my $cmd = shift;
|
my $cmd = shift;
|
||||||
my $res = shift;
|
my $res = shift;
|
||||||
my @res_list=();
|
my @res_list = expand_res_list($cmd, $res);
|
||||||
if ($res eq "all" && $cmd !~ m/show|cat|cluster|set-link|delete-file/) {
|
|
||||||
@res_list = glob("$mars/resource-*");
|
|
||||||
} elsif ($res =~ m/,/) {
|
|
||||||
@res_list = split(",", $res);
|
|
||||||
}
|
|
||||||
if (@res_list) {
|
if (@res_list) {
|
||||||
ldie "Cannot combine command '$cmd' with 'all' existing resources - you must explicitly name a single new resource\n" if $cmd =~ m/create|join/;
|
ldie "Cannot combine command '$cmd' with 'all' existing resources - you must explicitly name a single new resource\n" if $cmd =~ m/create|join/;
|
||||||
my $any_success = 0;
|
my $any_success = 0;
|
||||||
|
@ -7293,8 +7320,8 @@ sub do_all_res {
|
||||||
my @total_list = glob("$mars/ips/ip-*");
|
my @total_list = glob("$mars/ips/ip-*");
|
||||||
my $total_count = scalar(@total_list);
|
my $total_count = scalar(@total_list);
|
||||||
call_hook(!$force, "all-pre", $cmd, "all", @_);
|
call_hook(!$force, "all-pre", $cmd, "all", @_);
|
||||||
foreach $res (sort alphanum_cmp @res_list) {
|
foreach $res (@res_list) {
|
||||||
next unless -e "$res/data-$host";
|
next unless -e "$mars/resource-$res/data-$host";
|
||||||
$any_member++;
|
$any_member++;
|
||||||
$res =~ s/^.*\/resource-(.*)$/$1/;
|
$res =~ s/^.*\/resource-(.*)$/$1/;
|
||||||
next if defined($skip_res{$res});
|
next if defined($skip_res{$res});
|
||||||
|
@ -7342,10 +7369,88 @@ if (!$ip) {
|
||||||
my $func = $cmd_table{$cmd};
|
my $func = $cmd_table{$cmd};
|
||||||
ldie "unknown command '$cmd'\n" unless $func;
|
ldie "unknown command '$cmd'\n" unless $func;
|
||||||
|
|
||||||
|
my %kid_res;
|
||||||
|
|
||||||
|
# Block until every child in the given pid list has terminated.
#
#   @_ - pids of forked children; the resource each one works on is
#        looked up in %kid_res for the log messages.
#
# Every outcome is logged via lprint / lwarn.  Each failed child increases
# the file-level $error_count.
#
# Note on the status decoding: $? is a wait status, not an exit code.  The
# exit code is stored in the high byte ($? >> 8), while the low bits hold
# the terminating signal.  Adding the raw status to $error_count would
# yield multiples of 256 for ordinary failures; since exit() only reports
# the low 8 bits, a failed child could end up as overall exit status 0.
sub wait_pid_list {
  my @pid_list = @_;
  foreach my $pid (@pid_list) {
    my $check_pid = waitpid($pid, 0);
    my $status = $?;
    my $sub_res = $kid_res{$pid};
    if ($status > 0) {
      lwarn "RESOURCE $sub_res CHILD $pid terminated with status=$status\n";
      # Count the child's own exit code; a child killed by a signal has
      # exit code 0 in the high byte, so count it as one error.
      my $exit_code = $status >> 8;
      $error_count += $exit_code > 0 ? $exit_code : 1;
    } elsif ($check_pid == $pid) {
      lprint "RESOURCE $sub_res CHILD $pid terminated successfully\n";
    } else {
      # waitpid() did not return the pid we asked for, e.g. no such child.
      lwarn "RESOURCE $sub_res CHILD $pid terminated with unknown state\n";
    }
  }
}
|
||||||
|
|
||||||
|
# Wait for the termination of any one child process.
#
#   $flags - passed through unchanged as the flags argument of waitpid()
#
# Returns whatever waitpid(-1, $flags) returned.  A value > 0 is the pid
# of a reaped child; in that case the outcome is logged via lprint / lwarn
# (the resource name is looked up in %kid_res) and a failure increases the
# file-level $error_count.  Other return values are passed back without
# any logging of a child result.
#
# Note on the status decoding: $? is a wait status, not an exit code.  The
# exit code is stored in the high byte ($? >> 8), while the low bits hold
# the terminating signal.  Adding the raw status to $error_count would
# yield multiples of 256 for ordinary failures; since exit() only reports
# the low 8 bits, a failed child could end up as overall exit status 0.
sub wait_any_pid {
  my ($flags) = @_;
  lprint "WAITING for termination of a child...\n";
  my $pid = waitpid(-1, $flags);
  my $status = $?;
  if ($pid > 0) {
    my $sub_res = $kid_res{$pid};
    if ($status != 0) {
      lwarn "RESOURCE $sub_res CHILD $pid terminated with status=$status\n";
      # Count the child's own exit code; a child killed by a signal has
      # exit code 0 in the high byte, so count it as one error.
      my $exit_code = $status >> 8;
      $error_count += $exit_code > 0 ? $exit_code : 1;
    } else {
      lprint "RESOURCE $sub_res CHILD $pid terminated successfully\n";
    }
  }
  return $pid;
}
|
||||||
|
|
||||||
if (ref($func) eq "ARRAY") {
|
if (ref($func) eq "ARRAY") {
|
||||||
my @list = @$func;
|
my @list = @$func;
|
||||||
while (@list) {
|
while (@list) {
|
||||||
my $memb_func = shift @list;
|
my $memb_func = shift @list;
|
||||||
|
# check whether fork() is possible
|
||||||
|
if ($memb_func && $memb_func eq "FORK") {
|
||||||
|
$memb_func = shift @list;
|
||||||
|
# check whether fork() is requested
|
||||||
|
if ($parallel >= 0) {
|
||||||
|
my @res_list = expand_res_list($cmd, $res);
|
||||||
|
my $child_count = 0;
|
||||||
|
# only fork() when beneficial
|
||||||
|
if (@res_list && scalar(@res_list) > 1) {
|
||||||
|
foreach my $child_res (@res_list) {
|
||||||
|
# when necessary, limit the parallelism degree
|
||||||
|
if ($parallel > 0 && $child_count >= $parallel) {
|
||||||
|
my $done_pid = wait_any_pid(0);
|
||||||
|
if ($done_pid > 0) {
|
||||||
|
$child_count--;
|
||||||
|
delete $kid_res{$done_pid};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
my $pid = fork();
|
||||||
|
ldie "Cannot fork()\n" unless defined($pid);
|
||||||
|
if ($pid) {
|
||||||
|
lprint "RESOURCE $child_res starting CHILD $pid\n";
|
||||||
|
$child_count++;
|
||||||
|
$kid_res{$pid} = $child_res;
|
||||||
|
} else {
|
||||||
|
# child: simply continue with new $res
|
||||||
|
$res = $child_res;
|
||||||
|
$child_count = 0;
|
||||||
|
$child_prefix = "CHILD $child_res: ";
|
||||||
|
lprint "STARTING\n";
|
||||||
|
last;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($child_count) {
|
||||||
|
my @wait_list = sort alphanum_cmp keys(%kid_res);
|
||||||
|
wait_pid_list(@wait_list);
|
||||||
|
lprint "EXIT $error_count\n";
|
||||||
|
exit($error_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
# nested arrays may be used for _global_ workers
|
# nested arrays may be used for _global_ workers
|
||||||
if (ref($memb_func) eq "ARRAY") {
|
if (ref($memb_func) eq "ARRAY") {
|
||||||
my @sub_list = @$memb_func;
|
my @sub_list = @$memb_func;
|
||||||
|
@ -7369,4 +7474,6 @@ if (ref($func) eq "ARRAY") {
|
||||||
|
|
||||||
finish_links();
|
finish_links();
|
||||||
|
|
||||||
|
lprint "EXIT $error_count\n" if $child_prefix;
|
||||||
|
|
||||||
exit($error_count);
|
exit($error_count);
|
||||||
|
|
Loading…
Reference in New Issue