qa/cephfs: allow not aborting execution when mount command fails

This commit adds a new argument check_status to mount methods of
KernelMount, FuseMount, LocalKernelMount and LocalFuseMount. When value
of this argument is False, these methods would catch the
CommandFailedError exception and would return a tuple consisting of the
exception itself, and stdout and stderr of the mount command. This
allows reusing these mount methods while running negative tests for
commands.

The name "check_status" is selected so since teuthology's run() and
vstart_runner's run() use a variable with same name for the very same
purpose.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
Rishabh Dave 2020-03-24 17:04:14 +05:30
parent 07e493ffb5
commit 9a36a0abd0
4 changed files with 120 additions and 70 deletions

View File

@ -14,7 +14,6 @@ from tasks.cephfs.mount import CephFSMount
log = logging.getLogger(__name__)
# Refer mount.py for docstrings.
class FuseMount(CephFSMount):
def __init__(self, ctx, client_config, test_dir, client_id,
@ -32,7 +31,7 @@ class FuseMount(CephFSMount):
self.inst = None
self.addr = None
def mount(self, mntopts=[], createfs=True, **kwargs):
def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
self.update_attrs(**kwargs)
self.assert_and_log_minimum_mount_details()
@ -45,7 +44,7 @@ class FuseMount(CephFSMount):
self.setupfs(name=self.cephfs_name)
try:
return self._mount(mntopts)
return self._mount(mntopts, check_status)
except RuntimeError:
# Catch exceptions by the mount() logic (i.e. not remote command
# failures) and ensure the mount is not left half-up.
@ -55,7 +54,7 @@ class FuseMount(CephFSMount):
self.umount_wait(force=True)
raise
def _mount(self, mntopts):
def _mount(self, mntopts, check_status):
log.info("Client client.%s config is %s" % (self.client_id,
self.client_config))
@ -137,14 +136,16 @@ class FuseMount(CephFSMount):
pre_mount_conns = list_connections()
log.info("Pre-mount connections: {0}".format(pre_mount_conns))
proc = self.client_remote.run(
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
self.fuse_daemon = self.client_remote.run(
args=run_cmd,
cwd=cwd,
logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
stdin=run.PIPE,
wait=False,
stdout=mountcmd_stdout,
stderr=mountcmd_stderr,
wait=False
)
self.fuse_daemon = proc
# Wait for the connection reference to appear in /sys
mount_wait = self.client_config.get('mount_wait', 0)
@ -159,13 +160,21 @@ class FuseMount(CephFSMount):
if self.fuse_daemon.finished:
# Did mount fail? Raise the CommandFailedError instead of
# hitting the "failed to populate /sys/" timeout
self.fuse_daemon.wait()
try:
self.fuse_daemon.wait()
except CommandFailedError as e:
log.info('mount command failed.')
if check_status:
raise
else:
return (e, mountcmd_stdout.getvalue(),
mountcmd_stderr.getvalue())
time.sleep(1)
waited += 1
if waited > timeout:
raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
waited
))
raise RuntimeError(
"Fuse mount failed to populate/sys/ after {} "
"seconds".format(waited))
else:
post_mount_conns = list_connections()

View File

