diff --git a/qa/workunits/cephadm/test_cephadm.sh b/qa/workunits/cephadm/test_cephadm.sh index 7fde32796d6..2233610b862 100755 --- a/qa/workunits/cephadm/test_cephadm.sh +++ b/qa/workunits/cephadm/test_cephadm.sh @@ -240,6 +240,8 @@ $CEPHADM unit --fsid $FSID --name mon.a -- is-enabled ## shell $CEPHADM shell --fsid $FSID -- true $CEPHADM shell --fsid $FSID -- test -d /var/log/ceph +expect_false $CEPHADM --timeout 1 shell --fsid $FSID -- sleep 10 +$CEPHADM --timeout 10 shell --fsid $FSID -- sleep 1 ## enter expect_false $CEPHADM enter @@ -248,6 +250,14 @@ $CEPHADM enter --fsid $FSID --name mgr.x -- test -d /var/lib/ceph/mgr/ceph-x $CEPHADM enter --fsid $FSID --name mon.a -- pidof ceph-mon expect_false $CEPHADM enter --fsid $FSID --name mgr.x -- pidof ceph-mon $CEPHADM enter --fsid $FSID --name mgr.x -- pidof ceph-mgr +expect_false $CEPHADM --timeout 1 enter --fsid $FSID --name mon.a -- sleep 10 +$CEPHADM --timeout 10 enter --fsid $FSID --name mon.a -- sleep 1 + +## logs +expect_false $CEPHADM logs +expect_false $CEPHADM logs --fsid $FSID --name mon.z +$CEPHADM logs --fsid $FSID --name mon.a +expect_false $CEPHADM --timeout 1 logs --fsid $FSID --name mon.a -f ## ceph-volume $CEPHADM ceph-volume --fsid $FSID -- inventory --format=json \ diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 985e70a3863..74bb946b1da 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -10,6 +10,7 @@ LOG_DIR_MODE=0o770 DATA_DIR_MODE=0o700 CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker CUSTOM_PS1=r'[ceph: \u@\h \W]\$ ' +DEFAULT_TIMEOUT=None # in seconds """ You can invoke cephadm in two ways: @@ -57,6 +58,7 @@ import uuid from distutils.spawn import find_executable from functools import wraps from glob import glob +from threading import Thread if sys.version_info >= (3, 0): from io import StringIO @@ -73,6 +75,9 @@ container_path = None class Error(Exception): pass +class TimeoutExpired(Error): + pass + ################################## @@ -367,7 +372,12 @@ class FileLock(object): ################################## # Popen wrappers, lifted from ceph-volume -def call(command, desc=None, verbose=False, **kwargs): +def call(command, + desc=None, + verbose=False, + verbose_on_failure=True, + timeout=DEFAULT_TIMEOUT, + **kwargs): """ Wrap subprocess.Popen to @@ -379,10 +389,11 @@ def call(command, desc=None, verbose=False, **kwargs): :param verbose_on_failure: On a non-zero exit status, it will forcefully set logging ON for the terminal + :param timeout: timeout in seconds """ if not desc: desc = command[0] - verbose_on_failure = kwargs.pop('verbose_on_failure', True) + timeout = timeout or args.timeout logger.debug("Running command: %s" % ' '.join(command)) process = subprocess.Popen( @@ -404,7 +415,15 @@ def call(command, desc=None, verbose=False, **kwargs): stop = False out_buffer = '' # partial line (no newline yet) err_buffer = '' # partial line (no newline yet) + start_time = time.time() + end_time = None + if timeout: + end_time = start_time + timeout while not stop: + if end_time and (time.time() >= end_time): + logger.info(desc + ':timeout after %s seconds' % timeout) + stop = True + process.kill() if reads and process.poll() is not None: # we want to stop, but first read off anything remaining # on stdout/stderr @@ -412,7 +431,7 @@ def call(command, desc=None, verbose=False, **kwargs): else: reads, _, _ = select.select( [process.stdout.fileno(), process.stderr.fileno()], - [], [] + [], [], timeout ) for fd in reads: try: @@ -467,14 +486,81 @@ def call(command, desc=None, verbose=False, **kwargs): return out, err, returncode + def call_throws(command, **kwargs): out, err, ret = call(command, **kwargs) if ret: raise RuntimeError('Failed command: %s' % ' '.join(command)) return out, err, ret + +def call_timeout(command, timeout): + #type (List[str], int) -> int + + logger.debug('Running command (timeout=%s): %s' + % (timeout, ' '.join(command))) + + def raise_timeout(command, timeout): + msg = 'Command \'%s\' timed out after %s seconds' % (command, timeout) + logger.debug(msg) + raise TimeoutExpired(msg) + + def call_timeout_py2(command, timeout): + #type (List[str], int) -> int + proc = subprocess.Popen(command) + thread = Thread(target=proc.wait) + thread.start() + thread.join(timeout) + if thread.is_alive(): + proc.kill() + thread.join() + raise_timeout(command, timeout) + return proc.returncode + + def call_timeout_py3(command, timeout): + #type (List[str], int) -> int + try: + return subprocess.call(command, timeout=timeout) + except subprocess.TimeoutExpired as e: + raise_timeout(command, timeout) + + ret = 1 + if sys.version_info >= (3, 3): + ret = call_timeout_py3(command, timeout) + else: + # py2 subprocess has no timeout arg + ret = call_timeout_py2(command, timeout) + return ret + ################################## +def is_available(what, func, retry_max=5): + # type (str, func, Optional[int]) -> func + """ + Wait for a service to become available + + :param what: the name of the service + :param func: the callable object that determines availability + :param retry_max: max number of retry invocations of func + """ + @wraps(func) + def func_wrapper(*args, **kwargs): + logger.info('Waiting for %s...' % (what)) + retry_num = 1 + while True: + if func(*args, **kwargs): + break + elif retry_num > retry_max: + raise Error('%s not available after %s tries' + % (what, retry_max)) + + logger.info('%s not available, waiting (%s/%s)...' + % (what, retry_num, retry_max)) + + retry_num += 1 + time.sleep(1) + return func_wrapper + def read_config(fn): # type: (Optional[str]) -> ConfigParser # bend over backwards here because py2's ConfigParser doesn't like @@ -1418,13 +1504,13 @@ class CephContainer: self.cname, ] + cmd - def run(self): - # type: () -> str + def run(self, timeout=DEFAULT_TIMEOUT): + # type: (Optional[int]) -> str logger.debug(self.run_cmd()) - out, _, _ = call_throws(self.run_cmd(), desc=self.entrypoint) + out, _, _ = call_throws( + self.run_cmd(), desc=self.entrypoint, timeout=timeout) return out - ################################## def command_version(): @@ -1628,8 +1714,8 @@ def command_bootstrap(): tmp_config = write_tmp(config, uid, gid) # a CLI helper to reduce our typing - def cli(cmd, extra_mounts={}): - # type: (List[str], Dict[str, str]) -> str + def cli(cmd, extra_mounts={}, timeout=DEFAULT_TIMEOUT): + # type: (List[str], Dict[str, str], Optional[int]) -> str mounts = { log_dir: '/var/log/ceph:z', tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z', @@ -1637,31 +1723,30 @@ def command_bootstrap(): } for k, v in extra_mounts.items(): mounts[k] = v + timeout = timeout or args.timeout return CephContainer( image=args.image, entrypoint='/usr/bin/ceph', args=cmd, volume_mounts=mounts, - ).run() + ).run(timeout=timeout) logger.info('Waiting for mon to start...') - while True: - c = CephContainer( - image=args.image, - entrypoint='/usr/bin/ceph', - args=[ - 'status'], - volume_mounts={ - mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id), - tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z', - tmp_config.name: '/etc/ceph/ceph.conf:z', - }, - ) - out, err, ret = call(c.run_cmd(), c.entrypoint) - if ret == 0: - break - logger.info('mon is still not available yet, waiting...') - time.sleep(1) + c = CephContainer( + image=args.image, + entrypoint='/usr/bin/ceph', + args=[ + 'status'], + volume_mounts={ + mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id), + tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z', + tmp_config.name: '/etc/ceph/ceph.conf:z', + }, + ) + def is_mon_available(): + out, err, ret = call(c.run_cmd(), desc=c.entrypoint, timeout=30) + return ret == 0 + is_available('mon', is_mon_available)() # assimilate and minimize config if not args.no_minimize_config: @@ -1712,13 +1797,11 @@ def command_bootstrap(): logger.info('Wrote config to %s' % args.output_config) logger.info('Waiting for mgr to start...') - while True: - out = cli(['status', '-f', 'json-pretty']) + def is_mgr_available(): + out = cli(['status', '-f', 'json-pretty'], timeout=30) j = json.loads(out) - if j.get('mgrmap', {}).get('available', False): - break - logger.info('mgr is still not available yet, waiting...') - time.sleep(1) + return j.get('mgrmap', {}).get('available', False) + is_available('mgr', is_mgr_available)() # ssh if not args.skip_ssh: @@ -1760,16 +1843,16 @@ def command_bootstrap(): if not args.skip_dashboard: logger.info('Enabling the dashboard module...') cli(['mgr', 'module', 'enable', 'dashboard']) - logger.info('Waiting for the module to be available...') - # FIXME: potential for an endless loop? - while True: - c_out = cli(['-h']) - if 'dashboard' in c_out: - break - logger.info('Dashboard not yet available, waiting...') - time.sleep(1) + + logger.info('Waiting for the dashboard to start...') + def is_dashboard_available(): + out = cli(['-h'], timeout=30) + return 'dashboard' in out + is_available('Dashboard', is_dashboard_available)() + logger.info('Generating a dashboard self-signed certificate...') cli(['dashboard', 'create-self-signed-cert']) + logger.info('Creating initial admin user...') password = args.initial_dashboard_password or generate_password() cli(['dashboard', 'ac-user-create', @@ -1779,6 +1862,7 @@ def command_bootstrap(): logger.info('Fetching dashboard port number...') out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port']) port = int(out) + logger.info('Ceph Dashboard is now available at:\n\n' '\t URL: https://%s:%s/\n' '\t User: %s\n' @@ -1873,8 +1957,7 @@ def command_run(): (daemon_type, daemon_id) = args.name.split('.', 1) c = get_container(args.fsid, daemon_type, daemon_id) command = c.run_cmd() - logger.debug("Running command: %s" % ' '.join(command)) - return subprocess.call(command) + return call_timeout(command, args.timeout) ################################## @@ -1908,6 +1991,7 @@ def command_shell(): '-e', 'LANG=C', '-e', "PS1=%s" % CUSTOM_PS1, ] + c = CephContainer( image=args.image, entrypoint='doesnotmatter', @@ -1915,8 +1999,8 @@ def command_shell(): container_args=container_args, volume_mounts=mounts) command = c.shell_cmd(command) - logger.debug("Running command: %s" % ' '.join(command)) - return subprocess.call(command) + + return call_timeout(command, args.timeout) ################################## @@ -1939,8 +2023,7 @@ def command_enter(): c = get_container(args.fsid, daemon_type, daemon_id, container_args=container_args) command = c.exec_cmd(command) - logger.debug("Running command: %s" % ' '.join(command)) - return subprocess.call(command) + return call_timeout(command, args.timeout) ################################## @@ -1999,7 +2082,7 @@ def command_unit(): @infer_fsid def command_logs(): - # type: () -> None + # type: () -> int if not args.fsid: raise Error('must pass --fsid to specify cluster') cmd = [str(container_path), 'logs'] # type: List[str] @@ -2011,8 +2094,7 @@ def command_logs(): # call this directly, without our wrapper, so that we get an unmolested # stdout with logger prefixing. - logger.debug("Running command: %s" % ' '.join(cmd)) - subprocess.call(cmd) # type: ignore + return call_timeout(cmd, args.timeout) ################################## @@ -2378,6 +2460,12 @@ def _get_parser(): '--verbose', '-v', action='store_true', help='Show debug-level log messages') + parser.add_argument( + '--timeout', + type=int, + default=DEFAULT_TIMEOUT, + help='timeout in seconds') + subparsers = parser.add_subparsers(help='sub-command') parser_version = subparsers.add_parser(