ceph/qa/tasks/cephfs/fuse_mount.py
Patrick Donnelly 483b16062d
qa: add new mntargs option for fuse
test_client_recovery was also using mntopts to specify additional
options to ceph-fuse. Because the two prior commits unify the behavior
of ceph-fuse and the kernel mount so that the "-o" option is available
for both, that changes breaks this test. Add a special set of args
available only for fuse (there is no equivalent on the kernel).

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
2022-09-26 22:19:29 -04:00

534 lines
19 KiB
Python

import json
import time
import logging
from io import StringIO
from textwrap import dedent
from teuthology.contextutil import MaxWhileTries
from teuthology.contextutil import safe_while
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import get_valgrind_args
from tasks.cephfs.mount import CephFSMount, UMOUNT_TIMEOUT
log = logging.getLogger(__name__)
# Refer mount.py for docstrings.
class FuseMount(CephFSMount):
def __init__(self, ctx, test_dir, client_id, client_remote,
client_keyring_path=None, cephfs_name=None,
cephfs_mntpt=None, hostfs_mntpt=None, brxnet=None,
client_config={}):
super(FuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
client_id=client_id, client_remote=client_remote,
client_keyring_path=client_keyring_path, hostfs_mntpt=hostfs_mntpt,
cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet,
client_config=client_config)
self.fuse_daemon = None
self._fuse_conn = None
self.id = None
self.inst = None
self.addr = None
self.mount_timeout = int(self.client_config.get('mount_timeout', 30))
self._mount_bin = [
'ceph-fuse', "-f",
"--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
self._mount_cmd_cwd = self.test_dir
if self.client_config.get('valgrind') is not None:
self.cwd = None # get_valgrind_args chdir for us
self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id))
self._mount_cmd_stdin = run.PIPE
def mount(self, mntopts=None, check_status=True, mntargs=None, **kwargs):
self.update_attrs(**kwargs)
self.assert_and_log_minimum_mount_details()
self.setup_netns()
try:
return self._mount(mntopts, mntargs, check_status)
except RuntimeError:
# Catch exceptions by the mount() logic (i.e. not remote command
# failures) and ensure the mount is not left half-up.
# Otherwise we might leave a zombie mount point that causes
# anyone traversing cephtest/ to get hung up on.
log.warning("Trying to clean up after failed mount")
self.umount_wait(force=True)
raise
def _mount(self, mntopts, mntargs, check_status):
log.info("Client client.%s config is %s" % (self.client_id,
self.client_config))
self._create_mntpt()
retval = self._run_mount_cmd(mntopts, mntargs, check_status)
if retval:
return retval
self.gather_mount_info()
def _run_mount_cmd(self, mntopts, mntargs, check_status):
mount_cmd = self._get_mount_cmd(mntopts, mntargs)
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
# Before starting ceph-fuse process, note the contents of
# /sys/fs/fuse/connections
pre_mount_conns = self._list_fuse_conns()
log.info("Pre-mount connections: {0}".format(pre_mount_conns))
self.fuse_daemon = self.client_remote.run(
args=mount_cmd,
cwd=self._mount_cmd_cwd,
logger=self._mount_cmd_logger,
stdin=self._mount_cmd_stdin,
stdout=mountcmd_stdout,
stderr=mountcmd_stderr,
wait=False
)
return self._wait_and_record_our_fuse_conn(
check_status, pre_mount_conns, mountcmd_stdout, mountcmd_stderr)
def _get_mount_cmd(self, mntopts, mntargs):
daemon_signal = 'kill'
if self.client_config.get('coverage') or \
self.client_config.get('valgrind') is not None:
daemon_signal = 'term'
mount_cmd = ['sudo', 'adjust-ulimits', 'ceph-coverage',
'{tdir}/archive/coverage'.format(tdir=self.test_dir),
'daemon-helper', daemon_signal]
mount_cmd = self._add_valgrind_args(mount_cmd)
mount_cmd = ['sudo'] + self._nsenter_args + mount_cmd
mount_cmd += self._mount_bin + [self.hostfs_mntpt]
if self.client_id:
mount_cmd += ['--id', self.client_id]
if self.client_keyring_path and self.client_id:
mount_cmd += ['-k', self.client_keyring_path]
self.validate_subvol_options()
assert(self.cephfs_mntpt)
mount_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
if self.cephfs_name:
mount_cmd += ["--client_fs=" + self.cephfs_name]
if mntopts:
mount_cmd.extend(('-o', ','.join(mntopts)))
if mntargs:
mount_cmd.extend(mntargs)
return mount_cmd
def _add_valgrind_args(self, mount_cmd):
if self.client_config.get('valgrind') is not None:
mount_cmd = get_valgrind_args(
self.test_dir,
'client.{id}'.format(id=self.client_id),
mount_cmd,
self.client_config.get('valgrind'),
cd=False
)
return mount_cmd
def _list_fuse_conns(self):
conn_dir = "/sys/fs/fuse/connections"
self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
check_status=False)
self.client_remote.run(
args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
check_status=False, timeout=(30))
try:
ls_str = self.client_remote.sh("ls " + conn_dir,
stdout=StringIO(),
timeout=300).strip()
except CommandFailedError:
return []
if ls_str:
return [int(n) for n in ls_str.split("\n")]
else:
return []
def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns,
mountcmd_stdout, mountcmd_stderr):
"""
Wait for the connection reference to appear in /sys
"""
waited = 0
post_mount_conns = self._list_fuse_conns()
while len(post_mount_conns) <= len(pre_mount_conns):
if self.fuse_daemon.finished:
# Did mount fail? Raise the CommandFailedError instead of
# hitting the "failed to populate /sys/" timeout
try:
self.fuse_daemon.wait()
except CommandFailedError as e:
log.info('mount command failed.')
if check_status:
raise
else:
return (e, mountcmd_stdout.getvalue(),
mountcmd_stderr.getvalue())
time.sleep(1)
waited += 1
if waited > self._fuse_conn_check_timeout:
raise RuntimeError(
"Fuse mount failed to populate/sys/ after {} "
"seconds".format(waited))
else:
post_mount_conns = self._list_fuse_conns()
log.info("Post-mount connections: {0}".format(post_mount_conns))
self._record_our_fuse_conn(pre_mount_conns, post_mount_conns)
@property
def _fuse_conn_check_timeout(self):
mount_wait = self.client_config.get('mount_wait', 0)
if mount_wait > 0:
log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
time.sleep(mount_wait)
timeout = int(self.client_config.get('mount_timeout', 30))
return timeout
def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns):
"""
Record our fuse connection number so that we can use it when forcing
an unmount.
"""
new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
if len(new_conns) == 0:
raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
elif len(new_conns) > 1:
raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
else:
self._fuse_conn = new_conns[0]
def gather_mount_info(self):
status = self.admin_socket(['status'])
self.id = status['id']
self.client_pid = status['metadata']['pid']
try:
self.inst = status['inst_str']
self.addr = status['addr_str']
except KeyError:
sessions = self.fs.rank_asok(['session', 'ls'])
for s in sessions:
if s['id'] == self.id:
self.inst = s['inst']
self.addr = self.inst.split()[1]
if self.inst is None:
raise RuntimeError("cannot find client session")
def check_mounted_state(self):
proc = self.client_remote.run(
args=[
'stat',
'--file-system',
'--printf=%T\n',
'--',
self.hostfs_mntpt,
],
stdout=StringIO(),
stderr=StringIO(),
wait=False,
timeout=300
)
try:
proc.wait()
except CommandFailedError:
error = proc.stderr.getvalue()
if ("endpoint is not connected" in error
or "Software caused connection abort" in error):
# This happens is fuse is killed without unmount
log.warning("Found stale mount point at {0}".format(self.hostfs_mntpt))
return True
else:
# This happens if the mount directory doesn't exist
log.info('mount point does not exist: %s', self.hostfs_mntpt)
return False
fstype = proc.stdout.getvalue().rstrip('\n')
if fstype == 'fuseblk':
log.info('ceph-fuse is mounted on %s', self.hostfs_mntpt)
return True
else:
log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
fstype=fstype))
return False
def wait_until_mounted(self):
"""
Check to make sure that fuse is mounted on mountpoint. If not,
sleep for 5 seconds and check again.
"""
while not self.check_mounted_state():
# Even if it's not mounted, it should at least
# be running: catch simple failures where it has terminated.
assert not self.fuse_daemon.poll()
time.sleep(5)
# Now that we're mounted, set permissions so that the rest of the test
# will have unrestricted access to the filesystem mount.
for retry in range(10):
try:
stderr = StringIO()
self.client_remote.run(args=['sudo', 'chmod', '1777',
self.hostfs_mntpt],
timeout=300,
stderr=stderr, omit_sudo=False)
break
except run.CommandFailedError:
stderr = stderr.getvalue().lower()
if "read-only file system" in stderr:
break
elif "permission denied" in stderr:
time.sleep(5)
else:
raise
def _mountpoint_exists(self):
return self.client_remote.run(args=["ls", "-d", self.hostfs_mntpt],
check_status=False,
timeout=300).exitstatus == 0
def umount(self, cleanup=True):
"""
umount() must not run cleanup() when it's called by umount_wait()
since "run.wait([self.fuse_daemon], timeout)" would hang otherwise.
"""
if not self.is_mounted():
if cleanup:
self.cleanup()
return
if self.is_blocked():
self._run_umount_lf()
if cleanup:
self.cleanup()
return
try:
log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
stderr = StringIO()
self.client_remote.run(
args=['sudo', 'fusermount', '-u', self.hostfs_mntpt],
stderr=stderr, timeout=UMOUNT_TIMEOUT, omit_sudo=False)
except run.CommandFailedError:
if "mountpoint not found" in stderr.getvalue():
# This happens if the mount directory doesn't exist
log.info('mount point does not exist: %s', self.mountpoint)
elif "not mounted" in stderr.getvalue():
# This happens if the mount directory already unmouted
log.info('mount point not mounted: %s', self.mountpoint)
else:
log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
self.client_remote.run(
args=['sudo', run.Raw('PATH=/usr/sbin:$PATH'), 'lsof',
run.Raw(';'), 'ps', 'auxf'],
timeout=UMOUNT_TIMEOUT, omit_sudo=False)
# abort the fuse mount, killing all hung processes
if self._fuse_conn:
self.run_python(dedent("""
import os
path = "/sys/fs/fuse/connections/{0}/abort"
if os.path.exists(path):
open(path, "w").write("1")
""").format(self._fuse_conn))
self._fuse_conn = None
# make sure its unmounted
self._run_umount_lf()
self._fuse_conn = None
self.id = None
self.inst = None
self.addr = None
if cleanup:
self.cleanup()
def umount_wait(self, force=False, require_clean=False,
timeout=UMOUNT_TIMEOUT):
"""
:param force: Complete cleanly even if the MDS is offline
"""
if not (self.is_mounted() and self.fuse_daemon):
log.debug('ceph-fuse client.{id} is not mounted at {remote} '
'{mnt}'.format(id=self.client_id,
remote=self.client_remote,
mnt=self.hostfs_mntpt))
self.cleanup()
return
if force:
assert not require_clean # mutually exclusive
# When we expect to be forcing, kill the ceph-fuse process directly.
# This should avoid hitting the more aggressive fallback killing
# in umount() which can affect other mounts too.
self.fuse_daemon.stdin.close()
# However, we will still hit the aggressive wait if there is an ongoing
# mount -o remount (especially if the remount is stuck because MDSs
# are unavailable)
if self.is_blocked():
self._run_umount_lf()
self.cleanup()
return
# cleanup is set to to fail since clieanup must happen after umount is
# complete; otherwise following call to run.wait hangs.
self.umount(cleanup=False)
try:
# Permit a timeout, so that we do not block forever
run.wait([self.fuse_daemon], timeout)
except MaxWhileTries:
log.error("process failed to terminate after unmount. This probably"
" indicates a bug within ceph-fuse.")
raise
except CommandFailedError:
if require_clean:
raise
self.cleanup()
def teardown(self):
"""
Whatever the state of the mount, get it gone.
"""
super(FuseMount, self).teardown()
self.umount()
if self.fuse_daemon and not self.fuse_daemon.finished:
self.fuse_daemon.stdin.close()
try:
self.fuse_daemon.wait()
except CommandFailedError:
pass
def _asok_path(self):
return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
@property
def _prefix(self):
return ""
def find_admin_socket(self):
pyscript = """
import glob
import re
import os
import subprocess
def _find_admin_socket(client_name):
asok_path = "{asok_path}"
files = glob.glob(asok_path)
mountpoint = "{mountpoint}"
# Given a non-glob path, it better be there
if "*" not in asok_path:
assert(len(files) == 1)
return files[0]
for f in files:
pid = re.match(".*\.(\d+)\.asok$", f).group(1)
if os.path.exists("/proc/{{0}}".format(pid)):
with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f:
contents = proc_f.read()
if mountpoint in contents:
return f
raise RuntimeError("Client socket {{0}} not found".format(client_name))
print(_find_admin_socket("{client_name}"))
""".format(
asok_path=self._asok_path(),
client_name="client.{0}".format(self.client_id),
mountpoint=self.mountpoint)
asok_path = self.run_python(pyscript, sudo=True)
log.info("Found client admin socket at {0}".format(asok_path))
return asok_path
def admin_socket(self, args):
asok_path = self.find_admin_socket()
# Query client ID from admin socket, wait 2 seconds
# and retry 10 times if it is not ready
with safe_while(sleep=2, tries=10) as proceed:
while proceed():
try:
p = self.client_remote.run(args=
['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
stdout=StringIO(), stderr=StringIO(), wait=False,
timeout=300)
p.wait()
break
except CommandFailedError:
if "connection refused" in p.stderr.getvalue().lower():
pass
return json.loads(p.stdout.getvalue().strip())
def get_global_id(self):
"""
Look up the CephFS client ID for this mount
"""
return self.admin_socket(['mds_sessions'])['id']
def get_global_inst(self):
"""
Look up the CephFS client instance for this mount
"""
return self.inst
def get_global_addr(self):
"""
Look up the CephFS client addr for this mount
"""
return self.addr
def get_client_pid(self):
"""
return pid of ceph-fuse process
"""
status = self.admin_socket(['status'])
return status['metadata']['pid']
def get_osd_epoch(self):
"""
Return 2-tuple of osd_epoch, osd_epoch_barrier
"""
status = self.admin_socket(['status'])
return status['osd_epoch'], status['osd_epoch_barrier']
def get_dentry_count(self):
"""
Return 2-tuple of dentry_count, dentry_pinned_count
"""
status = self.admin_socket(['status'])
return status['dentry_count'], status['dentry_pinned_count']
def set_cache_size(self, size):
return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])
def get_op_read_count(self):
return self.admin_socket(['perf', 'dump', 'objecter'])['objecter']['osdop_read']