qa/tasks/cephfs/nfs: Add tests for cluster config set and reset

Signed-off-by: Varsha Rao <varao@redhat.com>
This commit is contained in:
Varsha Rao 2020-07-31 15:57:28 +05:30
parent 287da11921
commit bba2d66176

View File

@ -12,7 +12,9 @@ log = logging.getLogger(__name__)
# TODO Add test for cluster update when ganesha can be deployed on multiple ports.
class TestNFS(MgrTestCase):
def _cmd(self, *args):
def _cmd(self, *args, stdin=''):
if stdin:
return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args, stdin=stdin)
return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args)
def _nfs_cmd(self, *args):
@ -395,3 +397,97 @@ class TestNFS(MgrTestCase):
}]}
self.assertDictEqual(info_output, host_details)
self._test_delete_cluster()
def test_cluster_set_reset_user_config(self):
'''
Test cluster is created using user config and reverts back to default
config on reset.
'''
self._test_create_cluster()
time.sleep(30)
pool = 'nfs-ganesha'
user_id = 'test'
fs_name = 'user_test_fs'
self._cmd('fs', 'volume', 'create', fs_name)
time.sleep(20)
key = self._cmd('auth', 'get-or-create-key', f'client.{user_id}', 'mon',
'allow r', 'osd',
f'allow rw pool={pool} namespace={self.cluster_id}, allow rw tag cephfs data={fs_name}',
'mds', f'allow rw path={self.path}').strip()
config = f""" LOG {{
Default_log_level = FULL_DEBUG;
}}
EXPORT {{
Export_Id = 100;
Transports = TCP;
Path = /;
Pseudo = /ceph/;
Protocols = 4;
Access_Type = RW;
Attr_Expiration_Time = 0;
Squash = None;
FSAL {{
Name = CEPH;
Filesystem = {fs_name};
User_Id = {user_id};
Secret_Access_Key = '{key}';
}}
}}"""
#{'test': [{'hostname': 'smithi068', 'ip': ['172.21.15.68'], 'port': 2049}]}
info_output = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))['test'][0]
mnt_cmd = ['sudo', 'mount', '-t', 'nfs', '-o', f'port={info_output["port"]}', f'{info_output["ip"][0]}:/ceph', '/mnt']
MNT_FAILED = 32
self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster', 'config',
'set', self.cluster_id, '-i', '-'], stdin=config)
time.sleep(30)
log.info(self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls']))
res = self._sys_cmd(['rados', '-p', pool, '-N', self.cluster_id, 'get',
f'userconf-nfs.ganesha-{user_id}', '-'])
self.assertEqual(config, res.decode('utf-8'))
self.ctx.cluster.run(args=mnt_cmd)
self.ctx.cluster.run(args=['sudo', 'touch', '/mnt/test'])
out_mnt = self._sys_cmd(['sudo', 'ls', '/mnt'])
self.assertEqual(out_mnt, b'test\n')
self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
self._nfs_cmd('cluster', 'config', 'reset', self.cluster_id)
rados_obj_ls = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])
if b'conf-nfs' not in rados_obj_ls and b'userconf-nfs' in rados_obj_ls:
self.fail("User config not deleted")
time.sleep(30)
try:
self.ctx.cluster.run(args=mnt_cmd)
except CommandFailedError as e:
if e.exitstatus != MNT_FAILED:
raise
self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
self._test_delete_cluster()
time.sleep(30)
def test_cluster_set_user_config_with_non_existing_clusterid(self):
'''
Test setting user config for non-existing nfs cluster.
'''
try:
cluster_id = 'invalidtest'
self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster',
'config', 'set', self.cluster_id, '-i', '-'], stdin='testing')
self.fail(f"User config set for non-existing cluster {cluster_id}")
except CommandFailedError as e:
# Command should fail for test to pass
if e.exitstatus != errno.ENOENT:
raise
def test_cluster_reset_user_config_with_non_existing_clusterid(self):
'''
Test resetting user config for non-existing nfs cluster.
'''
try:
cluster_id = 'invalidtest'
self._nfs_cmd('cluster', 'config', 'reset', cluster_id)
self.fail(f"User config reset for non-existing cluster {cluster_id}")
except CommandFailedError as e:
# Command should fail for test to pass
if e.exitstatus != errno.ENOENT:
raise