qa/cephfs: allow checking for multiple error messages while...

while negative testing.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
Rishabh Dave 2021-09-15 14:41:02 +05:30
parent 233cdea4e1
commit 82b58e3667
3 changed files with 43 additions and 20 deletions

View File

@ -59,6 +59,7 @@ class CapsHelper(CephFSTestCase):
self.assertEqual(data, contents1)
def conduct_neg_test_for_write_caps(self, filepaths, mounts):
possible_errmsgs = ('permission denied', 'operation not permitted')
cmdargs = ['echo', 'some random data', Raw('|'), 'tee']
for mount in mounts:
@ -66,7 +67,7 @@ class CapsHelper(CephFSTestCase):
if path.find(mount.hostfs_mntpt) != -1:
cmdargs.append(path)
mount.negtestcmd(args=cmdargs, retval=1,
errmsg='permission denied')
errmsgs=possible_errmsgs)
cmdargs.pop(-1)
def get_mon_cap_from_keyring(self, client_name):

View File

@ -753,44 +753,66 @@ class CephFSMount(object):
kwargs['user'] = 'root'
return self.run_as_user(**kwargs)
def _verify(self, proc, retval=None, errmsg=None):
if retval:
msg = ('expected return value: {}\nreceived return value: '
'{}\n'.format(retval, proc.returncode))
assert proc.returncode == retval, msg
def assert_retval(self, proc_retval, exp_retval):
msg = (f'expected return value: {exp_retval}\n'
f'received return value: {proc_retval}\n')
assert proc_retval == exp_retval, msg
if errmsg:
stderr = proc.stderr.getvalue().lower()
msg = ('didn\'t find given string in stderr -\nexpected string: '
'{}\nreceived error message: {}\nnote: received error '
'message is converted to lowercase'.format(errmsg, stderr))
assert errmsg in stderr, msg
def _verify(self, proc, exp_retval=None, exp_errmsgs=None):
if exp_retval is None and exp_errmsgs is None:
raise RuntimeError('Method didn\'t get enough parameters. Pass '
'return value or error message expected from '
'the command/process.')
def negtestcmd(self, args, retval=None, errmsg=None, stdin=None,
if exp_retval is not None:
self.assert_retval(proc.returncode, exp_retval)
if exp_errmsgs is None:
return
if isinstance(exp_errmsgs, str):
exp_errmsgs = (exp_errmsgs, )
proc_stderr = proc.stderr.getvalue().lower()
msg = ('didn\'t find any of the expected string in stderr.\n'
f'expected string: {exp_errmsgs}\n'
f'received error message: {proc_stderr}\n'
'note: received error message is converted to lowercase')
for e in exp_errmsgs:
if e in proc_stderr:
break
# this else is meant for for loop.
else:
assert False, msg
def negtestcmd(self, args, retval=None, errmsgs=None, stdin=None,
cwd=None, wait=True):
"""
Conduct a negative test for the given command.
retval and errmsg are parameters to confirm the cause of command
retval and errmsgs are parameters to confirm the cause of command
failure.
Note: errmsgs is expected to be a tuple, but in case there's only
error message, it can also be a string. This method will handle
that internally.
"""
proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
check_status=False)
self._verify(proc, retval, errmsg)
self._verify(proc, retval, errmsgs)
return proc
def negtestcmd_as_user(self, args, user, retval=None, errmsg=None,
def negtestcmd_as_user(self, args, user, retval=None, errmsgs=None,
stdin=None, cwd=None, wait=True):
proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
cwd=cwd, check_status=False)
self._verify(proc, retval, errmsg)
self._verify(proc, retval, errmsgs)
return proc
def negtestcmd_as_root(self, args, retval=None, errmsg=None, stdin=None,
def negtestcmd_as_root(self, args, retval=None, errmsgs=None, stdin=None,
cwd=None, wait=True):
proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
check_status=False)
self._verify(proc, retval, errmsg)
self._verify(proc, retval, errmsgs)
return proc
def open_no_data(self, basename):

View File

@ -1236,7 +1236,7 @@ class TestFsAuthorize(CapsHelper):
contents = self.mount_a.read_file(filepath)
self.assertEqual(filedata, contents)
cmdargs = ['echo', 'some random data', Raw('|'), 'sudo', 'tee', filepath]
self.mount_a.negtestcmd(args=cmdargs, retval=1, errmsg='permission denied')
self.mount_a.negtestcmd(args=cmdargs, retval=1, errmsgs='permission denied')
def test_single_path_authorize_on_nonalphanumeric_fsname(self):
"""