mirror of
https://github.com/ceph/ceph
synced 2025-01-04 02:02:36 +00:00
Merge pull request #32049 from mgfritch/wip-cd-timeout
cephadm: add ability to specify a timeout Reviewed-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
commit
bbd18f1986
@ -240,6 +240,8 @@ $CEPHADM unit --fsid $FSID --name mon.a -- is-enabled
|
||||
## shell
|
||||
$CEPHADM shell --fsid $FSID -- true
|
||||
$CEPHADM shell --fsid $FSID -- test -d /var/log/ceph
|
||||
expect_false $CEPHADM --timeout 1 shell --fsid $FSID -- sleep 10
|
||||
$CEPHADM --timeout 10 shell --fsid $FSID -- sleep 1
|
||||
|
||||
## enter
|
||||
expect_false $CEPHADM enter
|
||||
@ -248,6 +250,14 @@ $CEPHADM enter --fsid $FSID --name mgr.x -- test -d /var/lib/ceph/mgr/ceph-x
|
||||
$CEPHADM enter --fsid $FSID --name mon.a -- pidof ceph-mon
|
||||
expect_false $CEPHADM enter --fsid $FSID --name mgr.x -- pidof ceph-mon
|
||||
$CEPHADM enter --fsid $FSID --name mgr.x -- pidof ceph-mgr
|
||||
expect_false $CEPHADM --timeout 1 enter --fsid $FSID --name mon.a -- sleep 10
|
||||
$CEPHADM --timeout 10 enter --fsid $FSID --name mon.a -- sleep 1
|
||||
|
||||
## logs
|
||||
expect_false $CEPHADM logs
|
||||
expect_false $CEPHADM logs --fsid $FSID --name mon.z
|
||||
$CEPHADM logs --fsid $FSID --name mon.a
|
||||
expect_false $CEPHADM --timeout 1 logs --fsid $FSID --name mon.a -f
|
||||
|
||||
## ceph-volume
|
||||
$CEPHADM ceph-volume --fsid $FSID -- inventory --format=json \
|
||||
|
@ -10,6 +10,7 @@ LOG_DIR_MODE=0o770
|
||||
DATA_DIR_MODE=0o700
|
||||
CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker
|
||||
CUSTOM_PS1=r'[ceph: \u@\h \W]\$ '
|
||||
DEFAULT_TIMEOUT=None # in seconds
|
||||
|
||||
"""
|
||||
You can invoke cephadm in two ways:
|
||||
@ -57,6 +58,7 @@ import uuid
|
||||
from distutils.spawn import find_executable
|
||||
from functools import wraps
|
||||
from glob import glob
|
||||
from threading import Thread
|
||||
|
||||
if sys.version_info >= (3, 0):
|
||||
from io import StringIO
|
||||
@ -73,6 +75,9 @@ container_path = None
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
class TimeoutExpired(Error):
|
||||
pass
|
||||
|
||||
##################################
|
||||
|
||||
|
||||
@ -367,7 +372,12 @@ class FileLock(object):
|
||||
##################################
|
||||
# Popen wrappers, lifted from ceph-volume
|
||||
|
||||
def call(command, desc=None, verbose=False, **kwargs):
|
||||
def call(command,
|
||||
desc=None,
|
||||
verbose=False,
|
||||
verbose_on_failure=True,
|
||||
timeout=DEFAULT_TIMEOUT,
|
||||
**kwargs):
|
||||
"""
|
||||
Wrap subprocess.Popen to
|
||||
|
||||
@ -379,10 +389,11 @@ def call(command, desc=None, verbose=False, **kwargs):
|
||||
|
||||
:param verbose_on_failure: On a non-zero exit status, it will forcefully set
|
||||
logging ON for the terminal
|
||||
:param timeout: timeout in seconds
|
||||
"""
|
||||
if not desc:
|
||||
desc = command[0]
|
||||
verbose_on_failure = kwargs.pop('verbose_on_failure', True)
|
||||
timeout = timeout or args.timeout
|
||||
|
||||
logger.debug("Running command: %s" % ' '.join(command))
|
||||
process = subprocess.Popen(
|
||||
@ -404,7 +415,15 @@ def call(command, desc=None, verbose=False, **kwargs):
|
||||
stop = False
|
||||
out_buffer = '' # partial line (no newline yet)
|
||||
err_buffer = '' # partial line (no newline yet)
|
||||
start_time = time.time()
|
||||
end_time = None
|
||||
if timeout:
|
||||
end_time = start_time + timeout
|
||||
while not stop:
|
||||
if end_time and (time.time() >= end_time):
|
||||
logger.info(desc + ':timeout after %s seconds' % timeout)
|
||||
stop = True
|
||||
process.kill()
|
||||
if reads and process.poll() is not None:
|
||||
# we want to stop, but first read off anything remaining
|
||||
# on stdout/stderr
|
||||
@ -412,7 +431,7 @@ def call(command, desc=None, verbose=False, **kwargs):
|
||||
else:
|
||||
reads, _, _ = select.select(
|
||||
[process.stdout.fileno(), process.stderr.fileno()],
|
||||
[], []
|
||||
[], [], timeout
|
||||
)
|
||||
for fd in reads:
|
||||
try:
|
||||
@ -467,14 +486,81 @@ def call(command, desc=None, verbose=False, **kwargs):
|
||||
|
||||
return out, err, returncode
|
||||
|
||||
|
||||
def call_throws(command, **kwargs):
|
||||
out, err, ret = call(command, **kwargs)
|
||||
if ret:
|
||||
raise RuntimeError('Failed command: %s' % ' '.join(command))
|
||||
return out, err, ret
|
||||
|
||||
|
||||
def call_timeout(command, timeout):
|
||||
#type (List[str], int) -> int
|
||||
|
||||
logger.debug('Running command (timeout=%s): %s'
|
||||
% (timeout, ' '.join(command)))
|
||||
|
||||
def raise_timeout(command, timeout):
|
||||
msg = 'Command \'%s\' timed out after %s seconds' % (command, timeout)
|
||||
logger.debug(msg)
|
||||
raise TimeoutExpired(msg)
|
||||
|
||||
def call_timeout_py2(command, timeout):
|
||||
#type (List[str], int) -> int
|
||||
proc = subprocess.Popen(command)
|
||||
thread = Thread(target=proc.wait)
|
||||
thread.start()
|
||||
thread.join(timeout)
|
||||
if thread.is_alive():
|
||||
proc.kill()
|
||||
thread.join()
|
||||
raise_timeout(command, timeout)
|
||||
return proc.returncode
|
||||
|
||||
def call_timeout_py3(command, timeout):
|
||||
#type (List[str], int) -> int
|
||||
try:
|
||||
return subprocess.call(command, timeout=timeout)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
raise_timeout(command, timeout)
|
||||
|
||||
ret = 1
|
||||
if sys.version_info >= (3, 3):
|
||||
ret = call_timeout_py3(command, timeout)
|
||||
else:
|
||||
# py2 subprocess has no timeout arg
|
||||
ret = call_timeout_py2(command, timeout)
|
||||
return ret
|
||||
|
||||
##################################
|
||||
|
||||
def is_available(what, func, retry_max=5):
|
||||
# type (str, func, Optional[int]) -> func
|
||||
"""
|
||||
Wait for a service to become available
|
||||
|
||||
:param what: the name of the service
|
||||
:param func: the callable object that determines availability
|
||||
:param retry_max: max number of retry invocations of func
|
||||
"""
|
||||
@wraps(func)
|
||||
def func_wrapper(*args, **kwargs):
|
||||
logger.info('Waiting for %s...' % (what))
|
||||
retry_num = 1
|
||||
while True:
|
||||
if func(*args, **kwargs):
|
||||
break
|
||||
elif retry_num > retry_max:
|
||||
raise Error('%s not available after %s tries'
|
||||
% (what, retry_max))
|
||||
|
||||
logger.info('%s not available, waiting (%s/%s)...'
|
||||
% (what, retry_num, retry_max))
|
||||
|
||||
retry_num += 1
|
||||
time.sleep(1)
|
||||
return func_wrapper
|
||||
|
||||
def read_config(fn):
|
||||
# type: (Optional[str]) -> ConfigParser
|
||||
# bend over backwards here because py2's ConfigParser doesn't like
|
||||
@ -1418,13 +1504,13 @@ class CephContainer:
|
||||
self.cname,
|
||||
] + cmd
|
||||
|
||||
def run(self):
|
||||
# type: () -> str
|
||||
def run(self, timeout=DEFAULT_TIMEOUT):
|
||||
# type: (Optional[int]) -> str
|
||||
logger.debug(self.run_cmd())
|
||||
out, _, _ = call_throws(self.run_cmd(), desc=self.entrypoint)
|
||||
out, _, _ = call_throws(
|
||||
self.run_cmd(), desc=self.entrypoint, timeout=timeout)
|
||||
return out
|
||||
|
||||
|
||||
##################################
|
||||
|
||||
def command_version():
|
||||
@ -1628,8 +1714,8 @@ def command_bootstrap():
|
||||
tmp_config = write_tmp(config, uid, gid)
|
||||
|
||||
# a CLI helper to reduce our typing
|
||||
def cli(cmd, extra_mounts={}):
|
||||
# type: (List[str], Dict[str, str]) -> str
|
||||
def cli(cmd, extra_mounts={}, timeout=DEFAULT_TIMEOUT):
|
||||
# type: (List[str], Dict[str, str], Optional[int]) -> str
|
||||
mounts = {
|
||||
log_dir: '/var/log/ceph:z',
|
||||
tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
|
||||
@ -1637,31 +1723,30 @@ def command_bootstrap():
|
||||
}
|
||||
for k, v in extra_mounts.items():
|
||||
mounts[k] = v
|
||||
timeout = timeout or args.timeout
|
||||
return CephContainer(
|
||||
image=args.image,
|
||||
entrypoint='/usr/bin/ceph',
|
||||
args=cmd,
|
||||
volume_mounts=mounts,
|
||||
).run()
|
||||
).run(timeout=timeout)
|
||||
|
||||
logger.info('Waiting for mon to start...')
|
||||
while True:
|
||||
c = CephContainer(
|
||||
image=args.image,
|
||||
entrypoint='/usr/bin/ceph',
|
||||
args=[
|
||||
'status'],
|
||||
volume_mounts={
|
||||
mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
|
||||
tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
|
||||
tmp_config.name: '/etc/ceph/ceph.conf:z',
|
||||
},
|
||||
)
|
||||
out, err, ret = call(c.run_cmd(), c.entrypoint)
|
||||
if ret == 0:
|
||||
break
|
||||
logger.info('mon is still not available yet, waiting...')
|
||||
time.sleep(1)
|
||||
c = CephContainer(
|
||||
image=args.image,
|
||||
entrypoint='/usr/bin/ceph',
|
||||
args=[
|
||||
'status'],
|
||||
volume_mounts={
|
||||
mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
|
||||
tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
|
||||
tmp_config.name: '/etc/ceph/ceph.conf:z',
|
||||
},
|
||||
)
|
||||
def is_mon_available():
|
||||
out, err, ret = call(c.run_cmd(), desc=c.entrypoint, timeout=30)
|
||||
return ret == 0
|
||||
is_available('mon', is_mon_available)()
|
||||
|
||||
# assimilate and minimize config
|
||||
if not args.no_minimize_config:
|
||||
@ -1712,13 +1797,11 @@ def command_bootstrap():
|
||||
logger.info('Wrote config to %s' % args.output_config)
|
||||
|
||||
logger.info('Waiting for mgr to start...')
|
||||
while True:
|
||||
out = cli(['status', '-f', 'json-pretty'])
|
||||
def is_mgr_available():
|
||||
out = cli(['status', '-f', 'json-pretty'], timeout=30)
|
||||
j = json.loads(out)
|
||||
if j.get('mgrmap', {}).get('available', False):
|
||||
break
|
||||
logger.info('mgr is still not available yet, waiting...')
|
||||
time.sleep(1)
|
||||
return j.get('mgrmap', {}).get('available', False)
|
||||
is_available('mgr', is_mgr_available)()
|
||||
|
||||
# ssh
|
||||
if not args.skip_ssh:
|
||||
@ -1760,16 +1843,16 @@ def command_bootstrap():
|
||||
if not args.skip_dashboard:
|
||||
logger.info('Enabling the dashboard module...')
|
||||
cli(['mgr', 'module', 'enable', 'dashboard'])
|
||||
logger.info('Waiting for the module to be available...')
|
||||
# FIXME: potential for an endless loop?
|
||||
while True:
|
||||
c_out = cli(['-h'])
|
||||
if 'dashboard' in c_out:
|
||||
break
|
||||
logger.info('Dashboard not yet available, waiting...')
|
||||
time.sleep(1)
|
||||
|
||||
logger.info('Waiting for the dashboard to start...')
|
||||
def is_dashboard_available():
|
||||
out = cli(['-h'], timeout=30)
|
||||
return 'dashboard' in out
|
||||
is_available('Dashboard', is_dashboard_available)()
|
||||
|
||||
logger.info('Generating a dashboard self-signed certificate...')
|
||||
cli(['dashboard', 'create-self-signed-cert'])
|
||||
|
||||
logger.info('Creating initial admin user...')
|
||||
password = args.initial_dashboard_password or generate_password()
|
||||
cli(['dashboard', 'ac-user-create',
|
||||
@ -1779,6 +1862,7 @@ def command_bootstrap():
|
||||
logger.info('Fetching dashboard port number...')
|
||||
out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
|
||||
port = int(out)
|
||||
|
||||
logger.info('Ceph Dashboard is now available at:\n\n'
|
||||
'\t URL: https://%s:%s/\n'
|
||||
'\t User: %s\n'
|
||||
@ -1873,8 +1957,7 @@ def command_run():
|
||||
(daemon_type, daemon_id) = args.name.split('.', 1)
|
||||
c = get_container(args.fsid, daemon_type, daemon_id)
|
||||
command = c.run_cmd()
|
||||
logger.debug("Running command: %s" % ' '.join(command))
|
||||
return subprocess.call(command)
|
||||
return call_timeout(command, args.timeout)
|
||||
|
||||
##################################
|
||||
|
||||
@ -1908,6 +1991,7 @@ def command_shell():
|
||||
'-e', 'LANG=C',
|
||||
'-e', "PS1=%s" % CUSTOM_PS1,
|
||||
]
|
||||
|
||||
c = CephContainer(
|
||||
image=args.image,
|
||||
entrypoint='doesnotmatter',
|
||||
@ -1915,8 +1999,8 @@ def command_shell():
|
||||
container_args=container_args,
|
||||
volume_mounts=mounts)
|
||||
command = c.shell_cmd(command)
|
||||
logger.debug("Running command: %s" % ' '.join(command))
|
||||
return subprocess.call(command)
|
||||
|
||||
return call_timeout(command, args.timeout)
|
||||
|
||||
##################################
|
||||
|
||||
@ -1939,8 +2023,7 @@ def command_enter():
|
||||
c = get_container(args.fsid, daemon_type, daemon_id,
|
||||
container_args=container_args)
|
||||
command = c.exec_cmd(command)
|
||||
logger.debug("Running command: %s" % ' '.join(command))
|
||||
return subprocess.call(command)
|
||||
return call_timeout(command, args.timeout)
|
||||
|
||||
##################################
|
||||
|
||||
@ -1999,7 +2082,7 @@ def command_unit():
|
||||
|
||||
@infer_fsid
|
||||
def command_logs():
|
||||
# type: () -> None
|
||||
# type: () -> int
|
||||
if not args.fsid:
|
||||
raise Error('must pass --fsid to specify cluster')
|
||||
cmd = [str(container_path), 'logs'] # type: List[str]
|
||||
@ -2011,8 +2094,7 @@ def command_logs():
|
||||
|
||||
# call this directly, without our wrapper, so that we get an unmolested
|
||||
# stdout with logger prefixing.
|
||||
logger.debug("Running command: %s" % ' '.join(cmd))
|
||||
subprocess.call(cmd) # type: ignore
|
||||
return call_timeout(cmd, args.timeout)
|
||||
|
||||
##################################
|
||||
|
||||
@ -2378,6 +2460,12 @@ def _get_parser():
|
||||
'--verbose', '-v',
|
||||
action='store_true',
|
||||
help='Show debug-level log messages')
|
||||
parser.add_argument(
|
||||
'--timeout',
|
||||
type=int,
|
||||
default=DEFAULT_TIMEOUT,
|
||||
help='timeout in seconds')
|
||||
|
||||
subparsers = parser.add_subparsers(help='sub-command')
|
||||
|
||||
parser_version = subparsers.add_parser(
|
||||
|
Loading…
Reference in New Issue
Block a user