mirror of
https://github.com/ceph/ceph
synced 2025-01-20 10:01:45 +00:00
Merge pull request #35694 from rhcs-dashboard/wip-45901-master
mgr/dashboard: increase API test coverage in API controllers Reviewed-by: Alfonso Martínez <almartin@redhat.com> Reviewed-by: Ernesto Puertat <epuertat@redhat.com> Reviewed-by: Laura Paduano <lpaduano@suse.com> Reviewed-by: Volker Theile <vtheile@suse.com>
This commit is contained in:
commit
63a56b4b52
@ -99,6 +99,16 @@ class CephfsTest(DashboardTestCase):
|
||||
self._delete("/api/cephfs/{}/client/1234".format(fs_id))
|
||||
self.assertStatus(404)
|
||||
|
||||
def test_cephfs_evict_invalid_client_id(self):
|
||||
fs_id = self.get_fs_id()
|
||||
self._delete("/api/cephfs/{}/client/xyz".format(fs_id))
|
||||
self.assertStatus(400)
|
||||
self.assertJsonBody({
|
||||
"component": 'cephfs',
|
||||
"code": "invalid_cephfs_client_id",
|
||||
"detail": "Invalid cephfs client ID xyz"
|
||||
})
|
||||
|
||||
def test_cephfs_get(self):
|
||||
fs_id = self.get_fs_id()
|
||||
data = self._get("/api/cephfs/{}/".format(fs_id))
|
||||
@ -134,6 +144,15 @@ class CephfsTest(DashboardTestCase):
|
||||
self.assertToHave(cephfs, 'id')
|
||||
self.assertToHave(cephfs, 'mdsmap')
|
||||
|
||||
def test_cephfs_get_quotas(self):
|
||||
fs_id = self.get_fs_id()
|
||||
data = self._get("/api/cephfs/{}/get_quotas?path=/".format(fs_id))
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JObj({
|
||||
'max_bytes': int,
|
||||
'max_files': int
|
||||
}))
|
||||
|
||||
def test_cephfs_tabs(self):
|
||||
fs_id = self.get_fs_id()
|
||||
data = self._get("/ui-api/cephfs/{}/tabs".format(fs_id))
|
||||
|
@ -4,7 +4,7 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
||||
from .helper import DashboardTestCase
|
||||
from .helper import DashboardTestCase, JList, JObj
|
||||
|
||||
|
||||
class GaneshaTest(DashboardTestCase):
|
||||
@ -166,3 +166,38 @@ class GaneshaTest(DashboardTestCase):
|
||||
self.assertIn('available', data)
|
||||
self.assertIn('message', data)
|
||||
self.assertTrue(data['available'])
|
||||
|
||||
def test_ganesha_fsals(self):
|
||||
data = self._get('/ui-api/nfs-ganesha/fsals')
|
||||
self.assertStatus(200)
|
||||
self.assertIn('CEPH', data)
|
||||
|
||||
def test_ganesha_filesystems(self):
|
||||
data = self._get('/ui-api/nfs-ganesha/cephfs/filesystems')
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JList(JObj({
|
||||
'id': int,
|
||||
'name': str
|
||||
})))
|
||||
|
||||
def test_ganesha_lsdir(self):
|
||||
self._get('/ui-api/nfs-ganesha/lsdir')
|
||||
self.assertStatus(500)
|
||||
|
||||
def test_ganesha_buckets(self):
|
||||
data = self._get('/ui-api/nfs-ganesha/rgw/buckets')
|
||||
self.assertStatus(200)
|
||||
schema = JList(str)
|
||||
self.assertSchema(data, schema)
|
||||
|
||||
def test_ganesha_clusters(self):
|
||||
data = self._get('/ui-api/nfs-ganesha/clusters')
|
||||
self.assertStatus(200)
|
||||
schema = JList(str)
|
||||
self.assertSchema(data, schema)
|
||||
|
||||
def test_ganesha_cephx_clients(self):
|
||||
data = self._get('/ui-api/nfs-ganesha/cephx/clients')
|
||||
self.assertStatus(200)
|
||||
schema = JList(str)
|
||||
self.assertSchema(data, schema)
|
||||
|
@ -90,6 +90,25 @@ class HostControllerTest(DashboardTestCase):
|
||||
}))
|
||||
})))
|
||||
|
||||
def test_host_daemons(self):
|
||||
hosts = self._get('{}'.format(self.URL_HOST))
|
||||
hosts = [host['hostname'] for host in hosts if host['hostname'] != '']
|
||||
assert hosts[0]
|
||||
data = self._get('{}/daemons'.format('{}/{}'.format(self.URL_HOST, hosts[0])))
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JList(JObj({
|
||||
'hostname': str,
|
||||
'daemon_id': str,
|
||||
'daemon_type': str
|
||||
})))
|
||||
|
||||
def test_host_smart(self):
|
||||
hosts = self._get('{}'.format(self.URL_HOST))
|
||||
hosts = [host['hostname'] for host in hosts if host['hostname'] != '']
|
||||
assert hosts[0]
|
||||
self._get('{}/smart'.format('{}/{}'.format(self.URL_HOST, hosts[0])))
|
||||
self.assertStatus(200)
|
||||
|
||||
|
||||
class HostControllerNoOrchestratorTest(DashboardTestCase):
|
||||
def test_host_create(self):
|
||||
|
@ -31,6 +31,22 @@ class MgrModuleTestCase(DashboardTestCase):
|
||||
|
||||
|
||||
class MgrModuleTest(MgrModuleTestCase):
|
||||
|
||||
__options_schema = JObj({
|
||||
'name': str,
|
||||
'type': str,
|
||||
'level': str,
|
||||
'flags': int,
|
||||
'default_value': JAny(none=True),
|
||||
'min': JAny(none=False),
|
||||
'max': JAny(none=False),
|
||||
'enum_allowed': JList(str),
|
||||
'desc': str,
|
||||
'long_desc': str,
|
||||
'tags': JList(str),
|
||||
'see_also': JList(str)
|
||||
})
|
||||
|
||||
def test_list_disabled_module(self):
|
||||
self._ceph_cmd(['mgr', 'module', 'disable', 'iostat'])
|
||||
self.wait_until_rest_api_accessible()
|
||||
@ -122,6 +138,39 @@ class MgrModuleTest(MgrModuleTestCase):
|
||||
'url': str
|
||||
}))
|
||||
|
||||
def test_module_options(self):
|
||||
data = self._get('/api/mgr/module/telemetry/options')
|
||||
self.assertStatus(200)
|
||||
schema = JObj({
|
||||
'channel_basic': self.__options_schema,
|
||||
'channel_crash': self.__options_schema,
|
||||
'channel_device': self.__options_schema,
|
||||
'channel_ident': self.__options_schema,
|
||||
'contact': self.__options_schema,
|
||||
'description': self.__options_schema,
|
||||
'device_url': self.__options_schema,
|
||||
'enabled': self.__options_schema,
|
||||
'interval': self.__options_schema,
|
||||
'last_opt_revision': self.__options_schema,
|
||||
'leaderboard': self.__options_schema,
|
||||
'log_level': self.__options_schema,
|
||||
'log_to_cluster': self.__options_schema,
|
||||
'log_to_cluster_level': self.__options_schema,
|
||||
'log_to_file': self.__options_schema,
|
||||
'organization': self.__options_schema,
|
||||
'proxy': self.__options_schema,
|
||||
'url': self.__options_schema
|
||||
})
|
||||
self.assertSchema(data, schema)
|
||||
|
||||
def test_module_enable(self):
|
||||
self._post('/api/mgr/module/telemetry/enable')
|
||||
self.assertStatus(200)
|
||||
|
||||
def test_disable(self):
|
||||
self._post('/api/mgr/module/iostat/disable')
|
||||
self.assertStatus(200)
|
||||
|
||||
def test_put(self):
|
||||
self.set_config_key('config/mgr/mgr/iostat/log_level', 'critical')
|
||||
self.set_config_key('config/mgr/mgr/iostat/log_to_cluster', 'False')
|
||||
|
@ -61,6 +61,19 @@ class OsdTest(DashboardTestCase):
|
||||
self._post('/api/osd/0/scrub?deep=True')
|
||||
self.assertStatus(200)
|
||||
|
||||
def test_safe_to_delete(self):
|
||||
data = self._get('/api/osd/safe_to_delete?svc_ids=0')
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JObj({
|
||||
'is_safe_to_delete': JAny(none=True),
|
||||
'message': str
|
||||
}))
|
||||
self.assertTrue(data['is_safe_to_delete'])
|
||||
|
||||
def test_osd_smart(self):
|
||||
self._get('/api/osd/0/smart')
|
||||
self.assertStatus(200)
|
||||
|
||||
def test_mark_out_and_in(self):
|
||||
self._post('/api/osd/0/mark_out')
|
||||
self.assertStatus(200)
|
||||
@ -98,6 +111,18 @@ class OsdTest(DashboardTestCase):
|
||||
'tracking_id': 'bare-5'
|
||||
})
|
||||
self.assertStatus(201)
|
||||
|
||||
# invalid method
|
||||
self._task_post('/api/osd', {
|
||||
'method': 'xyz',
|
||||
'data': {
|
||||
'uuid': 'f860ca2e-757d-48ce-b74a-87052cad563f',
|
||||
'svc_id': 5
|
||||
},
|
||||
'tracking_id': 'bare-5'
|
||||
})
|
||||
self.assertStatus(400)
|
||||
|
||||
# Lost
|
||||
self._post('/api/osd/5/mark_lost')
|
||||
self.assertStatus(200)
|
||||
|
@ -159,6 +159,16 @@ class PoolTest(DashboardTestCase):
|
||||
self._delete('/api/pool/ddd')
|
||||
self.assertStatus(403)
|
||||
|
||||
def test_pool_configuration(self):
|
||||
pool_name = 'device_health_metrics'
|
||||
data = self._get('/api/pool/{}/configuration'.format(pool_name))
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JList(JObj({
|
||||
'name': str,
|
||||
'value': str,
|
||||
'source': int
|
||||
})))
|
||||
|
||||
def test_pool_list(self):
|
||||
data = self._get("/api/pool")
|
||||
self.assertStatus(200)
|
||||
|
@ -511,6 +511,11 @@ class RgwUserTest(RgwTestCase):
|
||||
self.assertGreaterEqual(len(data), 1)
|
||||
self.assertIn('admin', data)
|
||||
|
||||
def test_get_emails(self):
|
||||
data = self._get('/api/rgw/user/get_emails')
|
||||
self.assertStatus(200)
|
||||
self.assertSchema(data, JList(str))
|
||||
|
||||
def test_create_get_update_delete(self):
|
||||
# Create a new user.
|
||||
self._post('/api/rgw/user', params={
|
||||
|
@ -6,7 +6,15 @@ ceph dashboard module
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import cherrypy
|
||||
|
||||
if 'COVERAGE_ENABLED' in os.environ:
|
||||
import coverage # pylint: disable=import-error
|
||||
__cov = coverage.Coverage(config_file="{}/.coveragerc".format(os.path.dirname(__file__)),
|
||||
data_suffix=True)
|
||||
__cov.start()
|
||||
cherrypy.engine.subscribe('after_request', __cov.save)
|
||||
cherrypy.engine.subscribe('stop', __cov.stop)
|
||||
|
||||
if 'UNITTEST' not in os.environ:
|
||||
class _ModuleProxy(object):
|
||||
|
@ -19,7 +19,7 @@ from ..tools import ViewCache
|
||||
|
||||
@ApiController('/cephfs', Scope.CEPHFS)
|
||||
class CephFS(RESTController):
|
||||
def __init__(self):
|
||||
def __init__(self): # pragma: no cover
|
||||
super(CephFS, self).__init__()
|
||||
|
||||
# Stateful instances of CephFSClients, hold cached results. Key to
|
||||
@ -179,7 +179,7 @@ class CephFS(RESTController):
|
||||
info['name'],
|
||||
"mds_server.handle_client_request")
|
||||
else:
|
||||
activity = 0.0
|
||||
activity = 0.0 # pragma: no cover
|
||||
|
||||
self._append_mds_metadata(mds_versions, info['name'])
|
||||
rank_table.append(
|
||||
@ -285,15 +285,15 @@ class CephFS(RESTController):
|
||||
# indepdendent of whether it's a kernel or userspace
|
||||
# client, so that the javascript doesn't have to grok that.
|
||||
for client in clients:
|
||||
if "ceph_version" in client['client_metadata']:
|
||||
if "ceph_version" in client['client_metadata']: # pragma: no cover - no complexity
|
||||
client['type'] = "userspace"
|
||||
client['version'] = client['client_metadata']['ceph_version']
|
||||
client['hostname'] = client['client_metadata']['hostname']
|
||||
elif "kernel_version" in client['client_metadata']:
|
||||
elif "kernel_version" in client['client_metadata']: # pragma: no cover - no complexity
|
||||
client['type'] = "kernel"
|
||||
client['version'] = client['client_metadata']['kernel_version']
|
||||
client['hostname'] = client['client_metadata']['hostname']
|
||||
else:
|
||||
else: # pragma: no cover - no complexity there
|
||||
client['type'] = "unknown"
|
||||
client['version'] = ""
|
||||
client['hostname'] = ""
|
||||
@ -334,7 +334,7 @@ class CephFS(RESTController):
|
||||
"""
|
||||
try:
|
||||
return self._get_root_directory(self._cephfs_instance(fs_id))
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound):
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
|
||||
return None
|
||||
|
||||
def _get_root_directory(self, cfs):
|
||||
@ -365,7 +365,7 @@ class CephFS(RESTController):
|
||||
try:
|
||||
cfs = self._cephfs_instance(fs_id)
|
||||
paths = cfs.ls_dir(path, depth)
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound):
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
|
||||
paths = []
|
||||
return paths
|
||||
|
||||
@ -516,6 +516,6 @@ class CephFsUi(CephFS):
|
||||
paths = cfs.ls_dir(path, depth)
|
||||
if path == os.sep:
|
||||
paths = [self._get_root_directory(cfs)] + paths
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound):
|
||||
except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
|
||||
paths = []
|
||||
return paths
|
||||
|
@ -117,7 +117,7 @@ class Host(RESTController):
|
||||
@raise_if_no_orchestrator
|
||||
@handle_orchestrator_error('host')
|
||||
@host_task('create', {'hostname': '{hostname}'})
|
||||
def create(self, hostname):
|
||||
def create(self, hostname): # pragma: no cover - requires realtime env
|
||||
orch_client = OrchClient.instance()
|
||||
self._check_orchestrator_host_op(orch_client, hostname, True)
|
||||
orch_client.hosts.add(hostname)
|
||||
@ -125,12 +125,12 @@ class Host(RESTController):
|
||||
@raise_if_no_orchestrator
|
||||
@handle_orchestrator_error('host')
|
||||
@host_task('delete', {'hostname': '{hostname}'})
|
||||
def delete(self, hostname):
|
||||
def delete(self, hostname): # pragma: no cover - requires realtime env
|
||||
orch_client = OrchClient.instance()
|
||||
self._check_orchestrator_host_op(orch_client, hostname, False)
|
||||
orch_client.hosts.remove(hostname)
|
||||
|
||||
def _check_orchestrator_host_op(self, orch_client, hostname, add_host=True):
|
||||
def _check_orchestrator_host_op(self, orch_client, hostname, add_host=True): # pragma:no cover
|
||||
"""Check if we can adding or removing a host with orchestrator
|
||||
|
||||
:param orch_client: Orchestrator client
|
||||
|
@ -181,7 +181,7 @@ class NFSGaneshaExports(RESTController):
|
||||
ganesha_conf = GaneshaConf.instance(cluster_id)
|
||||
|
||||
if not ganesha_conf.has_export(export_id):
|
||||
raise cherrypy.HTTPError(404)
|
||||
raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
|
||||
|
||||
if fsal['name'] not in Ganesha.fsals_available():
|
||||
raise NFSException("Cannot make modifications to this export. "
|
||||
@ -227,8 +227,7 @@ class NFSGaneshaExports(RESTController):
|
||||
ganesha_conf = GaneshaConf.instance(cluster_id)
|
||||
|
||||
if not ganesha_conf.has_export(export_id):
|
||||
raise cherrypy.HTTPError(404)
|
||||
|
||||
raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
|
||||
export = ganesha_conf.remove_export(export_id)
|
||||
if reload_daemons:
|
||||
ganesha_conf.reload_daemons(export.daemons)
|
||||
@ -280,7 +279,7 @@ class NFSGaneshaUi(BaseController):
|
||||
return Ganesha.fsals_available()
|
||||
|
||||
@Endpoint('GET', '/lsdir')
|
||||
def lsdir(self, root_dir=None, depth=1):
|
||||
def lsdir(self, root_dir=None, depth=1): # pragma: no cover
|
||||
if root_dir is None:
|
||||
root_dir = "/"
|
||||
depth = int(depth)
|
||||
|
@ -58,7 +58,7 @@ def raise_if_no_orchestrator(method):
|
||||
def inner(self, *args, **kwargs):
|
||||
orch = OrchClient.instance()
|
||||
if not orch.available():
|
||||
raise DashboardException(code='orchestrator_status_unavailable',
|
||||
raise DashboardException(code='orchestrator_status_unavailable', # pragma: no cover
|
||||
msg='Orchestrator is unavailable',
|
||||
component='orchestrator',
|
||||
http_status_code=503)
|
||||
@ -79,7 +79,7 @@ class Orchestrator(RESTController):
|
||||
@raise_if_no_orchestrator
|
||||
@handle_orchestrator_error('osd')
|
||||
@orchestrator_task('identify_device', ['{hostname}', '{device}'])
|
||||
def identify_device(self, hostname, device, duration):
|
||||
def identify_device(self, hostname, device, duration): # pragma: no cover
|
||||
# type: (str, str, int) -> None
|
||||
"""
|
||||
Identify a device by switching on the device light for N seconds.
|
||||
@ -111,7 +111,7 @@ class OrchestratorInventory(RESTController):
|
||||
for inventory_host in inventory_hosts:
|
||||
host_osds = device_osd_map.get(inventory_host['name'])
|
||||
for device in inventory_host['devices']:
|
||||
if host_osds:
|
||||
if host_osds: # pragma: no cover
|
||||
dev_name = os.path.basename(device['path'])
|
||||
device['osd_ids'] = sorted(host_osds.get(dev_name, []))
|
||||
else:
|
||||
|
@ -19,7 +19,7 @@ from ..services.orchestrator import OrchClient
|
||||
from ..tools import str_to_bool
|
||||
try:
|
||||
from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import
|
||||
except ImportError:
|
||||
except ImportError: # pragma: no cover
|
||||
pass # For typing only
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ class Osd(RESTController):
|
||||
osd['stats_history'] = {}
|
||||
osd_spec = str(osd_id)
|
||||
if 'osd' not in osd:
|
||||
continue
|
||||
continue # pragma: no cover - simple early continue
|
||||
for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
|
||||
prop = stat.split('.')[1]
|
||||
rates = CephService.get_rates('osd', osd_spec, stat)
|
||||
@ -105,10 +105,10 @@ class Osd(RESTController):
|
||||
try:
|
||||
histogram = CephService.send_command(
|
||||
'osd', srv_spec=svc_id, prefix='perf histogram dump')
|
||||
except SendCommandError as e:
|
||||
if 'osd down' in str(e):
|
||||
except SendCommandError as e: # pragma: no cover - the handling is too obvious
|
||||
if 'osd down' in str(e): # pragma: no cover - no complexity there
|
||||
histogram = str(e)
|
||||
else:
|
||||
else: # pragma: no cover - no complexity there
|
||||
raise
|
||||
|
||||
return {
|
||||
@ -117,7 +117,7 @@ class Osd(RESTController):
|
||||
'histogram': histogram,
|
||||
}
|
||||
|
||||
def set(self, svc_id, device_class):
|
||||
def set(self, svc_id, device_class): # pragma: no cover
|
||||
old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
|
||||
ids=[svc_id])
|
||||
old_device_class = old_device_class[0]['device_class']
|
||||
@ -157,7 +157,7 @@ class Osd(RESTController):
|
||||
@raise_if_no_orchestrator
|
||||
@handle_orchestrator_error('osd')
|
||||
@osd_task('delete', {'svc_id': '{svc_id}'})
|
||||
def delete(self, svc_id, preserve_id=None, force=None):
|
||||
def delete(self, svc_id, preserve_id=None, force=None): # pragma: no cover
|
||||
replace = False
|
||||
check = False
|
||||
try:
|
||||
@ -168,7 +168,6 @@ class Osd(RESTController):
|
||||
except ValueError:
|
||||
raise DashboardException(
|
||||
component='osd', http_status_code=400, msg='Invalid parameter(s)')
|
||||
|
||||
orch = OrchClient.instance()
|
||||
if check:
|
||||
logger.info('Check for removing osd.%s...', svc_id)
|
||||
|
@ -5,7 +5,6 @@ import logging
|
||||
import json
|
||||
|
||||
import cherrypy
|
||||
|
||||
from . import ApiController, BaseController, RESTController, Endpoint, \
|
||||
ReadPermission
|
||||
from ..exceptions import DashboardException
|
||||
@ -18,7 +17,7 @@ from ..tools import json_str_to_object, str_to_bool
|
||||
|
||||
try:
|
||||
from typing import List
|
||||
except ImportError:
|
||||
except ImportError: # pragma: no cover
|
||||
pass # Just for type checking
|
||||
|
||||
logger = logging.getLogger('controllers.rgw')
|
||||
@ -33,7 +32,7 @@ class Rgw(BaseController):
|
||||
try:
|
||||
instance = RgwClient.admin_instance()
|
||||
# Check if the service is online.
|
||||
if not instance.is_service_online():
|
||||
if not instance.is_service_online(): # pragma: no cover - no complexity there
|
||||
msg = 'Failed to connect to the Object Gateway\'s Admin Ops API.'
|
||||
raise RequestException(msg)
|
||||
# Ensure the API user ID is known by the RGW.
|
||||
@ -42,7 +41,7 @@ class Rgw(BaseController):
|
||||
instance.userid)
|
||||
raise RequestException(msg)
|
||||
# Ensure the system flag is set for the API user ID.
|
||||
if not instance.is_system_user():
|
||||
if not instance.is_system_user(): # pragma: no cover - no complexity there
|
||||
msg = 'The system flag is not set for user "{}".'.format(
|
||||
instance.userid)
|
||||
raise RequestException(msg)
|
||||
@ -229,7 +228,7 @@ class RgwBucket(RgwRESTController):
|
||||
lock_retention_period_days,
|
||||
lock_retention_period_years)
|
||||
return result
|
||||
except RequestException as e:
|
||||
except RequestException as e: # pragma: no cover - handling is too obvious
|
||||
raise DashboardException(e, http_status_code=500, component='rgw')
|
||||
|
||||
def set(self, bucket, bucket_id, uid, versioning_state=None,
|
||||
@ -380,7 +379,7 @@ class RgwUser(RgwRESTController):
|
||||
'Object Gateway'.format(uid))
|
||||
# Finally redirect request to the RGW proxy.
|
||||
return self.proxy('DELETE', 'user', {'uid': uid}, json_response=False)
|
||||
except (DashboardException, RequestException) as e:
|
||||
except (DashboardException, RequestException) as e: # pragma: no cover
|
||||
raise DashboardException(e, component='rgw')
|
||||
|
||||
# pylint: disable=redefined-builtin
|
||||
|
@ -30,8 +30,8 @@ class Settings(RESTController):
|
||||
|
||||
try:
|
||||
yield result
|
||||
except AttributeError:
|
||||
raise cherrypy.NotFound(result)
|
||||
except AttributeError: # pragma: no cover - handling is too obvious
|
||||
raise cherrypy.NotFound(result) # pragma: no cover - handling is too obvious
|
||||
|
||||
@staticmethod
|
||||
def _to_native(setting):
|
||||
@ -98,7 +98,7 @@ class StandardSettings(RESTController):
|
||||
settings.
|
||||
:rtype: dict
|
||||
"""
|
||||
return {
|
||||
return { # pragma: no cover - no complexity there
|
||||
'user_pwd_expiration_span':
|
||||
SettingsModule.USER_PWD_EXPIRATION_SPAN,
|
||||
'user_pwd_expiration_warning_1':
|
||||
|
@ -21,8 +21,8 @@ class Summary(BaseController):
|
||||
def _rbd_mirroring(self):
|
||||
try:
|
||||
_, data = get_daemons_and_pools()
|
||||
except ViewCacheNoDataException:
|
||||
return {}
|
||||
except ViewCacheNoDataException: # pragma: no cover
|
||||
return {} # pragma: no cover
|
||||
|
||||
daemons = data.get('daemons', [])
|
||||
pools = data.get('pools', {})
|
||||
@ -30,18 +30,18 @@ class Summary(BaseController):
|
||||
warnings = 0
|
||||
errors = 0
|
||||
for daemon in daemons:
|
||||
if daemon['health_color'] == 'error':
|
||||
if daemon['health_color'] == 'error': # pragma: no cover
|
||||
errors += 1
|
||||
elif daemon['health_color'] == 'warning':
|
||||
elif daemon['health_color'] == 'warning': # pragma: no cover
|
||||
warnings += 1
|
||||
for _, pool in pools.items():
|
||||
if pool['health_color'] == 'error':
|
||||
if pool['health_color'] == 'error': # pragma: no cover
|
||||
errors += 1
|
||||
elif pool['health_color'] == 'warning':
|
||||
elif pool['health_color'] == 'warning': # pragma: no cover
|
||||
warnings += 1
|
||||
return {'warnings': warnings, 'errors': errors}
|
||||
|
||||
def _task_permissions(self, name):
|
||||
def _task_permissions(self, name): # pragma: no cover
|
||||
result = True
|
||||
if name == 'pool/create':
|
||||
result = self._has_permissions(Permission.CREATE, Scope.POOL)
|
||||
|
@ -12,6 +12,7 @@ import socket
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
||||
from mgr_module import MgrModule, MgrStandbyModule, Option, CLIWriteCommand
|
||||
from mgr_util import get_default_addr, ServerConfigException, verify_tls_files, \
|
||||
create_self_signed_cert
|
||||
@ -29,15 +30,6 @@ if cherrypy is not None:
|
||||
from .cherrypy_backports import patch_cherrypy
|
||||
patch_cherrypy(cherrypy.__version__)
|
||||
|
||||
if 'COVERAGE_ENABLED' in os.environ:
|
||||
import coverage
|
||||
__cov = coverage.Coverage(config_file="{}/.coveragerc".format(os.path.dirname(__file__)),
|
||||
data_suffix=True)
|
||||
|
||||
cherrypy.engine.subscribe('start', __cov.start)
|
||||
cherrypy.engine.subscribe('after_request', __cov.save)
|
||||
cherrypy.engine.subscribe('stop', __cov.stop)
|
||||
|
||||
# pylint: disable=wrong-import-position
|
||||
from . import mgr
|
||||
from .controllers import generate_routes, json_error_page
|
||||
@ -296,6 +288,16 @@ class Module(MgrModule, CherryPyConfig):
|
||||
return os.path.join(current_dir, 'frontend/dist')
|
||||
|
||||
def serve(self):
|
||||
|
||||
if 'COVERAGE_ENABLED' in os.environ:
|
||||
import coverage
|
||||
__cov = coverage.Coverage(config_file="{}/.coveragerc"
|
||||
.format(os.path.dirname(__file__)),
|
||||
data_suffix=True)
|
||||
__cov.start()
|
||||
cherrypy.engine.subscribe('after_request', __cov.save)
|
||||
cherrypy.engine.subscribe('stop', __cov.stop)
|
||||
|
||||
AuthManager.initialize()
|
||||
load_sso_db()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user