mirror of
https://github.com/ceph/ceph
synced 2025-01-24 03:53:54 +00:00
0127cd1e88
There were a couple of problems found by flake8 in the qa/ directory (most of them fixed now). Enabling flake8 during the usual check runs hopefully avoids adding new issues in the future. Signed-off-by: Thomas Bechtold <tbechtold@suse.com>
169 lines
6.8 KiB
Python
169 lines
6.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# pylint: disable=too-many-public-methods
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
from .helper import DashboardTestCase
|
|
|
|
|
|
class GaneshaTest(DashboardTestCase):
    """Dashboard REST API tests for the ``/api/nfs-ganesha`` endpoints.

    ``setUpClass`` creates a ``ganesha`` pool holding one ``conf-<node>``
    object per daemon in two RADOS namespaces, points the dashboard at
    them as ``cluster1`` and ``cluster2``, and creates an RGW system user
    whose keys are handed to the dashboard.  ``tearDown`` removes every
    export left behind by a test.
    """

    CEPHFS = True
    AUTH_ROLES = ['pool-manager', 'ganesha-manager']

    # Value of the dashboard's ganesha-clusters-rados-pool-namespace setting
    # used by these tests, in the form "<cluster_id>:<pool>/<namespace>,...".
    _CLUSTERS_POOL_NAMESPACE = ('cluster1:ganesha/ganesha1,'
                                'cluster2:ganesha/ganesha2')

    @classmethod
    def create_pool(cls, name, pg_num, pool_type, application='rbd'):
        """Create a pool by POSTing to ``/api/pool``.

        :param name: pool name
        :param pg_num: number of placement groups
        :param pool_type: pool type; ``'erasure'`` additionally requests
            the ``ec_overwrites`` flag
        :param application: single entry for ``application_metadata``
        """
        data = {
            'pool': name,
            'pg_num': pg_num,
            'pool_type': pool_type,
            'application_metadata': [application]
        }
        if pool_type == 'erasure':
            data['flags'] = ['ec_overwrites']
        cls._task_post("/api/pool", data)

    @classmethod
    def setUpClass(cls):
        """Prepare the pool, daemon config objects, settings and RGW user."""
        super(GaneshaTest, cls).setUpClass()
        cls.create_pool('ganesha', 2**2, 'replicated')

        # One "conf-<node>" object per daemon in each namespace;
        # test_list_daemons expects exactly these six daemons.
        for namespace in ('ganesha1', 'ganesha2'):
            for node in ('node1', 'node2', 'node3'):
                cls._rados_cmd(['-p', 'ganesha', '-N', namespace,
                                'create', 'conf-{}'.format(node)])
        cls._ceph_cmd(['dashboard',
                       'set-ganesha-clusters-rados-pool-namespace',
                       cls._CLUSTERS_POOL_NAMESPACE])

        # RGW setup
        cls._radosgw_admin_cmd([
            'user', 'create', '--uid', 'admin', '--display-name', 'admin',
            '--system', '--access-key', 'admin', '--secret', 'admin'
        ])
        cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
        cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])

    @classmethod
    def tearDownClass(cls):
        """Remove the RGW user and the ``ganesha`` pool created in setup."""
        super(GaneshaTest, cls).tearDownClass()
        cls._radosgw_admin_cmd(['user', 'rm', '--uid', 'admin', '--purge-data'])
        cls._ceph_cmd(['osd', 'pool', 'delete', 'ganesha', 'ganesha',
                       '--yes-i-really-really-mean-it'])

    @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
    def test_read_access_permissions(self):
        """Listing exports with only ``rbd-image`` permissions yields 403."""
        self._get('/api/nfs-ganesha/export')
        self.assertStatus(403)

    def test_list_daemons(self):
        """The daemon list contains every (node, cluster) pair from setup."""
        daemons = self._get("/api/nfs-ganesha/daemon")
        self.assertEqual(len(daemons), 6)
        daemons = [(d['daemon_id'], d['cluster_id']) for d in daemons]
        for cluster_id in ('cluster1', 'cluster2'):
            for daemon_id in ('node1', 'node2', 'node3'):
                self.assertIn((daemon_id, cluster_id), daemons)

    @classmethod
    def create_export(cls, path, cluster_id, daemons, fsal, sec_label_xattr=None):
        """Create an export by POSTing to ``/api/nfs-ganesha/export``.

        :param path: export path; also used to derive the pseudo path
        :param cluster_id: Ganesha cluster the export belongs to
        :param daemons: list of daemon ids that should serve the export
        :param fsal: ``'CEPH'`` for a CephFS export; any other value
            produces an RGW export
        :param sec_label_xattr: value for the CephFS FSAL's
            ``sec_label_xattr``; ``security_label`` is sent as true
            whenever this is not ``None``
        :return: whatever ``_task_post`` returns for the request
        """
        if fsal == 'CEPH':
            fsal = {
                "name": "CEPH",
                "user_id": "admin",
                "fs_name": None,
                "sec_label_xattr": sec_label_xattr
            }
            pseudo = "/cephfs{}".format(path)
        else:
            fsal = {"name": "RGW", "rgw_user_id": "admin"}
            # A path starting with '/' maps to the bare "/rgw/" pseudo path.
            # startswith() also tolerates an empty path, where path[0]
            # would raise IndexError.
            pseudo = "/rgw/{}".format("" if path.startswith('/') else path)
        ex_json = {
            "path": path,
            "fsal": fsal,
            "cluster_id": cluster_id,
            # Honour the caller's daemon list; it used to be hard-coded to
            # ["node1", "node3"], silently ignoring the argument.
            "daemons": daemons,
            "pseudo": pseudo,
            "tag": None,
            "access_type": "RW",
            "squash": "no_root_squash",
            "security_label": sec_label_xattr is not None,
            "protocols": [4],
            "transports": ["TCP"],
            "clients": [{
                "addresses": ["10.0.0.0/8"],
                "access_type": "RO",
                "squash": "root"
            }]
        }
        return cls._task_post('/api/nfs-ganesha/export', ex_json)

    def tearDown(self):
        """Delete every export that is still listed after a test."""
        super(GaneshaTest, self).tearDown()
        exports = self._get("/api/nfs-ganesha/export")
        if self._resp.status_code != 200:
            # The listing itself failed, so there is nothing to iterate.
            # NOTE(review): presumably hit when the session lacks the
            # permission (e.g. after a RunAs test) -- confirm.
            return
        self.assertIsInstance(exports, list)
        for exp in exports:
            self._task_delete("/api/nfs-ganesha/export/{}/{}"
                              .format(exp['cluster_id'], exp['export_id']))

    def test_create_export(self):
        """Create one CephFS export and check that it is listed.

        :return: the created export, so that the update and delete tests
            can reuse this method as their setup step
        """
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 0)

        data = self.create_export("/foo", 'cluster1', ['node1', 'node2'],
                                  'CEPH', "security.selinux")

        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 1)
        self.assertDictEqual(exports[0], data)
        return data

    def test_update_export(self):
        """PUT a modified export and check that the listing reflects it."""
        export = self.test_create_export()
        export['access_type'] = 'RO'
        export['daemons'] = ['node1', 'node3']
        export['security_label'] = True
        data = self._task_put('/api/nfs-ganesha/export/{}/{}'
                              .format(export['cluster_id'], export['export_id']),
                              export)
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 1)
        self.assertDictEqual(exports[0], data)
        self.assertEqual(exports[0]['daemons'], ['node1', 'node3'])
        self.assertEqual(exports[0]['security_label'], True)

    def test_delete_export(self):
        """Deleting an existing export answers with status 204."""
        export = self.test_create_export()
        self._task_delete("/api/nfs-ganesha/export/{}/{}"
                          .format(export['cluster_id'], export['export_id']))
        self.assertStatus(204)

    def test_get_export(self):
        """Fetch a CephFS and an RGW export individually by id."""
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 0)

        data1 = self.create_export("/foo", 'cluster2', ['node1', 'node2'], 'CEPH')
        data2 = self.create_export("mybucket", 'cluster2', ['node2', 'node3'], 'RGW')

        # NOTE(review): the URLs assume the two exports get ids 1 and 2
        # within cluster2 -- confirm against the export id allocation.
        export1 = self._get("/api/nfs-ganesha/export/cluster2/1")
        self.assertDictEqual(export1, data1)

        export2 = self._get("/api/nfs-ganesha/export/cluster2/2")
        self.assertDictEqual(export2, data2)

    def test_invalid_status(self):
        """With the pool/namespace setting cleared, status is unavailable."""
        self._ceph_cmd(['dashboard',
                        'set-ganesha-clusters-rados-pool-namespace', ''])
        try:
            data = self._get('/api/nfs-ganesha/status')
            self.assertStatus(200)
            self.assertIn('available', data)
            self.assertIn('message', data)
            self.assertFalse(data['available'])
            self.assertIn('Ganesha config location is not configured. '
                          'Please set the GANESHA_RADOS_POOL_NAMESPACE setting.',
                          data['message'])
        finally:
            # Always restore the setting: it is cluster-wide, so leaving it
            # empty after a failed assertion would affect every other test.
            self._ceph_cmd(['dashboard',
                            'set-ganesha-clusters-rados-pool-namespace',
                            self._CLUSTERS_POOL_NAMESPACE])

    def test_valid_status(self):
        """With the setting from ``setUpClass`` in place, status is available."""
        data = self._get('/api/nfs-ganesha/status')
        self.assertStatus(200)
        self.assertIn('available', data)
        self.assertIn('message', data)
        self.assertTrue(data['available'])
|