From d71a97b7faabc848fd72eb755b3d439b21b2dbdd Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 22 Jan 2019 16:30:01 +0100 Subject: [PATCH] mgr/orchestrator: make use of @CLICommand Also Modified some command to allow multiple hosts Also: Added more tests Signed-off-by: Sebastian Wagner --- qa/tasks/mgr/test_orchestrator_cli.py | 37 +++ src/pybind/mgr/orchestrator.py | 11 +- src/pybind/mgr/orchestrator_cli/module.py | 334 ++++++++------------- src/pybind/mgr/test_orchestrator/module.py | 14 +- 4 files changed, 172 insertions(+), 224 deletions(-) diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py index e85444cba6a..3caa956e3b0 100644 --- a/qa/tasks/mgr/test_orchestrator_cli.py +++ b/qa/tasks/mgr/test_orchestrator_cli.py @@ -39,10 +39,26 @@ class TestOrchestratorCli(MgrTestCase): ret = self._orch_cmd("device", "ls") self.assertIn("localhost:", ret) + def test_device_ls_hoshs(self): + ret = self._orch_cmd("device", "ls", "localhost", "host1") + self.assertIn("localhost:", ret) + + + def test_device_ls_json(self): + ret = self._orch_cmd("device", "ls", "--format", "json") + self.assertIn("localhost", ret) + self.assertIsInstance(json.loads(ret), list) + def test_service_ls(self): ret = self._orch_cmd("service", "ls") self.assertIn("ceph-mgr", ret) + def test_service_ls_json(self): + ret = self._orch_cmd("service", "ls", "--format", "json") + self.assertIsInstance(json.loads(ret), list) + self.assertIn("ceph-mgr", ret) + + def test_service_action(self): self._orch_cmd("service", "reload", "mds", "cephfs") self._orch_cmd("service", "stop", "mds", "cephfs") @@ -67,3 +83,24 @@ class TestOrchestratorCli(MgrTestCase): with self.assertRaises(CommandFailedError): self._orch_cmd("osd", "create", "notfound:device") + def test_mds_add(self): + self._orch_cmd("mds", "add", "service_name") + + def test_rgw_add(self): + self._orch_cmd("rgw", "add", "service_name") + + def test_nfs_add(self): + self._orch_cmd("nfs", "add", "service_name", "pool", "--namespace", "ns") + self._orch_cmd("nfs", "add", "service_name", "pool") + + def test_osd_rm(self): + self._orch_cmd("osd", "rm", "osd.0") + + def test_mds_rm(self): + self._orch_cmd("mds", "rm", "foo") + + def test_rgw_rm(self): + self._orch_cmd("rgw", "rm", "foo") + + def test_nfs_rm(self): + self._orch_cmd("nfs", "rm", "service_name") diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index 7b7ae3a965b..9f90d907eeb 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -7,7 +7,7 @@ Please see the ceph-mgr module developer's guide for more information. import six try: - from typing import TypeVar, Generic, List, Optional, Union + from typing import TypeVar, Generic, List, Optional, Union, Tuple T = TypeVar('T') G = Generic[T] except ImportError: @@ -151,6 +151,7 @@ class Orchestrator(object): return True def available(self): + # type: () -> Tuple[Optional[bool], Optional[str]] """ Report whether we can talk to the orchestrator. This is the place to give the user a meaningful message if the orchestrator @@ -670,9 +671,10 @@ class InventoryFilter(object): in e.g. OSD servers. """ - def __init__(self): - self.labels = None # Optional: get info about nodes matching labels - self.nodes = None # Optional: get info about certain named nodes only + def __init__(self, labels=None, nodes=None): + # type: (List[str], List[str]) -> None + self.labels = labels # Optional: get info about nodes matching labels + self.nodes = nodes # Optional: get info about certain named nodes only class InventoryDevice(object): @@ -775,6 +777,7 @@ class OrchestratorClientMixin(Orchestrator): return self.remote(o, meth, *args, **kwargs) def _orchestrator_wait(self, completions): + # type: (List[_Completion]) -> None """ Helper to wait for completions to complete (reads) or become persistent (writes). diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py index dc157bfb244..94eb267000a 100644 --- a/src/pybind/mgr/orchestrator_cli/module.py +++ b/src/pybind/mgr/orchestrator_cli/module.py @@ -6,13 +6,29 @@ try: except ImportError: pass # just for type checking. -from mgr_module import MgrModule, HandleCommandResult +from functools import wraps +from typing import List + +from mgr_module import MgrModule, HandleCommandResult, CLIWriteCommand, CLIReadCommand import orchestrator class NoOrchestrator(Exception): - pass + def __init__(self): + super(NoOrchestrator, self).__init__("No orchestrator configured (try " + "`ceph orchestrator set backend`)") + + +def handle_exceptions(func): + + @wraps(func) + def inner(*args, **kwargs): + try: + return func(*args, **kwargs) + except (NoOrchestrator, ImportError) as e: + return HandleCommandResult(-errno.ENOENT, stderr=str(e)) + return inner class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): @@ -20,113 +36,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): {'name': 'orchestrator'} ] - COMMANDS = [ - { - 'cmd': "orchestrator device ls " - "name=host,type=CephString,req=false " - "name=format,type=CephChoices,strings=json|plain,req=false ", - "desc": "List devices on a node", - "perm": "r" - }, - { - 'cmd': "orchestrator service ls " - "name=host,type=CephString,req=false " - "name=svc_type,type=CephString,req=false " - "name=svc_id,type=CephString,req=false " - "name=format,type=CephChoices,strings=json|plain,req=false ", - "desc": "List services known to orchestrator" , - "perm": "r" - }, - { - 'cmd': "orchestrator service status " - "name=host,type=CephString,req=false " - "name=svc_type,type=CephString " - "name=svc_id,type=CephString " - "name=format,type=CephChoices,strings=json|plain,req=false ", - "desc": "Get orchestrator state for Ceph service", - "perm": "r" - }, - { - 'cmd': "orchestrator osd create " - "name=svc_arg,type=CephString,req=false ", - "desc": "Create an OSD service. Either --svc_arg=host:drives or -i ", - "perm": "rw" - }, - { - 'cmd': "orchestrator osd rm " - "name=svc_id,type=CephString,n=N ", - "desc": "Remove an OSD service", - "perm": "rw" - }, - { - 'cmd': "orchestrator mds add " - "name=svc_arg,type=CephString ", - "desc": "Create an MDS service", - "perm": "rw" - }, - { - 'cmd': "orchestrator mds rm " - "name=svc_id,type=CephString ", - "desc": "Remove an MDS service", - "perm": "rw" - }, - { - 'cmd': "orchestrator rgw add " - "name=svc_arg,type=CephString ", - "desc": "Create an RGW service", - "perm": "rw" - }, - { - 'cmd': "orchestrator rgw rm " - "name=svc_id,type=CephString ", - "desc": "Remove an RGW service", - "perm": "rw" - }, - { - 'cmd': "orchestrator nfs add " - "name=svc_arg,type=CephString " - "name=pool,type=CephString " - "name=namespace,type=CephString,req=false ", - "desc": "Create an NFS service", - "perm": "rw" - }, - { - 'cmd': "orchestrator nfs rm " - "name=svc_id,type=CephString ", - "desc": "Remove an NFS service", - "perm": "rw" - }, - { - 'cmd': "orchestrator service " - "name=action,type=CephChoices," - "strings=start|stop|reload " - "name=svc_type,type=CephString " - "name=svc_name,type=CephString", - "desc": "Start, stop or reload an entire service (i.e. all daemons)", - "perm": "rw" - }, - { - 'cmd': "orchestrator service-instance " - "name=action,type=CephChoices," - "strings=start|stop|reload " - "name=svc_type,type=CephString " - "name=svc_id,type=CephString", - "desc": "Start, stop or reload a specific service instance", - "perm": "rw" - }, - { - 'cmd': "orchestrator set backend " - "name=module,type=CephString,req=true", - "desc": "Select orchestrator module backend", - "perm": "rw" - }, - { - "cmd": "orchestrator status", - "desc": "Report configured backend and its status", - "perm": "r" - } - ] - def _select_orchestrator(self): o = self.get_module_option("orchestrator") if o is None: @@ -134,8 +43,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return o - - def _list_devices(self, cmd): + @CLIReadCommand('orchestrator device ls', + "name=host,type=CephString,n=N,req=false " + "name=format,type=CephChoices,strings=json|plain,req=false", + 'List devices on a node') + @handle_exceptions + def _list_devices(self, host=None, format='plain'): + # type: (List[str], str) -> HandleCommandResult """ Provide information about storage devices present in cluster hosts @@ -143,19 +57,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): date hardware inventory is fine as long as hardware ultimately appears in the output of this command. """ - host = cmd.get('host', None) - - if host: - nf = orchestrator.InventoryFilter() - nf.nodes = [host] - else: - nf = None + nf = orchestrator.InventoryFilter(nodes=host) if host else None completion = self.get_inventory(node_filter=nf) self._orchestrator_wait([completion]) - if cmd.get('format', 'plain') == 'json': + if format == 'json': data = [n.to_json() for n in completion.result] return HandleCommandResult(stdout=json.dumps(data)) else: @@ -176,14 +84,18 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return HandleCommandResult(stdout=result) - def _list_services(self, cmd): - hostname = cmd.get('host', None) - svc_id = cmd.get('svc_id', None) - svc_type = cmd.get('svc_type', None) + @CLIReadCommand('orchestrator service ls', + "name=host,type=CephString,req=false " + "name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false " + "name=svc_id,type=CephString,req=false " + "name=format,type=CephChoices,strings=json|plain,req=false", + 'List services known to orchestrator') + @handle_exceptions + def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain'): # XXX this is kind of confusing for people because in the orchestrator # context the service ID for MDS is the filesystem ID, not the daemon ID - completion = self.describe_service(svc_type, svc_id, hostname) + completion = self.describe_service(svc_type, svc_id, host) self._orchestrator_wait([completion]) services = completion.result @@ -192,7 +104,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): if len(services) == 0: return HandleCommandResult(stdout="No services reported") - elif cmd.get('format', 'plain') == 'json': + elif format == 'json': data = [s.to_json() for s in services] return HandleCommandResult(stdout=json.dumps(data)) else: @@ -213,8 +125,12 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return HandleCommandResult(stdout="\n".join(lines)) - def _create_osd(self, inbuf, cmd): - # type: (str, Dict[str, str]) -> HandleCommandResult + @CLIWriteCommand('orchestrator osd create', + "name=svc_arg,type=CephString,req=false", + 'Create an OSD service. Either --svc_arg=host:drives or -i ') + @handle_exceptions + def _create_osd(self, svc_arg=None, inbuf=None): + # type: (str, str) -> HandleCommandResult """Create one or more OSDs""" usage = """ @@ -230,16 +146,18 @@ Usage: msg = 'Failed to read JSON input: {}'.format(str(e)) + usage return HandleCommandResult(-errno.EINVAL, stderr=msg) - else: + elif svc_arg: try: - node_name, block_device = cmd['svc_arg'].split(":") + node_name, block_device = svc_arg.split(":") block_devices = block_device.split(',') except (TypeError, KeyError, ValueError): - msg = "Invalid host:device spec: '{}'".format(cmd['svc_arg']) + usage + msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage return HandleCommandResult(-errno.EINVAL, stderr=msg) devs = orchestrator.DeviceSelection(paths=block_devices) drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs) + else: + return HandleCommandResult(-errno.EINVAL, stderr=usage) # TODO: Remove this and make the orchestrator composable # Like a future or so. @@ -257,12 +175,17 @@ Usage: self.log.warning(str(completion.result)) return HandleCommandResult(stdout=str(completion.result)) - def _osd_rm(self, cmd): + @CLIWriteCommand('orchestrator osd rm', + "name=svc_id,type=CephString,n=N", + 'Remove OSD services') + @handle_exceptions + def _osd_rm(self, svc_id): + # type: (List[str]) -> HandleCommandResult """ Remove OSD's :cmd : Arguments for remove the osd """ - completion = self.remove_osds(cmd["svc_id"]) + completion = self.remove_osds(svc_id) self._orchestrator_wait([completion]) return HandleCommandResult(stdout=str(completion.result)) @@ -271,26 +194,36 @@ Usage: self._orchestrator_wait([completion]) return HandleCommandResult() - def _mds_add(self, cmd): + @CLIWriteCommand('orchestrator mds add', + "name=svc_arg,type=CephString", + 'Create an MDS service') + @handle_exceptions + def _mds_add(self, svc_arg): spec = orchestrator.StatelessServiceSpec() - spec.name = cmd['svc_arg'] + spec.name = svc_arg return self._add_stateless_svc("mds", spec) - def _rgw_add(self, cmd): + @CLIWriteCommand('orchestrator rgw add', + "name=svc_arg,type=CephString", + 'Create an RGW service') + @handle_exceptions + def _rgw_add(self, svc_arg): spec = orchestrator.StatelessServiceSpec() - spec.name = cmd['svc_arg'] + spec.name = svc_arg return self._add_stateless_svc("rgw", spec) - def _nfs_add(self, cmd): - cluster_name = cmd['svc_arg'] - pool = cmd['pool'] - ns = cmd.get('namespace', None) - + @CLIWriteCommand('orchestrator nfs add', + "name=svc_arg,type=CephString " + "name=pool,type=CephString " + "name=namespace,type=CephString,req=false", + 'Create an NFS service') + @handle_exceptions + def _nfs_add(self, svc_arg, pool, namespace=None): spec = orchestrator.StatelessServiceSpec() - spec.name = cluster_name + spec.name = svc_arg spec.extended = { "pool":pool } - if ns != None: - spec.extended["namespace"] = ns + if namespace is not None: + spec.extended["namespace"] = namespace return self._add_stateless_svc("nfs", spec) def _rm_stateless_svc(self, svc_type, svc_id): @@ -298,36 +231,53 @@ Usage: self._orchestrator_wait([completion]) return HandleCommandResult() - def _mds_rm(self, cmd): - return self._rm_stateless_svc("mds", cmd['svc_id']) + @CLIWriteCommand('orchestrator mds rm', + "name=svc_id,type=CephString", + 'Remove an MDS service') + def _mds_rm(self, svc_id): + return self._rm_stateless_svc("mds", svc_id) - def _rgw_rm(self, cmd): - return self._rm_stateless_svc("rgw", cmd['svc_id']) + @handle_exceptions + @CLIWriteCommand('orchestrator rgw rm', + "name=svc_id,type=CephString", + 'Remove an RGW service') + def _rgw_rm(self, svc_id): + return self._rm_stateless_svc("rgw", svc_id) - def _nfs_rm(self, cmd): - return self._rm_stateless_svc("nfs", cmd['svc_id']) - - def _service_action(self, cmd): - action = cmd['action'] - svc_type = cmd['svc_type'] - svc_name = cmd['svc_name'] + @CLIWriteCommand('orchestrator nfs rm', + "name=svc_id,type=CephString", + 'Remove an NFS service') + @handle_exceptions + def _nfs_rm(self, svc_id): + return self._rm_stateless_svc("nfs", svc_id) + @CLIWriteCommand('orchestrator service', + "name=action,type=CephChoices,strings=start|stop|reload " + "name=svc_type,type=CephString " + "name=svc_name,type=CephString", + 'Start, stop or reload an entire service (i.e. all daemons)') + @handle_exceptions + def _service_action(self, action, svc_type, svc_name): completion = self.service_action(action, svc_type, service_name=svc_name) self._orchestrator_wait([completion]) - return HandleCommandResult() - def _service_instance_action(self, cmd): - action = cmd['action'] - svc_type = cmd['svc_type'] - svc_id = cmd['svc_id'] - + @CLIWriteCommand('orchestrator service-instance', + "name=action,type=CephChoices,strings=start|stop|reload " + "name=svc_type,type=CephString " + "name=svc_id,type=CephString", + 'Start, stop or reload a specific service instance') + @handle_exceptions + def _service_instance_action(self, action, svc_type, svc_id): completion = self.service_action(action, svc_type, service_id=svc_id) self._orchestrator_wait([completion]) - return HandleCommandResult() - def _set_backend(self, cmd): + @CLIWriteCommand('orchestrator set backend', + "name=module_name,type=CephString,req=true", + 'Select orchestrator module backend') + @handle_exceptions + def _set_backend(self, module_name): """ We implement a setter command instead of just having the user modify the setting directly, so that we can validate they're setting @@ -336,9 +286,7 @@ Usage: There isn't a mechanism for ensuring they don't *disable* the module later, but this is better than nothing. """ - mgr_map = self.get("mgr_map") - module_name = cmd['module'] if module_name == "": self.set_module_option("orchestrator", None) @@ -374,13 +322,11 @@ Usage: return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name)) + @CLIReadCommand('orchestrator status', + desc='Report configured backend and its status') + @handle_exceptions def _status(self): - try: - avail, why = self.available() - except NoOrchestrator: - return HandleCommandResult(-errno.ENODEV, - stderr="No orchestrator configured (try " - "`ceph orchestrator set backend`)") + avail, why = self.available() if avail is None: # The module does not report its availability @@ -391,49 +337,3 @@ Usage: avail, " ({0})".format(why) if not avail else "" )) - - def handle_command(self, inbuf, cmd): - try: - return self._handle_command(inbuf, cmd) - except NoOrchestrator: - return HandleCommandResult(-errno.ENODEV, stderr="No orchestrator configured") - except ImportError as e: - return HandleCommandResult(-errno.ENOENT, stderr=str(e)) - except NotImplementedError: - return HandleCommandResult(-errno.EINVAL, stderr="Command not found") - - def _handle_command(self, inbuf, cmd): - if cmd['prefix'] == "orchestrator device ls": - return self._list_devices(cmd) - elif cmd['prefix'] == "orchestrator service ls": - return self._list_services(cmd) - elif cmd['prefix'] == "orchestrator service status": - return self._list_services(cmd) # TODO: create more detailed output - elif cmd['prefix'] == "orchestrator osd rm": - return self._osd_rm(cmd) - elif cmd['prefix'] == "orchestrator mds add": - return self._mds_add(cmd) - elif cmd['prefix'] == "orchestrator mds rm": - return self._mds_rm(cmd) - elif cmd['prefix'] == "orchestrator rgw add": - return self._rgw_add(cmd) - elif cmd['prefix'] == "orchestrator rgw rm": - return self._rgw_rm(cmd) - elif cmd['prefix'] == "orchestrator nfs add": - return self._nfs_add(cmd) - elif cmd['prefix'] == "orchestrator nfs rm": - return self._nfs_rm(cmd) - elif cmd['prefix'] == "orchestrator service": - return self._service_action(cmd) - elif cmd['prefix'] == "orchestrator service-instance": - return self._service_instance_action(cmd) - elif cmd['prefix'] == "orchestrator set backend": - return self._set_backend(cmd) - elif cmd['prefix'] == "orchestrator status": - return self._status() - elif cmd['prefix'] == "orchestrator osd create": - return self._create_osd(inbuf, cmd) - elif cmd['prefix'] == "orchestrator osd remove": - return self._remove_osd(cmd) - else: - raise NotImplementedError() diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index ebe2c2e7feb..bfc6cf9f385 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -97,7 +97,6 @@ def deferred_write(message): def wrapper(f): @functools.wraps(f) def inner(*args, **kwargs): - args[0].log.warning('message' + message) return TestWriteCompletion(lambda: f(*args, **kwargs), '{}, args={}, kwargs={}'.format(message, args, kwargs)) return inner @@ -208,11 +207,13 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): """ There is no guarantee which devices are returned by get_inventory. """ + if node_filter and node_filter.nodes is not None: + assert isinstance(node_filter.nodes, list) try: c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json']) except OSError: cmd = """ - . {}/ceph-volume-virtualenv/bin/activate + . {tmpdir}/ceph-volume-virtualenv/bin/activate ceph-volume inventory --format json """.format(tmpdir=os.environ.get('TMPDIR', '/tmp')) c_v_out = check_output(cmd, shell=True) @@ -259,15 +260,22 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): return result + @deferred_write("Adding stateless service") def add_stateless_service(self, service_type, spec): - raise NotImplementedError(service_type) + pass @deferred_write("create_osds") def create_osds(self, drive_group, all_hosts): drive_group.validate(all_hosts) + @deferred_write("remove_osds") + def remove_osds(self, osd_ids): + assert isinstance(osd_ids, list) @deferred_write("service_action") def service_action(self, action, service_type, service_name=None, service_id=None): pass + @deferred_write("remove_stateless_service") + def remove_stateless_service(self, service_type, id_): + pass