qa/tasks/cephfs: Add tests for nfs exports

Signed-off-by: Varsha Rao <varao@redhat.com>
This commit is contained in:
Varsha Rao 2020-05-28 10:57:25 +00:00
parent b8ce61e8bd
commit af3b925dfd

View File

@ -12,18 +12,20 @@ log = logging.getLogger(__name__)
class TestNFS(MgrTestCase):
def _cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args)
def _nfs_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("nfs", *args)
return self._cmd("nfs", *args)
def _orch_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("orch", *args)
return self._cmd("orch", *args)
def _sys_cmd(self, cmd):
cmd[0:0] = ['sudo']
ret = self.ctx.cluster.run(args=cmd, check_status=False, stdout=BytesIO(), stderr=BytesIO())
stdout = ret[0].stdout
if stdout:
# It's RemoteProcess defined in teuthology/orchestra/run.py
return stdout.getvalue()
def setUp(self):
@ -34,7 +36,9 @@ class TestNFS(MgrTestCase):
self.cluster_id = "test"
self.export_type = "cephfs"
self.pseudo_path = "/cephfs"
self.expected_name = 'nfs.ganesha-test'
self.path = "/"
self.fs_name = "nfs-cephfs"
self.expected_name = "nfs.ganesha-test"
def _check_port_status(self):
log.info("NETSTAT")
@ -52,32 +56,66 @@ class TestNFS(MgrTestCase):
def _check_nfs_status(self):
return self._orch_cmd('ls', 'nfs')
def _check_idempotency(self, *args):
for _ in range(2):
self._nfs_cmd(*args)
def test_create_cluster(self):
def _test_create_cluster(self):
self._check_nfs_server_status()
self._nfs_cmd("cluster", "create", self.export_type, self.cluster_id)
self._nfs_cmd('cluster', 'create', self.export_type, self.cluster_id)
time.sleep(8)
orch_output = self._check_nfs_status()
expected_status = '1/1'
try:
if self.expected_name not in orch_output or expected_status not in orch_output:
raise CommandFailedError("NFS Ganesha cluster could not be deployed")
except (TypeError, CommandFailedError):
raise
if self.expected_name not in orch_output or expected_status not in orch_output:
raise RuntimeError("NFS Ganesha cluster could not be deployed")
def test_create_cluster_idempotent(self):
self._check_nfs_server_status()
self._check_idempotency("cluster", "create", self.export_type, self.cluster_id)
def test_delete_cluster(self):
self.test_create_cluster()
self._nfs_cmd("cluster", "delete", self.cluster_id)
def _test_delete_cluster(self):
self._nfs_cmd('cluster', 'delete', self.cluster_id)
time.sleep(8)
orch_output = self._check_nfs_status()
self.assertEqual("No services reported\n", orch_output)
def test_delete_cluster_idempotent(self):
self._check_idempotency("cluster", "delete", self.cluster_id)
def _create_export(self, export_id, create_fs=False, extra_cmd=None):
if create_fs:
self._cmd('fs', 'volume', 'create', self.fs_name)
export_cmd = ['nfs', 'export', 'create', 'cephfs', self.fs_name, self.cluster_id]
if isinstance(extra_cmd, list):
export_cmd.extend(extra_cmd)
else:
export_cmd.append(self.pseudo_path)
self._cmd(*export_cmd)
res = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'get', f'export-{export_id}', '-'])
if res == b'':
raise RuntimeError("Export cannot be created")
def _create_default_export(self):
self._test_create_cluster()
self._create_export(export_id='1', create_fs=True)
def _delete_export(self):
self._nfs_cmd('export', 'delete', self.cluster_id, self.pseudo_path)
def _check_export_obj_deleted(self, conf_obj=False):
rados_obj_ls = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])
if b'export-' in rados_obj_ls or (conf_obj and b'conf-nfs' in rados_obj_ls):
raise RuntimeError("Delete export failed")
def test_create_and_delete_cluster(self):
self._test_create_cluster()
self._test_delete_cluster()
def test_export_create_and_delete(self):
self._create_default_export()
self._delete_export()
self._check_export_obj_deleted()
self._test_delete_cluster()
def _test_create_multiple_exports(self):
#Export-1 with default values
self._create_default_export()
#Export-2 with r only
self._create_export(export_id='2', extra_cmd=[self.pseudo_path, '--readonly'])
#Export-3 for subvolume with r only
self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol')
fs_path = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol')
self._create_export(export_id='3', extra_cmd=[self.pseudo_path, '--readonly', fs_path.strip()])
self._test_delete_cluster()
self._check_export_obj_deleted(conf_obj=True)