@ -25,7 +25,7 @@ class KernelMount(CephFSMount):
client_keyring_path=client_keyring_path, hostfs_mntpt=hostfs_mntpt,
cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
def mount(self, mntopts=[], createfs=True, **kwargs):
def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
self.update_attrs(**kwargs)
self.assert_and_log_minimum_mount_details()
@ -47,42 +47,50 @@ class KernelMount(CephFSMount):
if 'file exists' not in stderr.getvalue().lower():
raise
opts = 'name=' + self.client_id
if self.client_keyring_path and self.client_id is not None:
opts += 'secret=' + self.get_key_from_keyfile()
opts += ',norequire_active_mds,conf=' + self.config_path
if self.cephfs_name is not None:
opts += ",mds_namespace={0}".format(self.cephfs_name)
if mntopts:
opts += ',' + ','.join(mntopts)
self.client_remote.run(
args=[
'sudo',
'adjust-ulimits',
'ceph-coverage',
'{tdir}/archive/coverage'.format(tdir=self.test_dir),
'nsenter',
'--net=/var/run/netns/{0}'.format(self.netns_name),
'/bin/mount',
'-t',
'ceph',
':' + self.cephfs_mntpt,
self.hostfs_mntpt,
'-v',
'-o',
opts
],
timeout=(30*60),
)
retval = self._run_mount_cmd(mntopts, check_status)
if retval:
return retval
self.client_remote.run(
args=['sudo', 'chmod', '1777', self.hostfs_mntpt], timeout=(5*60))
self.mounted = True
def _run_mount_cmd(self, mntopts, check_status):
opts = 'norequire_active_mds,'
if self.client_id:
opts += 'name=' + self.client_id
if self.client_keyring_path and self.client_id:
opts += ',secret=' + self.get_key_from_keyfile()
if self.config_path:
opts += ',conf=' + self.config_path
if self.cephfs_name:
opts += ",mds_namespace=" + self.cephfs_name
if mntopts:
opts += ',' + ','.join(mntopts)
mount_dev = ':' + self.cephfs_mntpt
prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
self.test_dir + '/archive/coverage',
'nsenter',
'--net=/var/run/netns/{0}'.format(self.netns_name)]
cmdargs = prefix + ['/bin/mount', '-t', 'ceph', mount_dev,
self.hostfs_mntpt, '-v', '-o', opts]
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
try:
self.client_remote.run(args=cmdargs, timeout=(30*60),
stdout=mountcmd_stdout,
stderr=mountcmd_stderr)
except CommandFailedError as e:
log.info('mount command failed')
if check_status:
raise
else:
return (e, mountcmd_stdout.getvalue(),
mountcmd_stderr.getvalue())
log.info('mount command passed')
def umount(self, force=False):
if not self.is_mounted():
self.cleanup()

View File

