mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
ae00634a5e
Only fuse would support the "mountpoint" yaml config. Move it to mount.py so it works consistently with the kernel mount. Ditto for the cephfs mount point. Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
1504 lines
52 KiB
Python
1504 lines
52 KiB
Python
import hashlib
|
|
import json
|
|
import logging
|
|
import datetime
|
|
import os
|
|
import re
|
|
import time
|
|
|
|
from io import StringIO
|
|
from contextlib import contextmanager
|
|
from textwrap import dedent
|
|
from IPy import IP
|
|
|
|
from teuthology.contextutil import safe_while
|
|
from teuthology.misc import get_file, write_file
|
|
from teuthology.orchestra import run
|
|
from teuthology.orchestra.run import Raw
|
|
from teuthology.exceptions import CommandFailedError, ConnectionLostError
|
|
|
|
from tasks.cephfs.filesystem import Filesystem
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
UMOUNT_TIMEOUT = 300
|
|
|
|
|
|
class CephFSMount(object):
|
|
def __init__(self, ctx, test_dir, client_id, client_remote,
|
|
client_keyring_path=None, hostfs_mntpt=None,
|
|
cephfs_name=None, cephfs_mntpt=None, brxnet=None,
|
|
client_config=None):
|
|
"""
|
|
:param test_dir: Global teuthology test dir
|
|
:param client_id: Client ID, the 'foo' in client.foo
|
|
:param client_keyring_path: path to keyring for given client_id
|
|
:param client_remote: Remote instance for the host where client will
|
|
run
|
|
:param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
|
|
be mounted
|
|
:param cephfs_name: Name of Ceph FS to be mounted
|
|
:param cephfs_mntpt: Path to directory inside Ceph FS that will be
|
|
mounted as root
|
|
"""
|
|
self.ctx = ctx
|
|
self.test_dir = test_dir
|
|
|
|
self._verify_attrs(client_id=client_id,
|
|
client_keyring_path=client_keyring_path,
|
|
hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
|
|
cephfs_mntpt=cephfs_mntpt)
|
|
|
|
if client_config is None:
|
|
client_config = {}
|
|
self.client_config = client_config
|
|
|
|
self.cephfs_name = cephfs_name
|
|
self.client_id = client_id
|
|
self.client_keyring_path = client_keyring_path
|
|
self.client_remote = client_remote
|
|
self.cluster_name = 'ceph' # TODO: use config['cluster']
|
|
self.fs = None
|
|
|
|
if cephfs_mntpt is None and client_config.get("mount_path"):
|
|
self.cephfs_mntpt = client_config.get("mount_path")
|
|
log.info(f"using client_config[\"cephfs_mntpt\"] = {self.cephfs_mntpt}")
|
|
else:
|
|
self.cephfs_mntpt = cephfs_mntpt
|
|
log.info(f"cephfs_mntpt = {self.cephfs_mntpt}")
|
|
|
|
if hostfs_mntpt is None and client_config.get("mountpoint"):
|
|
self.hostfs_mntpt = client_config.get("mountpoint")
|
|
log.info(f"using client_config[\"hostfs_mntpt\"] = {self.hostfs_mntpt}")
|
|
elif hostfs_mntpt is not None:
|
|
self.hostfs_mntpt = hostfs_mntpt
|
|
else:
|
|
self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
|
|
self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
|
|
log.info(f"hostfs_mntpt = {self.hostfs_mntpt}")
|
|
|
|
self._netns_name = None
|
|
self.nsid = -1
|
|
if brxnet is None:
|
|
self.ceph_brx_net = '192.168.0.0/16'
|
|
else:
|
|
self.ceph_brx_net = brxnet
|
|
|
|
self.test_files = ['a', 'b', 'c']
|
|
|
|
self.background_procs = []
|
|
|
|
# This will cleanup the stale netnses, which are from the
|
|
# last failed test cases.
|
|
@staticmethod
|
|
def cleanup_stale_netnses_and_bridge(remote):
|
|
p = remote.run(args=['ip', 'netns', 'list'],
|
|
stdout=StringIO(), timeout=(5*60))
|
|
p = p.stdout.getvalue().strip()
|
|
|
|
# Get the netns name list
|
|
netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p)
|
|
|
|
# Remove the stale netnses
|
|
for ns in netns_list:
|
|
ns_name = ns.split()[0]
|
|
args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)]
|
|
try:
|
|
remote.run(args=args, timeout=(5*60), omit_sudo=False)
|
|
except Exception:
|
|
pass
|
|
|
|
# Remove the stale 'ceph-brx'
|
|
try:
|
|
args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
|
|
remote.run(args=args, timeout=(5*60), omit_sudo=False)
|
|
except Exception:
|
|
pass
|
|
|
|
def _parse_netns_name(self):
|
|
self._netns_name = '-'.join(["ceph-ns",
|
|
re.sub(r'/+', "-", self.mountpoint)])
|
|
|
|
@property
|
|
def mountpoint(self):
|
|
if self.hostfs_mntpt is None:
|
|
self.hostfs_mntpt = os.path.join(self.test_dir,
|
|
self.hostfs_mntpt_dirname)
|
|
return self.hostfs_mntpt
|
|
|
|
@mountpoint.setter
|
|
def mountpoint(self, path):
|
|
if not isinstance(path, str):
|
|
raise RuntimeError('path should be of str type.')
|
|
self._mountpoint = self.hostfs_mntpt = path
|
|
|
|
@property
|
|
def netns_name(self):
|
|
if self._netns_name == None:
|
|
self._parse_netns_name()
|
|
return self._netns_name
|
|
|
|
@netns_name.setter
|
|
def netns_name(self, name):
|
|
self._netns_name = name
|
|
|
|
def assert_that_ceph_fs_exists(self):
|
|
output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls")
|
|
if self.cephfs_name:
|
|
assert self.cephfs_name in output, \
|
|
'expected ceph fs is not present on the cluster'
|
|
log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
|
|
else:
|
|
assert 'No filesystems enabled' not in output, \
|
|
'ceph cluster has no ceph fs, not even the default ceph fs'
|
|
log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
|
|
|
|
def assert_and_log_minimum_mount_details(self):
|
|
"""
|
|
Make sure we have minimum details required for mounting. Ideally, this
|
|
method should be called at the beginning of the mount method.
|
|
"""
|
|
if not self.client_id or not self.client_remote or \
|
|
not self.hostfs_mntpt:
|
|
log.error(f"self.client_id = {self.client_id}")
|
|
log.error(f"self.client_remote = {self.client_remote}")
|
|
log.error(f"self.hostfs_mntpt = {self.hostfs_mntpt}")
|
|
errmsg = ('Mounting CephFS requires that at least following '
|
|
'details to be provided -\n'
|
|
'1. the client ID,\n2. the mountpoint and\n'
|
|
'3. the remote machine where CephFS will be mounted.\n')
|
|
raise RuntimeError(errmsg)
|
|
|
|
self.assert_that_ceph_fs_exists()
|
|
|
|
log.info('Mounting Ceph FS. Following are details of mount; remember '
|
|
'"None" represents Python type None -')
|
|
log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
|
|
log.info(f'self.client.name = client.{self.client_id}')
|
|
log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}')
|
|
log.info(f'self.cephfs_name = {self.cephfs_name}')
|
|
log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}')
|
|
log.info(f'self.client_keyring_path = {self.client_keyring_path}')
|
|
if self.client_keyring_path:
|
|
log.info('keyring content -\n' +
|
|
get_file(self.client_remote, self.client_keyring_path,
|
|
sudo=True).decode())
|
|
|
|
def is_blocked(self):
|
|
self.fs = Filesystem(self.ctx, name=self.cephfs_name)
|
|
|
|
output = self.fs.mon_manager.raw_cluster_cmd(args='osd blocklist ls')
|
|
return self.addr in output
|
|
|
|
def is_stuck(self):
|
|
"""
|
|
Check if mount is stuck/in a hanged state.
|
|
"""
|
|
if not self.is_mounted():
|
|
return False
|
|
|
|
retval = self.client_remote.run(args=f'sudo stat {self.hostfs_mntpt}',
|
|
omit_sudo=False, wait=False).returncode
|
|
if retval == 0:
|
|
return False
|
|
|
|
time.sleep(10)
|
|
proc = self.client_remote.run(args='ps -ef', stdout=StringIO())
|
|
# if proc was running even after 10 seconds, it has to be stuck.
|
|
if f'stat {self.hostfs_mntpt}' in proc.stdout.getvalue():
|
|
log.critical('client mounted at self.hostfs_mntpt is stuck!')
|
|
return True
|
|
return False
|
|
|
|
def is_mounted(self):
|
|
file = self.client_remote.read_file('/proc/self/mounts',stdout=StringIO())
|
|
if self.hostfs_mntpt in file:
|
|
return True
|
|
else:
|
|
log.debug(f"not mounted; /proc/self/mounts is:\n{file}")
|
|
return False
|
|
|
|
def setupfs(self, name=None):
|
|
if name is None and self.fs is not None:
|
|
# Previous mount existed, reuse the old name
|
|
name = self.fs.name
|
|
self.fs = Filesystem(self.ctx, name=name)
|
|
log.info('Wait for MDS to reach steady state...')
|
|
self.fs.wait_for_daemons()
|
|
log.info('Ready to start {}...'.format(type(self).__name__))
|
|
|
|
def _create_mntpt(self):
|
|
self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}',
|
|
timeout=60)
|
|
# Use 0000 mode to prevent undesired modifications to the mountpoint on
|
|
# the local file system.
|
|
self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}',
|
|
timeout=60)
|
|
|
|
@property
|
|
def _nsenter_args(self):
|
|
return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
|
|
|
|
def _set_filemode_on_mntpt(self):
|
|
stderr = StringIO()
|
|
try:
|
|
self.client_remote.run(
|
|
args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
|
|
stderr=stderr, timeout=(5*60))
|
|
except CommandFailedError:
|
|
# the client does not have write permissions in the caps it holds
|
|
# for the Ceph FS that was just mounted.
|
|
if 'permission denied' in stderr.getvalue().lower():
|
|
pass
|
|
|
|
def _setup_brx_and_nat(self):
|
|
# The ip for ceph-brx should be
|
|
ip = IP(self.ceph_brx_net)[-2]
|
|
mask = self.ceph_brx_net.split('/')[1]
|
|
brd = IP(self.ceph_brx_net).broadcast()
|
|
|
|
brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
|
|
stdout=StringIO(), timeout=(5*60))
|
|
brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
|
|
if brx:
|
|
# If the 'ceph-brx' already exists, then check whether
|
|
# the new net is conflicting with it
|
|
_ip, _mask = brx[0].split()[1].split('/', 1)
|
|
if _ip != "{}".format(ip) or _mask != mask:
|
|
raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))
|
|
|
|
# Setup the ceph-brx and always use the last valid IP
|
|
if not brx:
|
|
log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask))
|
|
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo ip link add name ceph-brx type bridge
|
|
sudo ip addr flush dev ceph-brx
|
|
sudo ip link set ceph-brx up
|
|
sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
|
|
self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
|
|
|
|
# Setup the NAT
|
|
p = self.client_remote.run(args=['route'], stderr=StringIO(),
|
|
stdout=StringIO(), timeout=(5*60))
|
|
p = re.findall(r'default .*', p.stdout.getvalue())
|
|
if p == False:
|
|
raise RuntimeError("No default gw found")
|
|
gw = p[0].split()[7]
|
|
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
|
|
sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
|
|
sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
def _setup_netns(self):
|
|
p = self.client_remote.run(args=['ip', 'netns', 'list'],
|
|
stderr=StringIO(), stdout=StringIO(),
|
|
timeout=(5*60)).stdout.getvalue().strip()
|
|
|
|
# Get the netns name list
|
|
netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p)
|
|
|
|
out = re.search(r"{0}".format(self.netns_name), p)
|
|
if out is None:
|
|
# Get an uniq nsid for the new netns
|
|
nsid = 0
|
|
p = self.client_remote.run(args=['ip', 'netns', 'list-id'],
|
|
stderr=StringIO(), stdout=StringIO(),
|
|
timeout=(5*60)).stdout.getvalue()
|
|
while True:
|
|
out = re.search(r"nsid {} ".format(nsid), p)
|
|
if out is None:
|
|
break
|
|
|
|
nsid += 1
|
|
|
|
# Add one new netns and set it id
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo ip netns add {self.netns_name}
|
|
sudo ip netns set {self.netns_name} {nsid}
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
self.nsid = nsid;
|
|
else:
|
|
# The netns already exists and maybe suspended by self.kill()
|
|
self.resume_netns();
|
|
|
|
nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1))
|
|
self.nsid = nsid;
|
|
return
|
|
|
|
# Get one ip address for netns
|
|
ips = IP(self.ceph_brx_net)
|
|
for ip in ips:
|
|
found = False
|
|
if ip == ips[0]:
|
|
continue
|
|
if ip == ips[-2]:
|
|
raise RuntimeError("we have ran out of the ip addresses")
|
|
|
|
for ns in netns_list:
|
|
ns_name = ns.split()[0]
|
|
args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr']
|
|
try:
|
|
p = self.client_remote.run(args=args, stderr=StringIO(),
|
|
stdout=StringIO(), timeout=(5*60),
|
|
omit_sudo=False)
|
|
q = re.search("{0}".format(ip), p.stdout.getvalue())
|
|
if q is not None:
|
|
found = True
|
|
break
|
|
except CommandFailedError:
|
|
if "No such file or directory" in p.stderr.getvalue():
|
|
pass
|
|
if "Invalid argument" in p.stderr.getvalue():
|
|
pass
|
|
|
|
if found == False:
|
|
break
|
|
|
|
mask = self.ceph_brx_net.split('/')[1]
|
|
brd = IP(self.ceph_brx_net).broadcast()
|
|
|
|
log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))
|
|
|
|
# Setup the veth interfaces
|
|
brxip = IP(self.ceph_brx_net)[-2]
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
|
|
sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
|
|
sudo ip netns exec {self.netns_name} ip link set veth0 up
|
|
sudo ip netns exec {self.netns_name} ip link set lo up
|
|
sudo ip netns exec {self.netns_name} ip route add default via {brxip}
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
# Bring up the brx interface and join it to 'ceph-brx'
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo ip link set brx.{nsid} up
|
|
sudo ip link set dev brx.{nsid} master ceph-brx
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
def _cleanup_netns(self):
|
|
if self.nsid == -1:
|
|
return
|
|
log.info("Removing the netns '{0}'".format(self.netns_name))
|
|
|
|
# Delete the netns and the peer veth interface
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo ip link set brx.{self.nsid} down
|
|
sudo ip link delete dev brx.{self.nsid}
|
|
sudo ip netns delete {self.netns_name}
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
self.nsid = -1
|
|
|
|
def _cleanup_brx_and_nat(self):
|
|
brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
|
|
stdout=StringIO(), timeout=(5*60))
|
|
brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
|
|
if not brx:
|
|
return
|
|
|
|
# If we are the last netns, will delete the ceph-brx
|
|
args = ['sudo', 'ip', 'link', 'show']
|
|
p = self.client_remote.run(args=args, stdout=StringIO(),
|
|
timeout=(5*60), omit_sudo=False)
|
|
_list = re.findall(r'brx\.', p.stdout.getvalue().strip())
|
|
if len(_list) != 0:
|
|
return
|
|
|
|
log.info("Removing the 'ceph-brx'")
|
|
|
|
self.run_shell_payload("""
|
|
set -e
|
|
sudo ip link set ceph-brx down
|
|
sudo ip link delete ceph-brx
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
# Drop the iptables NAT rules
|
|
ip = IP(self.ceph_brx_net)[-2]
|
|
mask = self.ceph_brx_net.split('/')[1]
|
|
|
|
p = self.client_remote.run(args=['route'], stderr=StringIO(),
|
|
stdout=StringIO(), timeout=(5*60))
|
|
p = re.findall(r'default .*', p.stdout.getvalue())
|
|
if p == False:
|
|
raise RuntimeError("No default gw found")
|
|
gw = p[0].split()[7]
|
|
self.run_shell_payload(f"""
|
|
set -e
|
|
sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
|
|
sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
|
|
sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
|
|
""", timeout=(5*60), omit_sudo=False, cwd='/')
|
|
|
|
def setup_netns(self):
|
|
"""
|
|
Setup the netns for the mountpoint.
|
|
"""
|
|
log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
|
|
self._setup_brx_and_nat()
|
|
self._setup_netns()
|
|
|
|
def cleanup_netns(self):
|
|
"""
|
|
Cleanup the netns for the mountpoint.
|
|
"""
|
|
# We will defer cleaning the netnses and bridge until the last
|
|
# mountpoint is unmounted, this will be a temporary work around
|
|
# for issue#46282.
|
|
|
|
# log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
|
|
# self._cleanup_netns()
|
|
# self._cleanup_brx_and_nat()
|
|
|
|
def suspend_netns(self):
|
|
"""
|
|
Suspend the netns veth interface.
|
|
"""
|
|
if self.nsid == -1:
|
|
return
|
|
|
|
log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
|
|
|
|
args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down']
|
|
self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
|
|
|
|
def resume_netns(self):
|
|
"""
|
|
Resume the netns veth interface.
|
|
"""
|
|
if self.nsid == -1:
|
|
return
|
|
|
|
log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
|
|
|
|
args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
|
|
self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
|
|
|
|
def mount(self, mntopts=[], check_status=True, **kwargs):
|
|
"""
|
|
kwargs expects its members to be same as the arguments accepted by
|
|
self.update_attrs().
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def mount_wait(self, **kwargs):
|
|
"""
|
|
Accepts arguments same as self.mount().
|
|
"""
|
|
self.mount(**kwargs)
|
|
self.wait_until_mounted()
|
|
|
|
def _run_umount_lf(self):
|
|
log.debug(f'Force/lazy unmounting on client.{self.client_id}')
|
|
|
|
try:
|
|
proc = self.client_remote.run(
|
|
args=f'sudo umount --lazy --force {self.hostfs_mntpt}',
|
|
timeout=UMOUNT_TIMEOUT, omit_sudo=False)
|
|
except CommandFailedError:
|
|
if self.is_mounted():
|
|
raise
|
|
|
|
return proc
|
|
|
|
def umount(self):
|
|
raise NotImplementedError()
|
|
|
|
def umount_wait(self, force=False, require_clean=False,
|
|
timeout=UMOUNT_TIMEOUT):
|
|
"""
|
|
|
|
:param force: Expect that the mount will not shutdown cleanly: kill
|
|
it hard.
|
|
:param require_clean: Wait for the Ceph client associated with the
|
|
mount (e.g. ceph-fuse) to terminate, and
|
|
raise if it doesn't do so cleanly.
|
|
:param timeout: amount of time to be waited for umount command to finish
|
|
:return:
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def _verify_attrs(self, **kwargs):
|
|
"""
|
|
Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
|
|
cephfs_name, cephfs_mntpt are either type str or None.
|
|
"""
|
|
for k, v in kwargs.items():
|
|
if v is not None and not isinstance(v, str):
|
|
raise RuntimeError('value of attributes should be either str '
|
|
f'or None. {k} - {v}')
|
|
|
|
def update_attrs(self, client_id=None, client_keyring_path=None,
|
|
client_remote=None, hostfs_mntpt=None, cephfs_name=None,
|
|
cephfs_mntpt=None):
|
|
if not (client_id or client_keyring_path or client_remote or
|
|
cephfs_name or cephfs_mntpt or hostfs_mntpt):
|
|
return
|
|
|
|
self._verify_attrs(client_id=client_id,
|
|
client_keyring_path=client_keyring_path,
|
|
hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
|
|
cephfs_mntpt=cephfs_mntpt)
|
|
|
|
if client_id:
|
|
self.client_id = client_id
|
|
if client_keyring_path:
|
|
self.client_keyring_path = client_keyring_path
|
|
if client_remote:
|
|
self.client_remote = client_remote
|
|
if hostfs_mntpt:
|
|
self.hostfs_mntpt = hostfs_mntpt
|
|
if cephfs_name:
|
|
self.cephfs_name = cephfs_name
|
|
if cephfs_mntpt:
|
|
self.cephfs_mntpt = cephfs_mntpt
|
|
|
|
def remount(self, **kwargs):
|
|
"""
|
|
Update mount object's attributes and attempt remount with these
|
|
new values for these attrbiutes.
|
|
|
|
1. Run umount_wait().
|
|
2. Run update_attrs().
|
|
3. Run mount().
|
|
|
|
Accepts arguments of self.mount() and self.update_attrs() with 1
|
|
exception: wait accepted too which can be True or False.
|
|
"""
|
|
self.umount_wait()
|
|
assert not self.is_mounted()
|
|
|
|
mntopts = kwargs.pop('mntopts', [])
|
|
check_status = kwargs.pop('check_status', True)
|
|
wait = kwargs.pop('wait', True)
|
|
|
|
self.update_attrs(**kwargs)
|
|
|
|
retval = self.mount(mntopts=mntopts, check_status=check_status)
|
|
# avoid this scenario (again): mount command might've failed and
|
|
# check_status might have silenced the exception, yet we attempt to
|
|
# wait which might lead to an error.
|
|
if retval is None and wait:
|
|
self.wait_until_mounted()
|
|
|
|
return retval
|
|
|
|
def kill(self):
|
|
"""
|
|
Suspend the netns veth interface to make the client disconnected
|
|
from the ceph cluster
|
|
"""
|
|
log.info('Killing connection on {0}...'.format(self.client_remote.name))
|
|
self.suspend_netns()
|
|
|
|
def kill_cleanup(self):
|
|
"""
|
|
Follow up ``kill`` to get to a clean unmounted state.
|
|
"""
|
|
log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name))
|
|
self.umount_wait(force=True)
|
|
|
|
def cleanup(self):
|
|
"""
|
|
Remove the mount point.
|
|
|
|
Prerequisite: the client is not mounted.
|
|
"""
|
|
log.info('Cleaning up mount {0}'.format(self.client_remote.name))
|
|
stderr = StringIO()
|
|
try:
|
|
self.client_remote.run(args=['rmdir', '--', self.mountpoint],
|
|
cwd=self.test_dir, stderr=stderr,
|
|
timeout=(60*5), check_status=False)
|
|
except CommandFailedError:
|
|
if "no such file or directory" not in stderr.getvalue().lower():
|
|
raise
|
|
|
|
self.cleanup_netns()
|
|
|
|
def wait_until_mounted(self):
|
|
raise NotImplementedError()
|
|
|
|
def get_keyring_path(self):
|
|
# N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
|
|
return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
|
|
|
|
def get_key_from_keyfile(self):
|
|
# XXX: don't call run_shell(), since CephFS might be unmounted.
|
|
keyring = self.client_remote.read_file(self.client_keyring_path).\
|
|
decode()
|
|
|
|
for line in keyring.split('\n'):
|
|
if line.find('key') != -1:
|
|
return line[line.find('=') + 1 : ].strip()
|
|
|
|
raise RuntimeError('Key not found in keyring file '
|
|
f'{self.client_keyring_path}. Its contents are -\n'
|
|
f'{keyring}')
|
|
|
|
@property
|
|
def config_path(self):
|
|
"""
|
|
Path to ceph.conf: override this if you're not a normal systemwide ceph install
|
|
:return: stringv
|
|
"""
|
|
return "/etc/ceph/ceph.conf"
|
|
|
|
@contextmanager
|
|
def mounted_wait(self):
|
|
"""
|
|
A context manager, from an initially unmounted state, to mount
|
|
this, yield, and then unmount and clean up.
|
|
"""
|
|
self.mount()
|
|
self.wait_until_mounted()
|
|
try:
|
|
yield
|
|
finally:
|
|
self.umount_wait()
|
|
|
|
def create_file(self, filename='testfile', dirname=None, user=None,
|
|
check_status=True):
|
|
assert(self.is_mounted())
|
|
|
|
if not os.path.isabs(filename):
|
|
if dirname:
|
|
if os.path.isabs(dirname):
|
|
path = os.path.join(dirname, filename)
|
|
else:
|
|
path = os.path.join(self.hostfs_mntpt, dirname, filename)
|
|
else:
|
|
path = os.path.join(self.hostfs_mntpt, filename)
|
|
else:
|
|
path = filename
|
|
|
|
if user:
|
|
args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', 'touch ' + path]
|
|
else:
|
|
args = 'touch ' + path
|
|
|
|
return self.client_remote.run(args=args, check_status=check_status)
|
|
|
|
def create_files(self):
|
|
assert(self.is_mounted())
|
|
|
|
for suffix in self.test_files:
|
|
log.info("Creating file {0}".format(suffix))
|
|
self.client_remote.run(args=[
|
|
'touch', os.path.join(self.hostfs_mntpt, suffix)
|
|
])
|
|
|
|
def test_create_file(self, filename='testfile', dirname=None, user=None,
|
|
check_status=True):
|
|
return self.create_file(filename=filename, dirname=dirname, user=user,
|
|
check_status=False)
|
|
|
|
def check_files(self):
|
|
assert(self.is_mounted())
|
|
|
|
for suffix in self.test_files:
|
|
log.info("Checking file {0}".format(suffix))
|
|
r = self.client_remote.run(args=[
|
|
'ls', os.path.join(self.hostfs_mntpt, suffix)
|
|
], check_status=False)
|
|
if r.exitstatus != 0:
|
|
raise RuntimeError("Expected file {0} not found".format(suffix))
|
|
|
|
def write_file(self, path, data, perms=None):
|
|
"""
|
|
Write the given data at the given path and set the given perms to the
|
|
file on the path.
|
|
"""
|
|
if path.find(self.hostfs_mntpt) == -1:
|
|
path = os.path.join(self.hostfs_mntpt, path)
|
|
|
|
write_file(self.client_remote, path, data)
|
|
|
|
if perms:
|
|
self.run_shell(args=f'chmod {perms} {path}')
|
|
|
|
def read_file(self, path):
|
|
"""
|
|
Return the data from the file on given path.
|
|
"""
|
|
if path.find(self.hostfs_mntpt) == -1:
|
|
path = os.path.join(self.hostfs_mntpt, path)
|
|
|
|
return self.run_shell(args=['cat', path]).\
|
|
stdout.getvalue().strip()
|
|
|
|
def create_destroy(self):
|
|
assert(self.is_mounted())
|
|
|
|
filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
|
|
log.debug("Creating test file {0}".format(filename))
|
|
self.client_remote.run(args=[
|
|
'touch', os.path.join(self.hostfs_mntpt, filename)
|
|
])
|
|
log.debug("Deleting test file {0}".format(filename))
|
|
self.client_remote.run(args=[
|
|
'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
|
|
])
|
|
|
|
def _run_python(self, pyscript, py_version='python3', sudo=False):
|
|
args, omit_sudo = [], True
|
|
if sudo:
|
|
args.append('sudo')
|
|
omit_sudo = False
|
|
args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript]
|
|
return self.client_remote.run(args=args, wait=False, stdin=run.PIPE,
|
|
stdout=StringIO(), omit_sudo=omit_sudo)
|
|
|
|
def run_python(self, pyscript, py_version='python3', sudo=False):
|
|
p = self._run_python(pyscript, py_version, sudo=sudo)
|
|
p.wait()
|
|
return p.stdout.getvalue().strip()
|
|
|
|
def run_shell(self, args, timeout=300, **kwargs):
|
|
omit_sudo = kwargs.pop('omit_sudo', False)
|
|
cwd = kwargs.pop('cwd', self.mountpoint)
|
|
stdout = kwargs.pop('stdout', StringIO())
|
|
stderr = kwargs.pop('stderr', StringIO())
|
|
|
|
return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
|
|
stdout=stdout, stderr=stderr,
|
|
omit_sudo=omit_sudo, **kwargs)
|
|
|
|
def run_shell_payload(self, payload, **kwargs):
|
|
kwargs['args'] = ["bash", "-c", Raw(f"'{payload}'")]
|
|
if kwargs.pop('sudo', False):
|
|
kwargs['args'].insert(0, 'sudo')
|
|
kwargs['omit_sudo'] = False
|
|
return self.run_shell(**kwargs)
|
|
|
|
def run_as_user(self, **kwargs):
|
|
"""
|
|
Besides the arguments defined for run_shell() this method also
|
|
accepts argument 'user'.
|
|
"""
|
|
args = kwargs.pop('args')
|
|
user = kwargs.pop('user')
|
|
if isinstance(args, str):
|
|
args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args]
|
|
elif isinstance(args, list):
|
|
cmdlist = args
|
|
cmd = ''
|
|
for i in cmdlist:
|
|
cmd = cmd + i + ' '
|
|
# get rid of extra space at the end.
|
|
cmd = cmd[:-1]
|
|
|
|
args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd]
|
|
|
|
kwargs['args'] = args
|
|
kwargs['omit_sudo'] = False
|
|
return self.run_shell(**kwargs)
|
|
|
|
def run_as_root(self, **kwargs):
|
|
"""
|
|
Accepts same arguments as run_shell().
|
|
"""
|
|
kwargs['user'] = 'root'
|
|
return self.run_as_user(**kwargs)
|
|
|
|
def assert_retval(self, proc_retval, exp_retval):
|
|
msg = (f'expected return value: {exp_retval}\n'
|
|
f'received return value: {proc_retval}\n')
|
|
assert proc_retval == exp_retval, msg
|
|
|
|
def _verify(self, proc, exp_retval=None, exp_errmsgs=None):
|
|
if exp_retval is None and exp_errmsgs is None:
|
|
raise RuntimeError('Method didn\'t get enough parameters. Pass '
|
|
'return value or error message expected from '
|
|
'the command/process.')
|
|
|
|
if exp_retval is not None:
|
|
self.assert_retval(proc.returncode, exp_retval)
|
|
if exp_errmsgs is None:
|
|
return
|
|
|
|
if isinstance(exp_errmsgs, str):
|
|
exp_errmsgs = (exp_errmsgs, )
|
|
|
|
proc_stderr = proc.stderr.getvalue().lower()
|
|
msg = ('didn\'t find any of the expected string in stderr.\n'
|
|
f'expected string: {exp_errmsgs}\n'
|
|
f'received error message: {proc_stderr}\n'
|
|
'note: received error message is converted to lowercase')
|
|
for e in exp_errmsgs:
|
|
if e in proc_stderr:
|
|
break
|
|
# this else is meant for for loop.
|
|
else:
|
|
assert False, msg
|
|
|
|
def negtestcmd(self, args, retval=None, errmsgs=None, stdin=None,
|
|
cwd=None, wait=True):
|
|
"""
|
|
Conduct a negative test for the given command.
|
|
|
|
retval and errmsgs are parameters to confirm the cause of command
|
|
failure.
|
|
|
|
Note: errmsgs is expected to be a tuple, but in case there's only
|
|
error message, it can also be a string. This method will handle
|
|
that internally.
|
|
"""
|
|
proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
|
|
check_status=False)
|
|
self._verify(proc, retval, errmsgs)
|
|
return proc
|
|
|
|
def negtestcmd_as_user(self, args, user, retval=None, errmsgs=None,
|
|
stdin=None, cwd=None, wait=True):
|
|
proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
|
|
cwd=cwd, check_status=False)
|
|
self._verify(proc, retval, errmsgs)
|
|
return proc
|
|
|
|
def negtestcmd_as_root(self, args, retval=None, errmsgs=None, stdin=None,
|
|
cwd=None, wait=True):
|
|
proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
|
|
check_status=False)
|
|
self._verify(proc, retval, errmsgs)
|
|
return proc
|
|
|
|
def open_no_data(self, basename):
|
|
"""
|
|
A pure metadata operation
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
p = self._run_python(dedent(
|
|
"""
|
|
f = open("{path}", 'w')
|
|
""".format(path=path)
|
|
))
|
|
p.wait()
|
|
|
|
def open_background(self, basename="background_file", write=True):
|
|
"""
|
|
Open a file for writing, then block such that the client
|
|
will hold a capability.
|
|
|
|
Don't return until the remote process has got as far as opening
|
|
the file, then return the RemoteProcess instance.
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
if write:
|
|
pyscript = dedent("""
|
|
import time
|
|
|
|
with open("{path}", 'w') as f:
|
|
f.write('content')
|
|
f.flush()
|
|
f.write('content2')
|
|
while True:
|
|
time.sleep(1)
|
|
""").format(path=path)
|
|
else:
|
|
pyscript = dedent("""
|
|
import time
|
|
|
|
with open("{path}", 'r') as f:
|
|
while True:
|
|
time.sleep(1)
|
|
""").format(path=path)
|
|
|
|
rproc = self._run_python(pyscript)
|
|
self.background_procs.append(rproc)
|
|
|
|
# This wait would not be sufficient if the file had already
|
|
# existed, but it's simple and in practice users of open_background
|
|
# are not using it on existing files.
|
|
self.wait_for_visible(basename)
|
|
|
|
return rproc
|
|
|
|
def open_dir_background(self, basename):
|
|
"""
|
|
Create and hold a capability to a directory.
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
pyscript = dedent("""
|
|
import time
|
|
import os
|
|
|
|
os.mkdir("{path}")
|
|
fd = os.open("{path}", os.O_RDONLY)
|
|
while True:
|
|
time.sleep(1)
|
|
""").format(path=path)
|
|
|
|
rproc = self._run_python(pyscript)
|
|
self.background_procs.append(rproc)
|
|
|
|
self.wait_for_visible(basename)
|
|
|
|
return rproc
|
|
|
|
def wait_for_dir_empty(self, dirname, timeout=30):
|
|
dirpath = os.path.join(self.hostfs_mntpt, dirname)
|
|
with safe_while(sleep=5, tries=(timeout//5)) as proceed:
|
|
while proceed():
|
|
p = self.run_shell_payload(f"stat -c %h {dirpath}")
|
|
nr_links = int(p.stdout.getvalue().strip())
|
|
if nr_links == 2:
|
|
return
|
|
|
|
def wait_for_visible(self, basename="background_file", timeout=30):
|
|
i = 0
|
|
while i < timeout:
|
|
r = self.client_remote.run(args=[
|
|
'stat', os.path.join(self.hostfs_mntpt, basename)
|
|
], check_status=False)
|
|
if r.exitstatus == 0:
|
|
log.debug("File {0} became visible from {1} after {2}s".format(
|
|
basename, self.client_id, i))
|
|
return
|
|
else:
|
|
time.sleep(1)
|
|
i += 1
|
|
|
|
raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
|
|
i, basename, self.client_id))
|
|
|
|
def lock_background(self, basename="background_file", do_flock=True):
|
|
"""
|
|
Open and lock a files for writing, hold the lock in a background process
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
script_builder = """
|
|
import time
|
|
import fcntl
|
|
import struct"""
|
|
if do_flock:
|
|
script_builder += """
|
|
f1 = open("{path}-1", 'w')
|
|
fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
|
|
script_builder += """
|
|
f2 = open("{path}-2", 'w')
|
|
lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
|
|
fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
|
|
while True:
|
|
time.sleep(1)
|
|
"""
|
|
|
|
pyscript = dedent(script_builder).format(path=path)
|
|
|
|
log.info("lock_background file {0}".format(basename))
|
|
rproc = self._run_python(pyscript)
|
|
self.background_procs.append(rproc)
|
|
return rproc
|
|
|
|
def lock_and_release(self, basename="background_file"):
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
script = """
|
|
import time
|
|
import fcntl
|
|
import struct
|
|
f1 = open("{path}-1", 'w')
|
|
fcntl.flock(f1, fcntl.LOCK_EX)
|
|
f2 = open("{path}-2", 'w')
|
|
lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
|
|
fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
|
|
"""
|
|
pyscript = dedent(script).format(path=path)
|
|
|
|
log.info("lock_and_release file {0}".format(basename))
|
|
return self._run_python(pyscript)
|
|
|
|
def check_filelock(self, basename="background_file", do_flock=True):
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
script_builder = """
|
|
import fcntl
|
|
import errno
|
|
import struct"""
|
|
if do_flock:
|
|
script_builder += """
|
|
f1 = open("{path}-1", 'r')
|
|
try:
|
|
fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except IOError as e:
|
|
if e.errno == errno.EAGAIN:
|
|
pass
|
|
else:
|
|
raise RuntimeError("flock on file {path}-1 not found")"""
|
|
script_builder += """
|
|
f2 = open("{path}-2", 'r')
|
|
try:
|
|
lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
|
|
fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
|
|
except IOError as e:
|
|
if e.errno == errno.EAGAIN:
|
|
pass
|
|
else:
|
|
raise RuntimeError("posix lock on file {path}-2 not found")
|
|
"""
|
|
pyscript = dedent(script_builder).format(path=path)
|
|
|
|
log.info("check lock on file {0}".format(basename))
|
|
self.client_remote.run(args=[
|
|
'python3', '-c', pyscript
|
|
])
|
|
|
|
def write_background(self, basename="background_file", loop=False):
|
|
"""
|
|
Open a file for writing, complete as soon as you can
|
|
:param basename:
|
|
:return:
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
path = os.path.join(self.hostfs_mntpt, basename)
|
|
|
|
pyscript = dedent("""
|
|
import os
|
|
import time
|
|
|
|
fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
|
|
try:
|
|
while True:
|
|
os.write(fd, b'content')
|
|
time.sleep(1)
|
|
if not {loop}:
|
|
break
|
|
except IOError as e:
|
|
pass
|
|
os.close(fd)
|
|
""").format(path=path, loop=str(loop))
|
|
|
|
rproc = self._run_python(pyscript)
|
|
self.background_procs.append(rproc)
|
|
return rproc
|
|
|
|
def write_n_mb(self, filename, n_mb, seek=0, wait=True):
|
|
"""
|
|
Write the requested number of megabytes to a file
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
|
|
"bs=1M", "conv=fdatasync",
|
|
"count={0}".format(int(n_mb)),
|
|
"seek={0}".format(int(seek))
|
|
], wait=wait)
|
|
|
|
def write_test_pattern(self, filename, size):
|
|
log.info("Writing {0} bytes to {1}".format(size, filename))
|
|
return self.run_python(dedent("""
|
|
import zlib
|
|
path = "{path}"
|
|
with open(path, 'w') as f:
|
|
for i in range(0, {size}):
|
|
val = zlib.crc32(str(i).encode('utf-8')) & 7
|
|
f.write(chr(val))
|
|
""".format(
|
|
path=os.path.join(self.hostfs_mntpt, filename),
|
|
size=size
|
|
)))
|
|
|
|
def validate_test_pattern(self, filename, size):
|
|
log.info("Validating {0} bytes from {1}".format(size, filename))
|
|
# Use sudo because cephfs-data-scan may recreate the file with owner==root
|
|
return self.run_python(dedent("""
|
|
import zlib
|
|
path = "{path}"
|
|
with open(path, 'r') as f:
|
|
bytes = f.read()
|
|
if len(bytes) != {size}:
|
|
raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
|
|
len(bytes), {size}
|
|
))
|
|
for i, b in enumerate(bytes):
|
|
val = zlib.crc32(str(i).encode('utf-8')) & 7
|
|
if b != chr(val):
|
|
raise RuntimeError("Bad data at offset {{0}}".format(i))
|
|
""".format(
|
|
path=os.path.join(self.hostfs_mntpt, filename),
|
|
size=size
|
|
)), sudo=True)
|
|
|
|
def open_n_background(self, fs_path, count):
|
|
"""
|
|
Open N files for writing, hold them open in a background process
|
|
|
|
:param fs_path: Path relative to CephFS root, e.g. "foo/bar"
|
|
:return: a RemoteProcess
|
|
"""
|
|
assert(self.is_mounted())
|
|
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
|
|
pyscript = dedent("""
|
|
import sys
|
|
import time
|
|
import os
|
|
|
|
n = {count}
|
|
abs_path = "{abs_path}"
|
|
|
|
if not os.path.exists(abs_path):
|
|
os.makedirs(abs_path)
|
|
|
|
handles = []
|
|
for i in range(0, n):
|
|
fname = "file_"+str(i)
|
|
path = os.path.join(abs_path, fname)
|
|
handles.append(open(path, 'w'))
|
|
|
|
while True:
|
|
time.sleep(1)
|
|
""").format(abs_path=abs_path, count=count)
|
|
|
|
rproc = self._run_python(pyscript)
|
|
self.background_procs.append(rproc)
|
|
return rproc
|
|
|
|
def create_n_files(self, fs_path, count, sync=False, dirsync=False, unlink=False, finaldirsync=False):
|
|
"""
|
|
Create n files.
|
|
|
|
:param sync: sync the file after writing
|
|
:param dirsync: sync the containing directory after closing the file
|
|
:param unlink: unlink the file after closing
|
|
:param finaldirsync: sync the containing directory after closing the last file
|
|
"""
|
|
|
|
assert(self.is_mounted())
|
|
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
|
|
pyscript = dedent(f"""
|
|
import os
|
|
|
|
n = {count}
|
|
path = "{abs_path}"
|
|
|
|
dpath = os.path.dirname(path)
|
|
fnameprefix = os.path.basename(path)
|
|
os.makedirs(dpath, exist_ok=True)
|
|
|
|
try:
|
|
dirfd = os.open(dpath, os.O_DIRECTORY)
|
|
|
|
for i in range(n):
|
|
fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
|
|
with open(fpath, 'w') as f:
|
|
f.write(f"{{i}}")
|
|
if {sync}:
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
if {unlink}:
|
|
os.unlink(fpath)
|
|
if {dirsync}:
|
|
os.fsync(dirfd)
|
|
if {finaldirsync}:
|
|
os.fsync(dirfd)
|
|
finally:
|
|
os.close(dirfd)
|
|
""")
|
|
|
|
self.run_python(pyscript)
|
|
|
|
def teardown(self):
|
|
for p in self.background_procs:
|
|
log.info("Terminating background process")
|
|
self._kill_background(p)
|
|
|
|
self.background_procs = []
|
|
|
|
def _kill_background(self, p):
|
|
if p.stdin:
|
|
p.stdin.close()
|
|
try:
|
|
p.wait()
|
|
except (CommandFailedError, ConnectionLostError):
|
|
pass
|
|
|
|
def kill_background(self, p):
|
|
"""
|
|
For a process that was returned by one of the _background member functions,
|
|
kill it hard.
|
|
"""
|
|
self._kill_background(p)
|
|
self.background_procs.remove(p)
|
|
|
|
def send_signal(self, signal):
|
|
signal = signal.lower()
|
|
if signal.lower() not in ['sigstop', 'sigcont', 'sigterm', 'sigkill']:
|
|
raise NotImplementedError
|
|
|
|
self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
|
|
self.client_pid], omit_sudo=False)
|
|
|
|
def get_global_id(self):
|
|
raise NotImplementedError()
|
|
|
|
def get_global_inst(self):
|
|
raise NotImplementedError()
|
|
|
|
def get_global_addr(self):
|
|
raise NotImplementedError()
|
|
|
|
def get_osd_epoch(self):
|
|
raise NotImplementedError()
|
|
|
|
def get_op_read_count(self):
|
|
raise NotImplementedError()
|
|
|
|
def readlink(self, fs_path):
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
|
|
pyscript = dedent("""
|
|
import os
|
|
|
|
print(os.readlink("{path}"))
|
|
""").format(path=abs_path)
|
|
|
|
proc = self._run_python(pyscript)
|
|
proc.wait()
|
|
return str(proc.stdout.getvalue().strip())
|
|
|
|
|
|
def lstat(self, fs_path, follow_symlinks=False, wait=True):
|
|
return self.stat(fs_path, follow_symlinks=False, wait=True)
|
|
|
|
def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
|
|
"""
|
|
stat a file, and return the result as a dictionary like this:
|
|
{
|
|
"st_ctime": 1414161137.0,
|
|
"st_mtime": 1414161137.0,
|
|
"st_nlink": 33,
|
|
"st_gid": 0,
|
|
"st_dev": 16777218,
|
|
"st_size": 1190,
|
|
"st_ino": 2,
|
|
"st_uid": 0,
|
|
"st_mode": 16877,
|
|
"st_atime": 1431520593.0
|
|
}
|
|
|
|
Raises exception on absent file.
|
|
"""
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
if follow_symlinks:
|
|
stat_call = "os.stat('" + abs_path + "')"
|
|
else:
|
|
stat_call = "os.lstat('" + abs_path + "')"
|
|
|
|
pyscript = dedent("""
|
|
import os
|
|
import stat
|
|
import json
|
|
import sys
|
|
|
|
try:
|
|
s = {stat_call}
|
|
except OSError as e:
|
|
sys.exit(e.errno)
|
|
|
|
attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
|
|
print(json.dumps(
|
|
dict([(a, getattr(s, a)) for a in attrs]),
|
|
indent=2))
|
|
""").format(stat_call=stat_call)
|
|
proc = self._run_python(pyscript, **kwargs)
|
|
if wait:
|
|
proc.wait()
|
|
return json.loads(proc.stdout.getvalue().strip())
|
|
else:
|
|
return proc
|
|
|
|
def touch(self, fs_path):
|
|
"""
|
|
Create a dentry if it doesn't already exist. This python
|
|
implementation exists because the usual command line tool doesn't
|
|
pass through error codes like EIO.
|
|
|
|
:param fs_path:
|
|
:return:
|
|
"""
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
pyscript = dedent("""
|
|
import sys
|
|
import errno
|
|
|
|
try:
|
|
f = open("{path}", "w")
|
|
f.close()
|
|
except IOError as e:
|
|
sys.exit(errno.EIO)
|
|
""").format(path=abs_path)
|
|
proc = self._run_python(pyscript)
|
|
proc.wait()
|
|
|
|
def path_to_ino(self, fs_path, follow_symlinks=True):
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
|
|
if follow_symlinks:
|
|
pyscript = dedent("""
|
|
import os
|
|
import stat
|
|
|
|
print(os.stat("{path}").st_ino)
|
|
""").format(path=abs_path)
|
|
else:
|
|
pyscript = dedent("""
|
|
import os
|
|
import stat
|
|
|
|
print(os.lstat("{path}").st_ino)
|
|
""").format(path=abs_path)
|
|
|
|
proc = self._run_python(pyscript)
|
|
proc.wait()
|
|
return int(proc.stdout.getvalue().strip())
|
|
|
|
def path_to_nlink(self, fs_path):
|
|
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
|
|
|
|
pyscript = dedent("""
|
|
import os
|
|
import stat
|
|
|
|
print(os.stat("{path}").st_nlink)
|
|
""").format(path=abs_path)
|
|
|
|
proc = self._run_python(pyscript)
|
|
proc.wait()
|
|
return int(proc.stdout.getvalue().strip())
|
|
|
|
def ls(self, path=None, **kwargs):
|
|
"""
|
|
Wrap ls: return a list of strings
|
|
"""
|
|
kwargs['args'] = ["ls"]
|
|
if path:
|
|
kwargs['args'].append(path)
|
|
if kwargs.pop('sudo', False):
|
|
kwargs['args'].insert(0, 'sudo')
|
|
kwargs['omit_sudo'] = False
|
|
ls_text = self.run_shell(**kwargs).stdout.getvalue().strip()
|
|
|
|
if ls_text:
|
|
return ls_text.split("\n")
|
|
else:
|
|
# Special case because otherwise split on empty string
|
|
# gives you [''] instead of []
|
|
return []
|
|
|
|
def setfattr(self, path, key, val, **kwargs):
|
|
"""
|
|
Wrap setfattr.
|
|
|
|
:param path: relative to mount point
|
|
:param key: xattr name
|
|
:param val: xattr value
|
|
:return: None
|
|
"""
|
|
kwargs['args'] = ["setfattr", "-n", key, "-v", val, path]
|
|
if kwargs.pop('sudo', False):
|
|
kwargs['args'].insert(0, 'sudo')
|
|
kwargs['omit_sudo'] = False
|
|
self.run_shell(**kwargs)
|
|
|
|
def getfattr(self, path, attr, **kwargs):
|
|
"""
|
|
Wrap getfattr: return the values of a named xattr on one file, or
|
|
None if the attribute is not found.
|
|
|
|
:return: a string
|
|
"""
|
|
kwargs['args'] = ["getfattr", "--only-values", "-n", attr, path]
|
|
if kwargs.pop('sudo', False):
|
|
kwargs['args'].insert(0, 'sudo')
|
|
kwargs['omit_sudo'] = False
|
|
kwargs['wait'] = False
|
|
p = self.run_shell(**kwargs)
|
|
try:
|
|
p.wait()
|
|
except CommandFailedError as e:
|
|
if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
|
|
return None
|
|
else:
|
|
raise
|
|
|
|
return str(p.stdout.getvalue())
|
|
|
|
def df(self):
|
|
"""
|
|
Wrap df: return a dict of usage fields in bytes
|
|
"""
|
|
|
|
p = self.run_shell(["df", "-B1", "."])
|
|
lines = p.stdout.getvalue().strip().split("\n")
|
|
fs, total, used, avail = lines[1].split()[:4]
|
|
log.warning(lines)
|
|
|
|
return {
|
|
"total": int(total),
|
|
"used": int(used),
|
|
"available": int(avail)
|
|
}
|
|
|
|
def dir_checksum(self, path=None, follow_symlinks=False):
|
|
cmd = ["find"]
|
|
if follow_symlinks:
|
|
cmd.append("-L")
|
|
if path:
|
|
cmd.append(path)
|
|
cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
|
|
checksum_text = self.run_shell(cmd).stdout.getvalue().strip()
|
|
checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1])
|
|
return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest()
|
|
|
|
def validate_subvol_options(self):
|
|
mount_subvol_num = self.client_config.get('mount_subvol_num', None)
|
|
if self.cephfs_mntpt and mount_subvol_num is not None:
|
|
log.warning("You cannot specify both: cephfs_mntpt and mount_subvol_num")
|
|
log.info(f"Mounting subvol {mount_subvol_num} for now")
|
|
|
|
if mount_subvol_num is not None:
|
|
# mount_subvol must be an index into the subvol path array for the fs
|
|
if not self.cephfs_name:
|
|
self.cephfs_name = 'cephfs'
|
|
assert(hasattr(self.ctx, "created_subvols"))
|
|
# mount_subvol must be specified under client.[0-9] yaml section
|
|
subvol_paths = self.ctx.created_subvols[self.cephfs_name]
|
|
path_to_mount = subvol_paths[mount_subvol_num]
|
|
self.cephfs_mntpt = path_to_mount
|
|
elif not self.cephfs_mntpt:
|
|
# default to the "/" path
|
|
self.cephfs_mntpt = "/"
|