mirror of
https://github.com/ceph/ceph
synced 2025-02-23 19:17:37 +00:00
qa/cephfs/test_cephfs_shell: Fixed pep8 formatting violations
NOTE: Although most of the issues are fixed but a few function and variable names are unchanged in order to prevent ambiguity and preserve their meaning. They are: - functions: setUp(), test_ls_H_prints_human_readable_file_size(), - variables: ls_H_output, ls_H_file_size Signed-off-by: Dhairya Parmar <dparmar@redhat.com>
This commit is contained in:
parent
7abc382bfb
commit
c02ab23ede
@ -16,23 +16,26 @@ from teuthology.exceptions import CommandFailedError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def humansize(nbytes):
|
||||
suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
|
||||
i = 0
|
||||
while nbytes >= 1024 and i < len(suffixes)-1:
|
||||
while nbytes >= 1024 and i < len(suffixes) - 1:
|
||||
nbytes /= 1024.
|
||||
i += 1
|
||||
nbytes = math.ceil(nbytes)
|
||||
f = ('%d' % nbytes).rstrip('.')
|
||||
return '%s%s' % (f, suffixes[i])
|
||||
|
||||
|
||||
def ensure_str(s):
|
||||
if isinstance(s, str):
|
||||
return s
|
||||
if isinstance(s, bytes):
|
||||
return s.decode()
|
||||
raise TypeError("not expecting type '%s'" % type(s))
|
||||
|
||||
|
||||
|
||||
class TestCephFSShell(CephFSTestCase):
|
||||
CLIENTS_REQUIRED = 1
|
||||
|
||||
@ -112,7 +115,8 @@ class TestCephFSShell(CephFSTestCase):
|
||||
|
||||
def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
|
||||
shell_conf_path=None, opts=None,
|
||||
stdout=None, stdin=None,check_status=True):
|
||||
stdout=None, stdin=None,
|
||||
check_status=True):
|
||||
return ensure_str(self.run_cephfs_shell_cmd(
|
||||
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
|
||||
opts=opts, stdout=stdout, stdin=stdin,
|
||||
@ -199,7 +203,7 @@ class TestMkdir(TestCephFSShell):
|
||||
|
||||
# mkdir d4 should pass
|
||||
o = self.mount_a.stat('d4')
|
||||
assert((o['st_mode'] & 0o700) == 0o700)
|
||||
assert ((o['st_mode'] & 0o700) == 0o700)
|
||||
|
||||
def test_mkdir_with_bad_non_octal_mode(self):
|
||||
"""
|
||||
@ -232,6 +236,7 @@ class TestMkdir(TestCephFSShell):
|
||||
o = self.mount_a.stat('d5/d6/d7')
|
||||
log.info("mount_a output:\n{}".format(o))
|
||||
|
||||
|
||||
class TestRmdir(TestCephFSShell):
|
||||
dir_name = "test_dir"
|
||||
|
||||
@ -242,7 +247,7 @@ class TestRmdir(TestCephFSShell):
|
||||
try:
|
||||
self.mount_a.stat(self.dir_name)
|
||||
except CommandFailedError as e:
|
||||
if e.exitstatus == 2:
|
||||
if e.exitstatus == 2:
|
||||
return 0
|
||||
raise
|
||||
|
||||
@ -251,7 +256,7 @@ class TestRmdir(TestCephFSShell):
|
||||
Test that rmdir deletes directory
|
||||
"""
|
||||
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
|
||||
self.run_cephfs_shell_cmd("rmdir "+ self.dir_name)
|
||||
self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
|
||||
self.dir_does_not_exists()
|
||||
|
||||
def test_rmdir_non_existing_dir(self):
|
||||
@ -266,7 +271,8 @@ class TestRmdir(TestCephFSShell):
|
||||
Test that rmdir does not delete directory containing file
|
||||
"""
|
||||
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
|
||||
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
|
||||
self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
|
||||
stdin="Valid File")
|
||||
self.run_cephfs_shell_cmd("rmdir" + self.dir_name)
|
||||
self.mount_a.stat(self.dir_name)
|
||||
|
||||
@ -280,10 +286,11 @@ class TestRmdir(TestCephFSShell):
|
||||
|
||||
def test_rmdir_p(self):
|
||||
"""
|
||||
Test that rmdir -p deletes all empty directories in the root directory passed
|
||||
Test that rmdir -p deletes all empty directories in the root
|
||||
directory passed
|
||||
"""
|
||||
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
|
||||
self.run_cephfs_shell_cmd("rmdir -p "+ self.dir_name)
|
||||
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
|
||||
self.dir_does_not_exists()
|
||||
|
||||
def test_rmdir_p_valid_path(self):
|
||||
@ -306,10 +313,12 @@ class TestRmdir(TestCephFSShell):
|
||||
Test that rmdir -p does not delete the directory containing a file
|
||||
"""
|
||||
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
|
||||
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
|
||||
self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
|
||||
stdin="Valid File")
|
||||
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
|
||||
self.mount_a.stat(self.dir_name)
|
||||
|
||||
|
||||
class TestGetAndPut(TestCephFSShell):
|
||||
def test_without_target_dir(self):
|
||||
"""
|
||||
@ -319,11 +328,12 @@ class TestGetAndPut(TestCephFSShell):
|
||||
tempdirname = path.basename(tempdir)
|
||||
files = ('dump1', 'dump2', 'dump3', tempdirname)
|
||||
|
||||
for i, file_ in enumerate(files[ : -1]):
|
||||
for i, file_ in enumerate(files[: -1]):
|
||||
size = i + 1
|
||||
ofarg = 'of=' + path.join(tempdir, file_)
|
||||
bsarg = 'bs=' + str(size) + 'M'
|
||||
self.mount_a.run_shell_payload(f"dd if=/dev/urandom {ofarg} {bsarg} count=1")
|
||||
self.mount_a.run_shell_payload("dd if=/dev/urandom "
|
||||
f"{ofarg} {bsarg} count=1")
|
||||
|
||||
self.run_cephfs_shell_cmd('put ' + tempdir)
|
||||
for file_ in files:
|
||||
@ -337,13 +347,15 @@ class TestGetAndPut(TestCephFSShell):
|
||||
|
||||
self.run_cephfs_shell_cmd('get ' + tempdirname)
|
||||
# NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
|
||||
pwd = self.mount_a.run_shell('pwd', cwd=None).stdout.getvalue().\
|
||||
pwd = self.mount_a.run_shell('pwd', cwd=None).stdout.getvalue(). \
|
||||
strip()
|
||||
for file_ in files:
|
||||
if file_ == tempdirname:
|
||||
self.mount_a.run_shell_payload(f"stat {path.join(pwd, file_)}")
|
||||
self.mount_a.run_shell_payload(f"stat {path.join(pwd, file_)}")
|
||||
else:
|
||||
self.mount_a.run_shell_payload(f"stat {path.join(pwd, tempdirname, file_)}")
|
||||
self.mount_a.run_shell_payload(
|
||||
f"stat "
|
||||
f"{path.join(pwd, tempdirname, file_)}")
|
||||
|
||||
def test_get_with_target_name(self):
|
||||
"""
|
||||
@ -362,14 +374,14 @@ class TestGetAndPut(TestCephFSShell):
|
||||
log.info("cephfs-shell output:\n{}".format(o))
|
||||
|
||||
# NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
|
||||
o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue().\
|
||||
o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue(). \
|
||||
strip()
|
||||
o_hash = crypt.crypt(o, '.A')
|
||||
|
||||
# s_hash must be equal to o_hash
|
||||
log.info("s_hash:{}".format(s_hash))
|
||||
log.info("o_hash:{}".format(o_hash))
|
||||
assert(s_hash == o_hash)
|
||||
assert (s_hash == o_hash)
|
||||
|
||||
def test_get_without_target_name(self):
|
||||
"""
|
||||
@ -409,7 +421,8 @@ class TestGetAndPut(TestCephFSShell):
|
||||
# s_hash must be equal to o_hash
|
||||
log.info("s_hash:{}".format(s_hash))
|
||||
log.info("o_hash:{}".format(o_hash))
|
||||
assert(s_hash == o_hash)
|
||||
assert (s_hash == o_hash)
|
||||
|
||||
|
||||
class TestSnapshots(TestCephFSShell):
|
||||
def test_snap(self):
|
||||
@ -472,6 +485,7 @@ class TestSnapshots(TestCephFSShell):
|
||||
log.info("mount_a output:\n{}".format(o))
|
||||
self.assertNotIn('st_mode', o)
|
||||
|
||||
|
||||
class TestCD(TestCephFSShell):
|
||||
CLIENTS_REQUIRED = 1
|
||||
|
||||
@ -501,6 +515,7 @@ class TestCD(TestCephFSShell):
|
||||
output = self.get_cephfs_shell_script_output(script)
|
||||
self.assertEqual(output, expected_cwd)
|
||||
|
||||
|
||||
class TestDU(TestCephFSShell):
|
||||
CLIENTS_REQUIRED = 1
|
||||
|
||||
@ -523,7 +538,8 @@ class TestDU(TestCephFSShell):
|
||||
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
|
||||
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
|
||||
|
||||
# XXX: we stat `regfile_abspath` here because ceph du reports a non-empty
|
||||
# XXX: we stat `regfile_abspath` here because ceph du reports
|
||||
# a non-empty
|
||||
# directory's size as sum of sizes of all files under it.
|
||||
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
|
||||
expected_output = r'{}{}{}'.format(size, " +", dirname)
|
||||
@ -563,10 +579,11 @@ class TestDU(TestCephFSShell):
|
||||
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
|
||||
slinkname = 'some_softlink'
|
||||
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
|
||||
self.mount_a.run_shell_payload(f"ln -s {regfile_abspath} {slink_abspath}")
|
||||
self.mount_a.run_shell_payload(
|
||||
f"ln -s {regfile_abspath} {slink_abspath}")
|
||||
|
||||
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
|
||||
expected_output = r'{}{}{}'.format((size), " +", slinkname)
|
||||
expected_output = r'{}{}{}'.format(size, " +", slinkname)
|
||||
|
||||
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
|
||||
self.assertRegex(du_output, expected_output)
|
||||
@ -603,7 +620,8 @@ class TestDU(TestCephFSShell):
|
||||
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
|
||||
self.mount_a.run_shell_payload(f"touch {regfile_abspath}")
|
||||
self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
|
||||
self.mount_a.run_shell_payload(f"ln -s {regfile_abspath} {slink_abspath}")
|
||||
self.mount_a.run_shell_payload(
|
||||
f"ln -s {regfile_abspath} {slink_abspath}")
|
||||
self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink2_abspath}")
|
||||
|
||||
dir2_name = 'dir2'
|
||||
@ -616,7 +634,8 @@ class TestDU(TestCephFSShell):
|
||||
self.mount_a.run_shell_payload(f"touch {regfile121_abspath}")
|
||||
|
||||
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
|
||||
self.mount_a.client_remote.write_file(regfile121_abspath, 'somemoredata')
|
||||
self.mount_a.client_remote.write_file(regfile121_abspath,
|
||||
'somemoredata')
|
||||
|
||||
# TODO: is there a way to trigger/force update ceph.dir.rbytes?
|
||||
# wait so that attr ceph.dir.rbytes gets a chance to be updated.
|
||||
@ -629,19 +648,21 @@ class TestDU(TestCephFSShell):
|
||||
if f == '/':
|
||||
expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
|
||||
else:
|
||||
expected_patterns.append(r'{}{}{}'.format(size, " +",
|
||||
expected_patterns.append(r'{}{}{}'.format(
|
||||
size, " +",
|
||||
path_prefix + path.relpath(f, self.mount_a.mountpoint)))
|
||||
|
||||
for f in [dir_abspath, regfile_abspath, regfile121_abspath,
|
||||
hlink_abspath, slink_abspath, slink2_abspath]:
|
||||
size = humansize(self.mount_a.stat(f, follow_symlinks=
|
||||
False)['st_size'])
|
||||
size = humansize(self.mount_a.stat(
|
||||
f, follow_symlinks=False)['st_size'])
|
||||
append_expected_output_pattern(f)
|
||||
|
||||
# get size for directories containig regfiles within
|
||||
for f in [dir2_abspath, dir21_abspath]:
|
||||
size = humansize(self.mount_a.stat(regfile121_abspath,
|
||||
follow_symlinks=False)['st_size'])
|
||||
follow_symlinks=False)[
|
||||
'st_size'])
|
||||
append_expected_output_pattern(f)
|
||||
|
||||
# get size for CephFS root
|
||||
@ -656,9 +677,9 @@ class TestDU(TestCephFSShell):
|
||||
for p in [dir_abspath, regfile_abspath, dir2_abspath,
|
||||
dir21_abspath, regfile121_abspath, hlink_abspath,
|
||||
slink_abspath, slink2_abspath]:
|
||||
path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
|
||||
path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
|
||||
|
||||
return (expected_patterns, path_to_files)
|
||||
return expected_patterns, path_to_files
|
||||
else:
|
||||
return expected_patterns
|
||||
|
||||
@ -670,8 +691,8 @@ class TestDU(TestCephFSShell):
|
||||
self.assertRegex(du_output, expected_output)
|
||||
|
||||
def test_du_with_path_in_args(self):
|
||||
expected_patterns_in_output, path_to_files = self._setup_files(True,
|
||||
path_prefix='')
|
||||
expected_patterns_in_output, path_to_files = self._setup_files(
|
||||
True, path_prefix='')
|
||||
|
||||
args = ['du', '/']
|
||||
for p in path_to_files:
|
||||
@ -695,7 +716,7 @@ class TestDU(TestCephFSShell):
|
||||
|
||||
class TestDF(TestCephFSShell):
|
||||
def validate_df(self, filename):
|
||||
df_output = self.get_cephfs_shell_cmd_output('df '+filename)
|
||||
df_output = self.get_cephfs_shell_cmd_output('df ' + filename)
|
||||
log.info("cephfs-shell df output:\n{}".format(df_output))
|
||||
|
||||
shell_df = df_output.splitlines()[1].split()
|
||||
@ -708,7 +729,8 @@ class TestDF(TestCephFSShell):
|
||||
log.info("cephfs available:{}\n".format(block_size - st_size))
|
||||
|
||||
self.assertTupleEqual((block_size, st_size, block_size - st_size),
|
||||
(int(shell_df[0]), int(shell_df[1]) , int(shell_df[2])))
|
||||
(int(shell_df[0]), int(shell_df[1]),
|
||||
int(shell_df[2])))
|
||||
|
||||
def test_df_with_no_args(self):
|
||||
expected_output = ''
|
||||
@ -737,7 +759,8 @@ class TestQuota(TestCephFSShell):
|
||||
dir_name = 'testdir'
|
||||
|
||||
def create_dir(self):
|
||||
mount_output = self.get_cephfs_shell_cmd_output('mkdir ' + self.dir_name)
|
||||
mount_output = self.get_cephfs_shell_cmd_output(
|
||||
'mkdir ' + self.dir_name)
|
||||
log.info("cephfs-shell mount output:\n{}".format(mount_output))
|
||||
|
||||
def set_and_get_quota_vals(self, input_val, check_status=True):
|
||||
@ -745,8 +768,9 @@ class TestQuota(TestCephFSShell):
|
||||
input_val[0], '--max_files', input_val[1],
|
||||
self.dir_name], check_status=check_status)
|
||||
|
||||
quota_output = self.get_cephfs_shell_cmd_output(['quota', 'get', self.dir_name],
|
||||
check_status=check_status)
|
||||
quota_output = self.get_cephfs_shell_cmd_output(
|
||||
['quota', 'get', self.dir_name],
|
||||
check_status=check_status)
|
||||
|
||||
quota_output = quota_output.split()
|
||||
return quota_output[1], quota_output[3]
|
||||
@ -754,21 +778,25 @@ class TestQuota(TestCephFSShell):
|
||||
def test_set(self):
|
||||
self.create_dir()
|
||||
set_values = ('6', '2')
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
|
||||
set_values)
|
||||
|
||||
def test_replace_values(self):
|
||||
self.test_set()
|
||||
set_values = ('20', '4')
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
|
||||
set_values)
|
||||
|
||||
def test_set_invalid_dir(self):
|
||||
set_values = ('5', '5')
|
||||
try:
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(
|
||||
set_values, False), set_values)
|
||||
raise Exception("Something went wrong!! Values set for non existing directory")
|
||||
set_values, False), set_values)
|
||||
raise Exception(
|
||||
"Something went wrong!! Values set for non existing directory")
|
||||
except IndexError:
|
||||
# Test should pass as values cannot be set for non existing directory
|
||||
# Test should pass as values cannot be set for non
|
||||
# existing directory
|
||||
pass
|
||||
|
||||
def test_set_invalid_values(self):
|
||||
@ -776,7 +804,8 @@ class TestQuota(TestCephFSShell):
|
||||
set_values = ('-6', '-5')
|
||||
try:
|
||||
self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
|
||||
False), set_values)
|
||||
False),
|
||||
set_values)
|
||||
raise Exception("Something went wrong!! Invalid values set")
|
||||
except IndexError:
|
||||
# Test should pass as invalid values cannot be set
|
||||
@ -789,7 +818,8 @@ class TestQuota(TestCephFSShell):
|
||||
file2 = path.join(dir_abspath, "file2")
|
||||
try:
|
||||
self.mount_a.run_shell_payload(f"touch {file2}")
|
||||
raise Exception("Something went wrong!! File creation should have failed")
|
||||
raise Exception(
|
||||
"Something went wrong!! File creation should have failed")
|
||||
except CommandFailedError:
|
||||
# Test should pass as file quota set to 2
|
||||
# Additional condition to confirm file creation failure
|
||||
@ -804,7 +834,8 @@ class TestQuota(TestCephFSShell):
|
||||
file_abspath = path.join(dir_abspath, filename)
|
||||
try:
|
||||
# Write should fail as bytes quota is set to 6
|
||||
self.mount_a.client_remote.write_file(file_abspath, 'Disk raise Exception')
|
||||
self.mount_a.client_remote.write_file(file_abspath,
|
||||
'Disk raise Exception')
|
||||
raise Exception("Write should have failed")
|
||||
except CommandFailedError:
|
||||
# Test should pass only when write command fails
|
||||
@ -813,7 +844,8 @@ class TestQuota(TestCephFSShell):
|
||||
# Testing with teuthology: No file is created.
|
||||
return 0
|
||||
elif path_exists and not path.getsize(file_abspath):
|
||||
# Testing on Fedora 30: When write fails, empty file gets created.
|
||||
# Testing on Fedora 30: When write fails, empty
|
||||
# file gets created.
|
||||
return 0
|
||||
else:
|
||||
raise
|
||||
@ -843,23 +875,28 @@ class TestXattr(TestCephFSShell):
|
||||
def test_set(self):
|
||||
self.create_dir()
|
||||
set_values = ('user.key', '2')
|
||||
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
|
||||
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
|
||||
set_values)
|
||||
|
||||
def test_reset(self):
|
||||
self.test_set()
|
||||
set_values = ('user.key', '4')
|
||||
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
|
||||
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
|
||||
set_values)
|
||||
|
||||
def test_non_existing_dir(self):
|
||||
input_val = ('user.key', '9')
|
||||
self.negtest_cephfs_shell_cmd(cmd=['setxattr', self.dir_name, input_val[0],
|
||||
input_val[1]])
|
||||
self.negtest_cephfs_shell_cmd(cmd=['getxattr', self.dir_name, input_val[0]])
|
||||
self.negtest_cephfs_shell_cmd(
|
||||
cmd=['setxattr', self.dir_name, input_val[0],
|
||||
input_val[1]])
|
||||
self.negtest_cephfs_shell_cmd(
|
||||
cmd=['getxattr', self.dir_name, input_val[0]])
|
||||
self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
|
||||
|
||||
|
||||
class TestLS(TestCephFSShell):
|
||||
dir_name = ('test_dir')
|
||||
hidden_dir_name = ('.test_hidden_dir')
|
||||
dir_name = 'test_dir'
|
||||
hidden_dir_name = '.test_hidden_dir'
|
||||
|
||||
def test_ls(self):
|
||||
""" Test that ls prints files in CWD. """
|
||||
@ -893,7 +930,8 @@ class TestLS(TestCephFSShell):
|
||||
def test_ls_a_prints_non_hidden_dir(self):
|
||||
""" Test ls -a command prints non hidden directory """
|
||||
|
||||
self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name} {self.dir_name}')
|
||||
self.run_cephfs_shell_cmd(
|
||||
f'mkdir {self.hidden_dir_name} {self.dir_name}')
|
||||
|
||||
ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
|
||||
log.info(f"output of ls -a command:\n{ls_a_output}")
|
||||
@ -903,13 +941,13 @@ class TestLS(TestCephFSShell):
|
||||
def test_ls_H_prints_human_readable_file_size(self):
|
||||
""" Test "ls -lH" prints human readable file size."""
|
||||
|
||||
file_sizes = ['1','1K', '1M', '1G']
|
||||
file_sizes = ['1', '1K', '1M', '1G']
|
||||
file_names = ['dump1', 'dump2', 'dump3', 'dump4']
|
||||
|
||||
|
||||
for (file_size, file_name) in zip(file_sizes, file_names):
|
||||
temp_file = self.mount_a.client_remote.mktemp(file_name)
|
||||
self.mount_a.run_shell_payload(f"fallocate -l {file_size} {temp_file}")
|
||||
self.mount_a.run_shell_payload(
|
||||
f"fallocate -l {file_size} {temp_file}")
|
||||
self.mount_a.run_shell_payload(f'mv {temp_file} ./')
|
||||
|
||||
ls_H_output = self.get_cephfs_shell_cmd_output(['ls', '-lH'])
|
||||
@ -919,7 +957,7 @@ class TestLS(TestCephFSShell):
|
||||
ls_H_file_size.add(line.split()[1])
|
||||
|
||||
# test that file sizes are in human readable format
|
||||
self.assertEqual({'1B','1K', '1M', '1G'}, ls_H_file_size)
|
||||
self.assertEqual({'1B', '1K', '1M', '1G'}, ls_H_file_size)
|
||||
|
||||
def test_ls_s_sort_by_size(self):
|
||||
""" Test "ls -S" sorts file listing by file_size """
|
||||
@ -937,7 +975,7 @@ class TestLS(TestCephFSShell):
|
||||
for line in ls_s_output.split('\n'):
|
||||
file_sizes.append(line.split()[1])
|
||||
|
||||
#test that file size are in ascending order
|
||||
# test that file size are in ascending order
|
||||
self.assertEqual(file_sizes, sorted(file_sizes))
|
||||
|
||||
|
||||
@ -951,7 +989,7 @@ class TestMisc(TestCephFSShell):
|
||||
dirname = 'somedirectory'
|
||||
self.run_cephfs_shell_cmd(['mkdir', dirname])
|
||||
|
||||
output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']).\
|
||||
output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']). \
|
||||
strip()
|
||||
|
||||
self.assertRegex(output, dirname)
|
||||
@ -963,6 +1001,7 @@ class TestMisc(TestCephFSShell):
|
||||
o = self.get_cephfs_shell_cmd_output("help all")
|
||||
log.info("output:\n{}".format(o))
|
||||
|
||||
|
||||
class TestShellOpts(TestCephFSShell):
|
||||
"""
|
||||
Contains tests for shell options from conf file and shell prompt.
|
||||
@ -977,14 +1016,14 @@ class TestShellOpts(TestCephFSShell):
|
||||
# editor: '?'
|
||||
self.editor_val = self.get_cephfs_shell_cmd_output(
|
||||
'set editor ?, set editor').split('\n')[2]
|
||||
self.editor_val = self.editor_val.split(':')[1].\
|
||||
self.editor_val = self.editor_val.split(':')[1]. \
|
||||
replace("'", "", 2).strip()
|
||||
|
||||
def write_tempconf(self, confcontents):
|
||||
self.tempconfpath = self.mount_a.client_remote.mktemp(
|
||||
suffix='cephfs-shell.conf')
|
||||
self.mount_a.client_remote.write_file(self.tempconfpath,
|
||||
confcontents)
|
||||
confcontents)
|
||||
|
||||
def test_reading_conf(self):
|
||||
self.write_tempconf("[cephfs-shell]\neditor = ???")
|
||||
@ -1024,7 +1063,7 @@ class TestShellOpts(TestCephFSShell):
|
||||
# now: vim
|
||||
# editor: vim
|
||||
final_editor_val = self.get_cephfs_shell_cmd_output(
|
||||
cmd='set editor %s, set editor' % (self.editor_val),
|
||||
cmd='set editor %s, set editor' % self.editor_val,
|
||||
shell_conf_path=self.tempconfpath)
|
||||
final_editor_val = final_editor_val.split('\n')[2]
|
||||
final_editor_val = final_editor_val.split(': ')[1]
|
||||
|
Loading…
Reference in New Issue
Block a user