ceph/qa/tasks/ceph_fuse.py

178 lines
5.6 KiB
Python

"""
Ceph FUSE client task
"""
import contextlib
import logging
from teuthology import misc
from tasks.cephfs.fuse_mount import FuseMount
log = logging.getLogger(__name__)
@contextlib.contextmanager
def task(ctx, config):
"""
Mount/unmount a ``ceph-fuse`` client.
The config is optional and defaults to mounting on all clients. If
a config is given, it is expected to be a list of clients to do
this operation on. This lets you e.g. set up one client with
``ceph-fuse`` and another with ``kclient``.
``brxnet`` should be a Private IPv4 Address range, default range is
[192.168.0.0/16]
Example that mounts all clients::
tasks:
- ceph:
- ceph-fuse:
- interactive:
- brxnet: [192.168.0.0/16]
Example that uses both ``kclient` and ``ceph-fuse``::
tasks:
- ceph:
- ceph-fuse: [client.0]
- kclient: [client.1]
- interactive:
Example that enables valgrind:
tasks:
- ceph:
- ceph-fuse:
client.0:
valgrind: [--tool=memcheck, --leak-check=full, --show-reachable=yes]
- interactive:
Example that stops an already-mounted client:
::
tasks:
- ceph:
- ceph-fuse: [client.0]
- ... do something that requires the FS mounted ...
- ceph-fuse:
client.0:
mounted: false
- ... do something that requires the FS unmounted ...
Example that adds more generous wait time for mount (for virtual machines):
tasks:
- ceph:
- ceph-fuse:
client.0:
mount_wait: 60 # default is 0, do not wait before checking /sys/
mount_timeout: 120 # default is 30, give up if /sys/ is not populated
- interactive:
:param ctx: Context
:param config: Configuration
"""
log.info('Running ceph_fuse task...')
if config is None:
ids = misc.all_roles_of_type(ctx.cluster, 'client')
client_roles = [f'client.{id_}' for id_ in ids]
config = dict([r, dict()] for r in client_roles)
elif isinstance(config, list):
client_roles = config
config = dict([r, dict()] for r in client_roles)
elif isinstance(config, dict):
client_roles = filter(lambda x: 'client.' in x, config.keys())
else:
raise ValueError(f"Invalid config object: {config} ({config.__class__})")
log.info(f"config is {config}")
clients = list(misc.get_clients(ctx=ctx, roles=client_roles))
testdir = misc.get_testdir(ctx)
all_mounts = getattr(ctx, 'mounts', {})
mounted_by_me = {}
skipped = {}
remotes = set()
brxnet = config.get("brxnet", None)
# Construct any new FuseMount instances
overrides = ctx.config.get('overrides', {}).get('ceph-fuse', {})
top_overrides = dict(filter(lambda x: 'client.' not in x[0], overrides.items()))
for id_, remote in clients:
entity = f"client.{id_}"
client_config = config.get(entity)
if client_config is None:
client_config = {}
# top level overrides
for k, v in top_overrides.items():
if v is not None:
client_config[k] = v
# mount specific overrides
client_config_overrides = overrides.get(entity)
misc.deep_merge(client_config, client_config_overrides)
log.info(f"{entity} config is {client_config}")
remotes.add(remote)
auth_id = client_config.get("auth_id", id_)
cephfs_name = client_config.get("cephfs_name")
skip = client_config.get("skip", False)
if skip:
skipped[id_] = skip
continue
if id_ not in all_mounts:
fuse_mount = FuseMount(ctx=ctx, client_config=client_config,
test_dir=testdir, client_id=auth_id,
client_remote=remote, brxnet=brxnet,
cephfs_name=cephfs_name)
all_mounts[id_] = fuse_mount
else:
# Catch bad configs where someone has e.g. tried to use ceph-fuse and kcephfs for the same client
assert isinstance(all_mounts[id_], FuseMount)
if not config.get("disabled", False) and client_config.get('mounted', True):
mounted_by_me[id_] = {"config": client_config, "mount": all_mounts[id_]}
ctx.mounts = all_mounts
# Umount any pre-existing clients that we have not been asked to mount
for client_id in set(all_mounts.keys()) - set(mounted_by_me.keys()) - set(skipped.keys()):
mount = all_mounts[client_id]
if mount.is_mounted():
mount.umount_wait()
for remote in remotes:
FuseMount.cleanup_stale_netnses_and_bridge(remote)
# Mount any clients we have been asked to (default to mount all)
log.info('Mounting ceph-fuse clients...')
for info in mounted_by_me.values():
config = info["config"]
mount_x = info['mount']
if config.get("mount_path"):
mount_x.cephfs_mntpt = config.get("mount_path")
if config.get("mountpoint"):
mount_x.hostfs_mntpt = config.get("mountpoint")
mount_x.mount()
for info in mounted_by_me.values():
info["mount"].wait_until_mounted()
try:
yield all_mounts
finally:
log.info('Unmounting ceph-fuse clients...')
for info in mounted_by_me.values():
# Conditional because an inner context might have umounted it
mount = info["mount"]
if mount.is_mounted():
mount.umount_wait()
for remote in remotes:
FuseMount.cleanup_stale_netnses_and_bridge(remote)