@ -393,7 +393,7 @@ class CephFSMount(object):
args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
def mount(self, mntopts=[], createfs=True, **kwargs):
def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
"""
kwargs expects its members to be same as the arguments accepted by
self.update_attrs().
@ -476,11 +476,13 @@ class CephFSMount(object):
mntopts = kwargs.pop('mntopts', [])
createfs = kwargs.pop('createfs', False)
check_status = kwargs.pop('check_status', True)
wait = kwargs.pop('wait', True)
self.update_attrs(**kwargs)
retval = self.mount(mntopts=mntopts, createfs=createfs)
retval = self.mount(mntopts=mntopts, createfs=createfs,
check_status=check_status)
# avoid this scenario (again): mount command might've failed and
# check_status might have silenced the exception, yet we attempt to
# wait which might lead to an error.

View File

@ -44,16 +44,19 @@ import time
import sys
import errno
from IPy import IP
from unittest import suite, loader
import unittest
import platform
import logging
from unittest import suite, loader
from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.orchestra.remote import Remote
from teuthology.config import config as teuth_config
from teuthology.contextutil import safe_while
from teuthology.contextutil import MaxWhileTries
import logging
from teuthology.orchestra.run import CommandFailedError
def init_log():
global log
@ -126,7 +129,6 @@ if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
try:
from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import CephManager
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.kernel_mount import KernelMount
@ -612,7 +614,7 @@ class LocalKernelMount(KernelMount):
path = "{0}/client.{1}.*.asok".format(d, self.client_id)
return path
def mount(self, mntopts=[], createfs=True, **kwargs):
def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
self.update_attrs(**kwargs)
self.assert_and_log_minimum_mount_details()
@ -628,11 +630,14 @@ class LocalKernelMount(KernelMount):
if createfs:
self.setupfs(name=self.cephfs_name)
opts = 'name=' + self.client_id
if self.client_keyring_path:
opts = 'norequire_active_mds'
if self.client_id:
opts += ',name=' + self.client_id
if self.client_keyring_path and self.client_id:
opts += ",secret=" + self.get_key_from_keyfile()
opts += ',norequire_active_mds,conf=' + self.config_path
if self.cephfs_name is not None:
if self.config_path:
opts += ',conf=' + self.config_path
if self.cephfs_name:
opts += ",mds_namespace={0}".format(self.cephfs_name)
if mntopts:
opts += ',' + ','.join(mntopts)
@ -645,17 +650,29 @@ class LocalKernelMount(KernelMount):
if 'file exists' not in stderr.getvalue().lower():
raise
if self.cephfs_mntpt is None:
self.cephfs_mntpt = "/"
cmdargs = ['sudo']
if self.using_namespace:
cmdargs += ['nsenter',
'--net=/var/run/netns/{0}'.format(self.netns_name)]
cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
self.hostfs_mntpt, '-v', '-o', opts]
self.client_remote.run(args=cmdargs, timeout=(30*60), omit_sudo=False)
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
try:
self.client_remote.run(args=cmdargs, timeout=(30*60),
omit_sudo=False, stdout=mountcmd_stdout,
stderr=mountcmd_stderr)
except CommandFailedError as e:
if check_status:
raise
else:
return (e, mountcmd_stdout.getvalue(),
mountcmd_stderr.getvalue())
self.client_remote.run(args=['sudo', 'chmod', '1777',
self.hostfs_mntpt], timeout=(5*60))
self.mounted = True
def cleanup_netns(self):
@ -722,7 +739,7 @@ class LocalFuseMount(FuseMount):
path = "{0}/client.{1}.*.asok".format(d, self.client_id)
return path
def mount(self, mntopts=[], createfs=True, **kwargs):
def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
self.update_attrs(**kwargs)
self.assert_and_log_minimum_mount_details()
@ -779,17 +796,18 @@ class LocalFuseMount(FuseMount):
if self.client_keyring_path and self.client_id is not None:
cmdargs.extend(['-k', self.client_keyring_path])
if self.cephfs_name:
cmdargs += ["--client_mds_namespace=" + self.cephfs_name]
cmdargs += ["--client_fs=" + self.cephfs_name]
if self.cephfs_mntpt:
cmdargs += ["--client_fs=" + self.cephfs_mntpt]
cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
if os.getuid() != 0:
cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
if mntopts:
cmdargs += mntopts
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
omit_sudo=False)
self._set_fuse_daemon_pid()
omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
self._set_fuse_daemon_pid(check_status)
log.info("Mounting client.{0} with pid "
"{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
@ -800,7 +818,14 @@ class LocalFuseMount(FuseMount):
if self.fuse_daemon.finished:
# Did mount fail? Raise the CommandFailedError instead of
# hitting the "failed to populate /sys/" timeout
self.fuse_daemon.wait()
try:
self.fuse_daemon.wait()
except CommandFailedError as e:
if check_status:
raise
else:
return (e, mountcmd_stdout.getvalue(),
mountcmd_stderr.getvalue())
time.sleep(1)
waited += 1
if waited > 30:
@ -825,20 +850,26 @@ class LocalFuseMount(FuseMount):
self.mounted = True
def _set_fuse_daemon_pid(self):
def _set_fuse_daemon_pid(self, check_status):
# NOTE: When a command <args> is launched with sudo, two processes are
# launched, one with sudo in <args> and other without. Make sure we
# get the PID of latter one.
with safe_while(sleep=1, tries=15) as proceed:
while proceed():
try:
sock = self.find_admin_socket()
except (RuntimeError, CommandFailedError):
continue
try:
with safe_while(sleep=1, tries=15) as proceed:
while proceed():
try:
sock = self.find_admin_socket()
except (RuntimeError, CommandFailedError):
continue
self.fuse_daemon.fuse_pid = int(re.match(".*\.(\d+)\.asok$",
sock).group(1))
break
self.fuse_daemon.fuse_pid = int(re.match(".*\.(\d+)\.asok$",
sock).group(1))
break
except MaxWhileTries:
if check_status:
raise
else:
pass
def cleanup_netns(self):
if self.using_namespace: