ceph/qa/tasks/cephfs/test_cephfs_shell.py
Rishabh Dave 9f95f96dc0
Merge pull request #45998 from dparmar18/implement_ln_cephfs_shell
cephfs-shell: ln command implementation

Reviewed-by: Nikhilkumar Shelke <nshelke@redhat.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Anthony D'Atri <anthony.datri@gmail.com>
Reviewed-by: Rishabh Dave <ridave@redhat.com>
2022-07-08 19:26:34 +05:30

1168 lines
44 KiB
Python

"""
NOTE: For running this tests locally (using vstart_runner.py), export the
path to src/tools/cephfs/cephfs-shell module to $PATH. Running
"export PATH=$PATH:$(cd ../src/tools/cephfs && pwd)" from the build dir
will update the environment without hassles of typing the path correctly.
"""
from io import StringIO
from os import path
import crypt
import logging
from tempfile import mkstemp as tempfile_mkstemp
import math
from time import sleep
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from textwrap import dedent
log = logging.getLogger(__name__)
def humansize(nbytes):
suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
i = 0
while nbytes >= 1024 and i < len(suffixes) - 1:
nbytes /= 1024.
i += 1
nbytes = math.ceil(nbytes)
f = ('%d' % nbytes).rstrip('.')
return '%s%s' % (f, suffixes[i])
def ensure_str(s):
if isinstance(s, str):
return s
if isinstance(s, bytes):
return s.decode()
raise TypeError("not expecting type '%s'" % type(s))
class TestCephFSShell(CephFSTestCase):
CLIENTS_REQUIRED = 1
def setUp(self):
super(TestCephFSShell, self).setUp()
conf_contents = "[cephfs-shell]\ncolors = False\ndebug = True\n"
confpath = self.mount_a.client_remote.sh('mktemp').strip()
self.mount_a.client_remote.write_file(confpath, conf_contents)
self.default_shell_conf_path = confpath
def run_cephfs_shell_cmd(self, cmd, mount_x=None, shell_conf_path=None,
opts=None, stdout=None, stderr=None, stdin=None,
check_status=True):
stdout = stdout or StringIO()
stderr = stderr or StringIO()
if mount_x is None:
mount_x = self.mount_a
if isinstance(cmd, list):
cmd = " ".join(cmd)
if not shell_conf_path:
shell_conf_path = self.default_shell_conf_path
args = ["cephfs-shell", "-c", shell_conf_path]
if opts:
args += opts
args.extend(("--", cmd))
log.info("Running command: {}".format(" ".join(args)))
return mount_x.client_remote.run(args=args, stdout=stdout,
stderr=stderr, stdin=stdin,
check_status=check_status)
def negtest_cephfs_shell_cmd(self, **kwargs):
"""
This method verifies that cephfs shell command fails with expected
return value and/or error message.
kwargs is expected to hold the arguments same as
run_cephfs_shell_cmd() with the following exceptions -
* It should not contain check_status (since commands are expected
to fail, check_status is hardcoded to False).
* It is optional to set expected error message and return value to
dict members 'errmsg' and 'retval' respectively.
This method servers as shorthand for codeblocks like -
try:
proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
check_status=False,
stdout=stdout)
except CommandFailedError as e:
self.assertNotIn('some error message',
proc.stderr.getvalue.lower())
try:
proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
check_status=False,
stdout=stdout)
except CommandFailedError as e:
self.assertNotEqual(1, proc.returncode)
"""
retval = kwargs.pop('retval', None)
errmsg = kwargs.pop('errmsg', None)
kwargs['check_status'] = False
proc = self.run_cephfs_shell_cmd(**kwargs)
if retval:
self.assertEqual(proc.returncode, retval)
else:
self.assertNotEqual(proc.returncode, 0)
if errmsg:
self.assertIn(errmsg, proc.stderr.getvalue().lower())
return proc
def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
shell_conf_path=None, opts=None,
stdout=None, stdin=None,
check_status=True):
return ensure_str(self.run_cephfs_shell_cmd(
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
opts=opts, stdout=stdout, stdin=stdin,
check_status=check_status).stdout.getvalue().strip())
def get_cephfs_shell_cmd_error(self, cmd, mount_x=None,
shell_conf_path=None, opts=None,
stderr=None, stdin=None, check_status=True):
return ensure_str(self.run_cephfs_shell_cmd(
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
opts=opts, stderr=stderr, stdin=stdin,
check_status=check_status).stderr.getvalue().strip())
def run_cephfs_shell_script(self, script, mount_x=None,
shell_conf_path=None, opts=None, stdout=None,
stderr=None, stdin=None, check_status=True):
stdout = stdout or StringIO()
stderr = stderr or StringIO()
if mount_x is None:
mount_x = self.mount_a
scriptpath = tempfile_mkstemp(prefix='test-cephfs', text=True)[1]
with open(scriptpath, 'w') as scriptfile:
scriptfile.write(script)
# copy script to the machine running cephfs-shell.
mount_x.client_remote.put_file(scriptpath, scriptpath)
mount_x.run_shell_payload(f"chmod 755 {scriptpath}")
args = ["cephfs-shell", '-b', scriptpath]
if shell_conf_path:
args[1:1] = ["-c", shell_conf_path]
log.info('Running script \"' + scriptpath + '\"')
return mount_x.client_remote.run(args=args, stdout=stdout,
stderr=stderr, stdin=stdin,
check_status=True)
def get_cephfs_shell_script_output(self, script, mount_x=None,
shell_conf_path=None, opts=None,
stdout=None, stdin=None,
check_status=True):
return ensure_str(self.run_cephfs_shell_script(
script=script, mount_x=mount_x, shell_conf_path=shell_conf_path,
opts=opts, stdout=stdout, stdin=stdin,
check_status=check_status).stdout.getvalue().strip())
class TestGeneric(TestCephFSShell):
def test_mistyped_cmd(self):
with self.assertRaises(CommandFailedError) as cm:
self.run_cephfs_shell_cmd('lsx')
self.assertEqual(cm.exception.exitstatus, 127)
class TestMkdir(TestCephFSShell):
def test_mkdir(self):
"""
Test that mkdir creates directory
"""
o = self.get_cephfs_shell_cmd_output("mkdir d1")
log.info("cephfs-shell output:\n{}".format(o))
o = self.mount_a.stat('d1')
log.info("mount_a output:\n{}".format(o))
def test_mkdir_with_070000_octal_mode(self):
"""
Test that mkdir fails with octal mode greater than 07777
"""
self.negtest_cephfs_shell_cmd(cmd="mkdir -m 070000 d2")
try:
self.mount_a.stat('d2')
except CommandFailedError:
pass
def test_mkdir_with_negative_octal_mode(self):
"""
Test that mkdir fails with negative octal mode
"""
self.negtest_cephfs_shell_cmd(cmd="mkdir -m -0755 d3")
try:
self.mount_a.stat('d3')
except CommandFailedError:
pass
def test_mkdir_with_non_octal_mode(self):
"""
Test that mkdir passes with non-octal mode
"""
o = self.get_cephfs_shell_cmd_output("mkdir -m u=rwx d4")
log.info("cephfs-shell output:\n{}".format(o))
# mkdir d4 should pass
o = self.mount_a.stat('d4')
assert ((o['st_mode'] & 0o700) == 0o700)
def test_mkdir_with_bad_non_octal_mode(self):
"""
Test that mkdir failes with bad non-octal mode
"""
self.negtest_cephfs_shell_cmd(cmd="mkdir -m ugx=0755 d5")
try:
self.mount_a.stat('d5')
except CommandFailedError:
pass
def test_mkdir_path_without_path_option(self):
"""
Test that mkdir fails without path option for creating path
"""
self.negtest_cephfs_shell_cmd(cmd="mkdir d5/d6/d7")
try:
self.mount_a.stat('d5/d6/d7')
except CommandFailedError:
pass
def test_mkdir_path_with_path_option(self):
"""
Test that mkdir passes with path option for creating path
"""
o = self.get_cephfs_shell_cmd_output("mkdir -p d5/d6/d7")
log.info("cephfs-shell output:\n{}".format(o))
# mkdir d5/d6/d7 should pass
o = self.mount_a.stat('d5/d6/d7')
log.info("mount_a output:\n{}".format(o))
class TestRmdir(TestCephFSShell):
dir_name = "test_dir"
def dir_does_not_exists(self):
"""
Tests that directory does not exists
"""
try:
self.mount_a.stat(self.dir_name)
except CommandFailedError as e:
if e.exitstatus == 2:
return 0
raise
def test_rmdir(self):
"""
Test that rmdir deletes directory
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_non_existing_dir(self):
"""
Test that rmdir does not delete a non existing directory
"""
self.negtest_cephfs_shell_cmd(cmd="rmdir test_dir")
self.dir_does_not_exists()
def test_rmdir_dir_with_file(self):
"""
Test that rmdir does not delete directory containing file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
# see comment below
# with self.assertRaises(CommandFailedError) as cm:
with self.assertRaises(CommandFailedError):
self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
# TODO: we need to check for exit code and error message as well.
# skipping it for not since error codes used by cephfs-shell are not
# standard and they may change soon.
# self.assertEqual(cm.exception.exitcode, 39)
self.mount_a.stat(self.dir_name)
def test_rmdir_existing_file(self):
"""
Test that rmdir does not delete a file
"""
self.run_cephfs_shell_cmd("put - dumpfile", stdin="Valid File")
self.negtest_cephfs_shell_cmd(cmd="rmdir dumpfile")
self.mount_a.stat("dumpfile")
def test_rmdir_p(self):
"""
Test that rmdir -p deletes all empty directories in the root
directory passed
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_p_valid_path(self):
"""
Test that rmdir -p deletes all empty directories in the path passed
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
self.run_cephfs_shell_cmd("rmdir -p test_dir/t1/t2/t3")
self.dir_does_not_exists()
def test_rmdir_p_non_existing_dir(self):
"""
Test that rmdir -p does not delete an invalid directory
"""
self.negtest_cephfs_shell_cmd(cmd="rmdir -p test_dir")
self.dir_does_not_exists()
def test_rmdir_p_dir_with_file(self):
"""
Test that rmdir -p does not delete the directory containing a file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
stdin="Valid File")
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.mount_a.stat(self.dir_name)
class TestLn(TestCephFSShell):
dir1 = 'test_dir1'
dir2 = 'test_dir2'
dump_id = 11
s = 'somedata'
dump_file = 'dump11'
def test_soft_link_without_link_name(self):
self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
data=self.s)
self.run_cephfs_shell_script(script=dedent(f'''
cd /{self.dir1}/{self.dir2}
ln -s ../{self.dump_file}'''))
o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
f'/{self.dump_file}')
self.assertEqual(self.s, o)
def test_soft_link_with_link_name(self):
self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
data=self.s)
self.run_cephfs_shell_cmd(f'ln -s /{self.dir1}/{self.dump_file} '
f'/{self.dir1}/{self.dir2}/')
o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
f'/{self.dump_file}')
self.assertEqual(self.s, o)
def test_hard_link_without_link_name(self):
self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
data=self.s)
self.run_cephfs_shell_script(script=dedent(f'''
cd /{self.dir1}/{self.dir2}
ln ../{self.dump_file}'''))
o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
f'/{self.dump_file}')
self.assertEqual(self.s, o)
def test_hard_link_with_link_name(self):
self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
data=self.s)
self.run_cephfs_shell_cmd(f'ln /{self.dir1}/{self.dump_file} '
f'/{self.dir1}/{self.dir2}/')
o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
f'/{self.dump_file}')
self.assertEqual(self.s, o)
def test_hard_link_to_dir_not_allowed(self):
self.run_cephfs_shell_cmd(f'mkdir {self.dir1}')
self.run_cephfs_shell_cmd(f'mkdir {self.dir2}')
r = self.run_cephfs_shell_cmd(f'ln /{self.dir1} /{self.dir2}/',
check_status=False)
self.assertEqual(r.returncode, 3)
def test_target_exists_in_dir(self):
self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} {self.dump_file}',
check_status=False)
self.assertEqual(r.returncode, 1)
def test_incorrect_dir(self):
self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} /dir1/',
check_status=False)
self.assertEqual(r.returncode, 5)
class TestGetAndPut(TestCephFSShell):
def test_get_with_target_name(self):
"""
Test that get passes with target name
"""
s = 'C' * 1024
s_hash = crypt.crypt(s, '.A')
o = self.get_cephfs_shell_cmd_output("put - dump4", stdin=s)
log.info("cephfs-shell output:\n{}".format(o))
# put - dump4 should pass
o = self.mount_a.stat('dump4')
log.info("mount_a output:\n{}".format(o))
o = self.get_cephfs_shell_cmd_output("get dump4 ./dump4")
log.info("cephfs-shell output:\n{}".format(o))
# NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue(). \
strip()
o_hash = crypt.crypt(o, '.A')
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
assert (s_hash == o_hash)
# cleanup
self.mount_a.run_shell("rm dump4", cwd=None, check_status=False)
def test_get_without_target_name(self):
"""
Test that get should fail when there is no target name
"""
s = 'Somedata'
# put - dump5 should pass
self.get_cephfs_shell_cmd_output("put - dump5", stdin=s)
self.mount_a.stat('dump5')
# get dump5 should fail as there is no local_path mentioned
with self.assertRaises(CommandFailedError):
self.get_cephfs_shell_cmd_output("get dump5")
# stat dump would return non-zero exit code as get dump failed
# cwd=None because we want to run it at CWD, not at cephfs mntpt.
r = self.mount_a.run_shell('stat dump5', cwd=None,
check_status=False).returncode
self.assertEqual(r, 1)
def test_get_doesnt_create_dir(self):
# if get cmd is creating subdirs on its own then dump7 will be
# stored as ./dump7/tmp/dump7 and not ./dump7, therefore
# if doing `cat ./dump7` returns non-zero exit code(i.e. 1) then
# it implies that no such file exists at that location
dir_abspath = path.join(self.mount_a.mountpoint, 'tmp')
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
self.mount_a.client_remote.write_file(path.join(dir_abspath, 'dump7'),
'somedata')
self.get_cephfs_shell_cmd_output("get /tmp/dump7 ./dump7")
# test that dump7 exists
self.mount_a.run_shell("cat ./dump7", cwd=None)
# cleanup
self.mount_a.run_shell(args='rm dump7', cwd=None, check_status=False)
def test_get_to_console(self):
"""
Test that get passes with target name
"""
s = 'E' * 1024
s_hash = crypt.crypt(s, '.A')
o = self.get_cephfs_shell_cmd_output("put - dump6", stdin=s)
log.info("cephfs-shell output:\n{}".format(o))
# put - dump6 should pass
o = self.mount_a.stat('dump6')
log.info("mount_a output:\n{}".format(o))
# get dump6 - should pass
o = self.get_cephfs_shell_cmd_output("get dump6 -")
o_hash = crypt.crypt(o, '.A')
log.info("cephfs-shell output:\n{}".format(o))
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
assert (s_hash == o_hash)
def test_put_without_target_name(self):
"""
put - should fail as the cmd expects both arguments are mandatory.
"""
with self.assertRaises(CommandFailedError):
self.get_cephfs_shell_cmd_output("put -")
def test_put_validate_local_path(self):
"""
This test is intended to make sure local_path is validated before
trying to put the file from local fs to cephfs and the command
put ./dumpXYZ dump8 would fail as dumpXYX doesn't exist.
"""
with self.assertRaises(CommandFailedError):
o = self.get_cephfs_shell_cmd_output("put ./dumpXYZ dump8")
log.info("cephfs-shell output:\n{}".format(o))
class TestSnapshots(TestCephFSShell):
def test_snap(self):
"""
Test that snapshot creation and deletion work
"""
sd = self.fs.get_config('client_snapdir')
sdn = "data_dir/{}/snap1".format(sd)
# create a data dir and dump some files into it
self.get_cephfs_shell_cmd_output("mkdir data_dir")
s = 'A' * 10240
o = self.get_cephfs_shell_cmd_output("put - data_dir/data_a", stdin=s)
s = 'B' * 10240
o = self.get_cephfs_shell_cmd_output("put - data_dir/data_b", stdin=s)
s = 'C' * 10240
o = self.get_cephfs_shell_cmd_output("put - data_dir/data_c", stdin=s)
s = 'D' * 10240
o = self.get_cephfs_shell_cmd_output("put - data_dir/data_d", stdin=s)
s = 'E' * 10240
o = self.get_cephfs_shell_cmd_output("put - data_dir/data_e", stdin=s)
o = self.get_cephfs_shell_cmd_output("ls -l /data_dir")
log.info("cephfs-shell output:\n{}".format(o))
# create the snapshot - must pass
o = self.get_cephfs_shell_cmd_output("snap create snap1 /data_dir")
log.info("cephfs-shell output:\n{}".format(o))
self.assertEqual("", o)
o = self.mount_a.stat(sdn)
log.info("mount_a output:\n{}".format(o))
self.assertIn('st_mode', o)
# create the same snapshot again - must fail with an error message
self.negtest_cephfs_shell_cmd(cmd="snap create snap1 /data_dir",
errmsg="snapshot 'snap1' already exists")
o = self.mount_a.stat(sdn)
log.info("mount_a output:\n{}".format(o))
self.assertIn('st_mode', o)
# delete the snapshot - must pass
o = self.get_cephfs_shell_cmd_output("snap delete snap1 /data_dir")
log.info("cephfs-shell output:\n{}".format(o))
self.assertEqual("", o)
try:
o = self.mount_a.stat(sdn)
except CommandFailedError:
# snap dir should not exist anymore
pass
log.info("mount_a output:\n{}".format(o))
self.assertNotIn('st_mode', o)
# delete the same snapshot again - must fail with an error message
self.negtest_cephfs_shell_cmd(cmd="snap delete snap1 /data_dir",
errmsg="'snap1': no such snapshot")
try:
o = self.mount_a.stat(sdn)
except CommandFailedError:
pass
log.info("mount_a output:\n{}".format(o))
self.assertNotIn('st_mode', o)
class TestCD(TestCephFSShell):
CLIENTS_REQUIRED = 1
def test_cd_with_no_args(self):
"""
Test that when cd is issued without any arguments, CWD is changed
to root directory.
"""
path = 'dir1/dir2/dir3'
self.mount_a.run_shell_payload(f"mkdir -p {path}")
expected_cwd = '/'
script = 'cd {}\ncd\ncwd\n'.format(path)
output = self.get_cephfs_shell_script_output(script)
self.assertEqual(output, expected_cwd)
def test_cd_with_args(self):
"""
Test that when cd is issued with an argument, CWD is changed
to the path passed in the argument.
"""
path = 'dir1/dir2/dir3'
self.mount_a.run_shell_payload(f"mkdir -p {path}")
expected_cwd = '/dir1/dir2/dir3'
script = 'cd {}\ncwd\n'.format(path)
output = self.get_cephfs_shell_script_output(script)
self.assertEqual(output, expected_cwd)
class TestDU(TestCephFSShell):
CLIENTS_REQUIRED = 1
def test_du_works_for_regfiles(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", regfilename)
du_output = self.get_cephfs_shell_cmd_output('du ' + regfilename)
self.assertRegex(du_output, expected_output)
def test_du_works_for_non_empty_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
regfilename = 'some_regfile'
regfile_abspath = path.join(dir_abspath, regfilename)
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
# XXX: we stat `regfile_abspath` here because ceph du reports
# a non-empty
# directory's size as sum of sizes of all files under it.
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", dirname)
sleep(10)
du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
self.assertRegex(du_output, expected_output)
def test_du_works_for_empty_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
size = humansize(self.mount_a.stat(dir_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", dirname)
du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
self.assertRegex(du_output, expected_output)
def test_du_works_for_hardlinks(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
hlinkname = 'some_hardlink'
hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
size = humansize(self.mount_a.stat(hlink_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", hlinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + hlinkname)
self.assertRegex(du_output, expected_output)
def test_du_works_for_softlinks_to_files(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
slinkname = 'some_softlink'
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
self.mount_a.run_shell_payload(
f"ln -s {regfile_abspath} {slink_abspath}")
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", slinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
self.assertRegex(du_output, expected_output)
def test_du_works_for_softlinks_to_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
slinkname = 'some_softlink'
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink_abspath}")
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", slinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
self.assertRegex(du_output, expected_output)
# NOTE: tests using these are pretty slow since to this methods sleeps for
# 15 seconds
def _setup_files(self, return_path_to_files=False, path_prefix='./'):
dirname = 'dir1'
regfilename = 'regfile'
hlinkname = 'hlink'
slinkname = 'slink1'
slink2name = 'slink2'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
slink2_abspath = path.join(self.mount_a.mountpoint, slink2name)
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
self.mount_a.run_shell_payload(f"touch {regfile_abspath}")
self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
self.mount_a.run_shell_payload(
f"ln -s {regfile_abspath} {slink_abspath}")
self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink2_abspath}")
dir2_name = 'dir2'
dir21_name = 'dir21'
regfile121_name = 'regfile121'
dir2_abspath = path.join(self.mount_a.mountpoint, dir2_name)
dir21_abspath = path.join(dir2_abspath, dir21_name)
regfile121_abspath = path.join(dir21_abspath, regfile121_name)
self.mount_a.run_shell_payload(f"mkdir -p {dir21_abspath}")
self.mount_a.run_shell_payload(f"touch {regfile121_abspath}")
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
self.mount_a.client_remote.write_file(regfile121_abspath,
'somemoredata')
# TODO: is there a way to trigger/force update ceph.dir.rbytes?
# wait so that attr ceph.dir.rbytes gets a chance to be updated.
sleep(20)
expected_patterns = []
path_to_files = []
def append_expected_output_pattern(f):
if f == '/':
expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
else:
expected_patterns.append(r'{}{}{}'.format(
size, " +",
path_prefix + path.relpath(f, self.mount_a.mountpoint)))
for f in [dir_abspath, regfile_abspath, regfile121_abspath,
hlink_abspath, slink_abspath, slink2_abspath]:
size = humansize(self.mount_a.stat(
f, follow_symlinks=False)['st_size'])
append_expected_output_pattern(f)
# get size for directories containig regfiles within
for f in [dir2_abspath, dir21_abspath]:
size = humansize(self.mount_a.stat(regfile121_abspath,
follow_symlinks=False)[
'st_size'])
append_expected_output_pattern(f)
# get size for CephFS root
size = 0
for f in [regfile_abspath, regfile121_abspath, slink_abspath,
slink2_abspath]:
size += self.mount_a.stat(f, follow_symlinks=False)['st_size']
size = humansize(size)
append_expected_output_pattern('/')
if return_path_to_files:
for p in [dir_abspath, regfile_abspath, dir2_abspath,
dir21_abspath, regfile121_abspath, hlink_abspath,
slink_abspath, slink2_abspath]:
path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
return expected_patterns, path_to_files
else:
return expected_patterns
def test_du_works_recursively_with_no_path_in_args(self):
expected_patterns_in_output = self._setup_files()
du_output = self.get_cephfs_shell_cmd_output('du -r')
for expected_output in expected_patterns_in_output:
self.assertRegex(du_output, expected_output)
def test_du_with_path_in_args(self):
expected_patterns_in_output, path_to_files = self._setup_files(
True, path_prefix='')
args = ['du', '/']
for p in path_to_files:
args.append(p)
du_output = self.get_cephfs_shell_cmd_output(args)
for expected_output in expected_patterns_in_output:
self.assertRegex(du_output, expected_output)
def test_du_with_no_args(self):
expected_patterns_in_output = self._setup_files()
du_output = self.get_cephfs_shell_cmd_output('du')
for expected_output in expected_patterns_in_output:
# Since CWD is CephFS root and being non-recursive expect only
# CWD in DU report.
if expected_output.find('/') == len(expected_output) - 1:
self.assertRegex(du_output, expected_output)
class TestDF(TestCephFSShell):
def validate_df(self, filename):
df_output = self.get_cephfs_shell_cmd_output('df ' + filename)
log.info("cephfs-shell df output:\n{}".format(df_output))
shell_df = df_output.splitlines()[1].split()
block_size = int(self.mount_a.df()["total"]) // 1024
log.info("cephfs df block size output:{}\n".format(block_size))
st_size = int(self.mount_a.stat(filename)["st_size"])
log.info("cephfs stat used output:{}".format(st_size))
log.info("cephfs available:{}\n".format(block_size - st_size))
self.assertTupleEqual((block_size, st_size, block_size - st_size),
(int(shell_df[0]), int(shell_df[1]),
int(shell_df[2])))
def test_df_with_no_args(self):
expected_output = ''
df_output = self.get_cephfs_shell_cmd_output('df')
assert df_output == expected_output
def test_df_for_valid_directory(self):
dir_name = 'dir1'
mount_output = self.mount_a.run_shell_payload(f"mkdir {dir_name}")
log.info("cephfs-shell mount output:\n{}".format(mount_output))
self.validate_df(dir_name)
def test_df_for_invalid_directory(self):
dir_abspath = path.join(self.mount_a.mountpoint, 'non-existent-dir')
self.negtest_cephfs_shell_cmd(cmd='df ' + dir_abspath,
errmsg='error in stat')
def test_df_for_valid_file(self):
s = 'df test' * 14145016
o = self.get_cephfs_shell_cmd_output("put - dumpfile", stdin=s)
log.info("cephfs-shell output:\n{}".format(o))
self.validate_df("dumpfile")
class TestQuota(TestCephFSShell):
dir_name = 'testdir'
def create_dir(self):
mount_output = self.get_cephfs_shell_cmd_output(
'mkdir ' + self.dir_name)
log.info("cephfs-shell mount output:\n{}".format(mount_output))
def set_and_get_quota_vals(self, input_val, check_status=True):
self.run_cephfs_shell_cmd(['quota', 'set', '--max_bytes',
input_val[0], '--max_files', input_val[1],
self.dir_name], check_status=check_status)
quota_output = self.get_cephfs_shell_cmd_output(
['quota', 'get', self.dir_name],
check_status=check_status)
quota_output = quota_output.split()
return quota_output[1], quota_output[3]
def test_set(self):
self.create_dir()
set_values = ('6', '2')
self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
set_values)
def test_replace_values(self):
self.test_set()
set_values = ('20', '4')
self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
set_values)
def test_set_invalid_dir(self):
set_values = ('5', '5')
try:
self.assertTupleEqual(self.set_and_get_quota_vals(
set_values, False), set_values)
raise Exception(
"Something went wrong!! Values set for non existing directory")
except IndexError:
# Test should pass as values cannot be set for non
# existing directory
pass
def test_set_invalid_values(self):
self.create_dir()
set_values = ('-6', '-5')
try:
self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
False),
set_values)
raise Exception("Something went wrong!! Invalid values set")
except IndexError:
# Test should pass as invalid values cannot be set
pass
def test_exceed_file_limit(self):
self.test_set()
dir_abspath = path.join(self.mount_a.mountpoint, self.dir_name)
self.mount_a.run_shell_payload(f"touch {dir_abspath}/file1")
file2 = path.join(dir_abspath, "file2")
try:
self.mount_a.run_shell_payload(f"touch {file2}")
raise Exception(
"Something went wrong!! File creation should have failed")
except CommandFailedError:
# Test should pass as file quota set to 2
# Additional condition to confirm file creation failure
if not path.exists(file2):
return 0
raise
def test_exceed_write_limit(self):
self.test_set()
dir_abspath = path.join(self.mount_a.mountpoint, self.dir_name)
filename = 'test_file'
file_abspath = path.join(dir_abspath, filename)
try:
# Write should fail as bytes quota is set to 6
self.mount_a.client_remote.write_file(file_abspath,
'Disk raise Exception')
raise Exception("Write should have failed")
except CommandFailedError:
# Test should pass only when write command fails
path_exists = path.exists(file_abspath)
if not path_exists:
# Testing with teuthology: No file is created.
return 0
elif path_exists and not path.getsize(file_abspath):
# Testing on Fedora 30: When write fails, empty
# file gets created.
return 0
else:
raise
class TestXattr(TestCephFSShell):
dir_name = 'testdir'
def create_dir(self):
self.run_cephfs_shell_cmd('mkdir ' + self.dir_name)
def set_get_list_xattr_vals(self, input_val, negtest=False):
setxattr_output = self.get_cephfs_shell_cmd_output(
['setxattr', self.dir_name, input_val[0], input_val[1]])
log.info("cephfs-shell setxattr output:\n{}".format(setxattr_output))
getxattr_output = self.get_cephfs_shell_cmd_output(
['getxattr', self.dir_name, input_val[0]])
log.info("cephfs-shell getxattr output:\n{}".format(getxattr_output))
listxattr_output = self.get_cephfs_shell_cmd_output(
['listxattr', self.dir_name])
log.info("cephfs-shell listxattr output:\n{}".format(listxattr_output))
return listxattr_output, getxattr_output
def test_set(self):
self.create_dir()
set_values = ('user.key', '2')
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
set_values)
def test_reset(self):
self.test_set()
set_values = ('user.key', '4')
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
set_values)
def test_non_existing_dir(self):
input_val = ('user.key', '9')
self.negtest_cephfs_shell_cmd(
cmd=['setxattr', self.dir_name, input_val[0],
input_val[1]])
self.negtest_cephfs_shell_cmd(
cmd=['getxattr', self.dir_name, input_val[0]])
self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
class TestLS(TestCephFSShell):
dir_name = 'test_dir'
hidden_dir_name = '.test_hidden_dir'
def test_ls(self):
""" Test that ls prints files in CWD. """
self.run_cephfs_shell_cmd(f'mkdir {self.dir_name}')
ls_output = self.get_cephfs_shell_cmd_output("ls")
log.info(f"output of ls command:\n{ls_output}")
self.assertIn(self.dir_name, ls_output)
def test_ls_a(self):
""" Test ls -a prints hidden files in CWD."""
self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
log.info(f"output of ls -a command:\n{ls_a_output}")
self.assertIn(self.hidden_dir_name, ls_a_output)
def test_ls_does_not_print_hidden_dir(self):
""" Test ls command does not print hidden directory """
self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
ls_output = self.get_cephfs_shell_cmd_output("ls")
log.info(f"output of ls command:\n{ls_output}")
self.assertNotIn(self.hidden_dir_name, ls_output)
def test_ls_a_prints_non_hidden_dir(self):
""" Test ls -a command prints non hidden directory """
self.run_cephfs_shell_cmd(
f'mkdir {self.hidden_dir_name} {self.dir_name}')
ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
log.info(f"output of ls -a command:\n{ls_a_output}")
self.assertIn(self.dir_name, ls_a_output)
def test_ls_H_prints_human_readable_file_size(self):
""" Test "ls -lH" prints human readable file size."""
file_sizes = ['1', '1K', '1M', '1G']
file_names = ['dump1', 'dump2', 'dump3', 'dump4']
for (file_size, file_name) in zip(file_sizes, file_names):
temp_file = self.mount_a.client_remote.mktemp(file_name)
self.mount_a.run_shell_payload(
f"fallocate -l {file_size} {temp_file}")
self.mount_a.run_shell_payload(f'mv {temp_file} ./')
ls_H_output = self.get_cephfs_shell_cmd_output(['ls', '-lH'])
ls_H_file_size = set()
for line in ls_H_output.split('\n'):
ls_H_file_size.add(line.split()[1])
# test that file sizes are in human readable format
self.assertEqual({'1B', '1K', '1M', '1G'}, ls_H_file_size)
def test_ls_s_sort_by_size(self):
""" Test "ls -S" sorts file listing by file_size """
test_file1 = "test_file1.txt"
test_file2 = "test_file2.txt"
file1_content = 'A' * 102
file2_content = 'B' * 10
self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
self.run_cephfs_shell_cmd(f"write {test_file2}", stdin=file2_content)
ls_s_output = self.get_cephfs_shell_cmd_output(['ls', '-lS'])
file_sizes = []
for line in ls_s_output.split('\n'):
file_sizes.append(line.split()[1])
# test that file size are in ascending order
self.assertEqual(file_sizes, sorted(file_sizes))
class TestMisc(TestCephFSShell):
def test_issue_cephfs_shell_cmd_at_invocation(self):
"""
Test that `cephfs-shell -c conf cmd` works.
"""
# choosing a long name since short ones have a higher probability
# of getting matched by coincidence.
dirname = 'somedirectory'
self.run_cephfs_shell_cmd(['mkdir', dirname])
output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']). \
strip()
self.assertRegex(output, dirname)
def test_help(self):
"""
Test that help outputs commands.
"""
o = self.get_cephfs_shell_cmd_output("help all")
log.info("output:\n{}".format(o))
def test_chmod(self):
"""Test chmod is allowed above o0777 """
test_file1 = "test_file2.txt"
file1_content = 'A' * 102
self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
self.run_cephfs_shell_cmd(f"chmod 01777 {test_file1}")
class TestShellOpts(TestCephFSShell):
"""
Contains tests for shell options from conf file and shell prompt.
"""
def setUp(self):
super(type(self), self).setUp()
# output of following command -
# editor - was: 'vim'
# now: '?'
# editor: '?'
self.editor_val = self.get_cephfs_shell_cmd_output(
'set editor ?, set editor').split('\n')[2]
self.editor_val = self.editor_val.split(':')[1]. \
replace("'", "", 2).strip()
def write_tempconf(self, confcontents):
self.tempconfpath = self.mount_a.client_remote.mktemp(
suffix='cephfs-shell.conf')
self.mount_a.client_remote.write_file(self.tempconfpath,
confcontents)
def test_reading_conf(self):
self.write_tempconf("[cephfs-shell]\neditor = ???")
# output of following command -
# CephFS:~/>>> set editor
# editor: 'vim'
final_editor_val = self.get_cephfs_shell_cmd_output(
cmd='set editor', shell_conf_path=self.tempconfpath)
final_editor_val = final_editor_val.split(': ')[1]
final_editor_val = final_editor_val.replace("'", "", 2)
self.assertNotEqual(self.editor_val, final_editor_val)
def test_reading_conf_with_dup_opt(self):
"""
Read conf without duplicate sections/options.
"""
self.write_tempconf("[cephfs-shell]\neditor = ???\neditor = " +
self.editor_val)
# output of following command -
# CephFS:~/>>> set editor
# editor: 'vim'
final_editor_val = self.get_cephfs_shell_cmd_output(
cmd='set editor', shell_conf_path=self.tempconfpath)
final_editor_val = final_editor_val.split(': ')[1]
final_editor_val = final_editor_val.replace("'", "", 2)
self.assertEqual(self.editor_val, final_editor_val)
def test_setting_opt_after_reading_conf(self):
self.write_tempconf("[cephfs-shell]\neditor = ???")
# output of following command -
# editor - was: vim
# now: vim
# editor: vim
final_editor_val = self.get_cephfs_shell_cmd_output(
cmd='set editor %s, set editor' % self.editor_val,
shell_conf_path=self.tempconfpath)
final_editor_val = final_editor_val.split('\n')[2]
final_editor_val = final_editor_val.split(': ')[1]
final_editor_val = final_editor_val.replace("'", "", 2)
self.assertEqual(self.editor_val, final_editor_val)