qa/tasks/nfs: Test mounting of export created with nfs command

Fixes: https://tracker.ceph.com/issues/46989
Signed-off-by: Varsha Rao <varao@redhat.com>
This commit is contained in:
Varsha Rao 2020-08-17 14:07:06 +05:30
parent 5d6742b6b4
commit f56a83f806

View File

@ -231,6 +231,37 @@ class TestNFS(MgrTestCase):
if b'export-' in rados_obj_ls or (conf_obj and b'conf-nfs' in rados_obj_ls):
self.fail("Delete export failed")
def _get_port_ip_info(self):
'''
Return port and ip for a cluster
'''
#{'test': [{'hostname': 'smithi068', 'ip': ['172.21.15.68'], 'port': 2049}]}
info_output = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))['test'][0]
return info_output["port"], info_output["ip"][0]
def _test_mnt(self, pseudo_path, port, ip, check=True):
'''
Test mounting of created exports
:param pseudo_path: It is the pseudo root name
:param port: Port of deployed nfs cluster
:param ip: IP of deployed nfs cluster
:param check: It denotes if i/o testing needs to be done
'''
try:
self.ctx.cluster.run(args=['sudo', 'mount', '-t', 'nfs', '-o', f'port={port}',
f'{ip}:{pseudo_path}', '/mnt'])
except CommandFailedError as e:
# Check if mount failed only when non existing pseudo path is passed
if not check and e.exitstatus == 32:
return
raise
if check:
self.ctx.cluster.run(args=['sudo', 'touch', '/mnt/test'])
out_mnt = self._sys_cmd(['sudo', 'ls', '/mnt'])
self.assertEqual(out_mnt, b'test\n')
self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
def test_create_and_delete_cluster(self):
'''
Test successful creation and deletion of the nfs cluster.
@ -281,9 +312,12 @@ class TestNFS(MgrTestCase):
'''
self._create_default_export()
self._test_get_export()
port, ip = self._get_port_ip_info()
self._test_mnt(self.pseudo_path, port, ip)
self._delete_export()
# Check if rados export object is deleted
self._check_export_obj_deleted()
self._test_mnt(self.pseudo_path, port, ip, False)
self._test_delete_cluster()
def test_create_delete_export_idempotency(self):
@ -329,6 +363,8 @@ class TestNFS(MgrTestCase):
self._orch_cmd("set", "backend", "cephadm")
# Checks if created export is listed
self._test_list_export()
port, ip = self._get_port_ip_info()
self._test_mnt(self.pseudo_path, port, ip)
self._delete_export()
self._test_delete_cluster()
@ -407,6 +443,7 @@ class TestNFS(MgrTestCase):
pool = 'nfs-ganesha'
user_id = 'test'
fs_name = 'user_test_fs'
pseudo_path = '/ceph'
self._cmd('fs', 'volume', 'create', fs_name)
time.sleep(20)
key = self._cmd('auth', 'get-or-create-key', f'client.{user_id}', 'mon',
@ -421,7 +458,7 @@ class TestNFS(MgrTestCase):
Export_Id = 100;
Transports = TCP;
Path = /;
Pseudo = /ceph/;
Pseudo = {pseudo_path};
Protocols = 4;
Access_Type = RW;
Attr_Expiration_Time = 0;
@ -433,32 +470,20 @@ class TestNFS(MgrTestCase):
Secret_Access_Key = '{key}';
}}
}}"""
#{'test': [{'hostname': 'smithi068', 'ip': ['172.21.15.68'], 'port': 2049}]}
info_output = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))['test'][0]
mnt_cmd = ['sudo', 'mount', '-t', 'nfs', '-o', f'port={info_output["port"]}', f'{info_output["ip"][0]}:/ceph', '/mnt']
MNT_FAILED = 32
port, ip = self._get_port_ip_info()
self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster', 'config',
'set', self.cluster_id, '-i', '-'], stdin=config)
time.sleep(30)
log.info(self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls']))
res = self._sys_cmd(['rados', '-p', pool, '-N', self.cluster_id, 'get',
f'userconf-nfs.ganesha-{user_id}', '-'])
self.assertEqual(config, res.decode('utf-8'))
self.ctx.cluster.run(args=mnt_cmd)
self.ctx.cluster.run(args=['sudo', 'touch', '/mnt/test'])
out_mnt = self._sys_cmd(['sudo', 'ls', '/mnt'])
self.assertEqual(out_mnt, b'test\n')
self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
self._test_mnt(pseudo_path, port, ip)
self._nfs_cmd('cluster', 'config', 'reset', self.cluster_id)
rados_obj_ls = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])
if b'conf-nfs' not in rados_obj_ls and b'userconf-nfs' in rados_obj_ls:
self.fail("User config not deleted")
time.sleep(30)
try:
self.ctx.cluster.run(args=mnt_cmd)
except CommandFailedError as e:
if e.exitstatus != MNT_FAILED:
raise
self._test_mnt(pseudo_path, port, ip, False)
self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
self._test_delete_cluster()