mgr/orchestrator: make use of @CLICommand

Also modified some commands to allow multiple hosts.
Also: Added more tests

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
Sebastian Wagner 2019-01-22 16:30:01 +01:00
parent e883e7398b
commit d71a97b7fa
4 changed files with 172 additions and 224 deletions

View File

@ -39,10 +39,26 @@ class TestOrchestratorCli(MgrTestCase):
ret = self._orch_cmd("device", "ls")
self.assertIn("localhost:", ret)
def test_device_ls_hosts(self):
    """`orchestrator device ls` accepts multiple explicit host names.

    Fixed typo in the test name ("hoshs" -> "hosts").
    """
    ret = self._orch_cmd("device", "ls", "localhost", "host1")
    self.assertIn("localhost:", ret)
def test_device_ls_json(self):
    """`device ls --format json` emits parseable JSON (a list of nodes)."""
    ret = self._orch_cmd("device", "ls", "--format", "json")
    self.assertIn("localhost", ret)
    self.assertIsInstance(json.loads(ret), list)
def test_service_ls(self):
    """Plain `service ls` output mentions the always-present mgr service."""
    ret = self._orch_cmd("service", "ls")
    self.assertIn("ceph-mgr", ret)
def test_service_ls_json(self):
    """`service ls --format json` emits a JSON list that includes ceph-mgr."""
    ret = self._orch_cmd("service", "ls", "--format", "json")
    self.assertIsInstance(json.loads(ret), list)
    self.assertIn("ceph-mgr", ret)
def test_service_action(self):
self._orch_cmd("service", "reload", "mds", "cephfs")
self._orch_cmd("service", "stop", "mds", "cephfs")
@ -67,3 +83,24 @@ class TestOrchestratorCli(MgrTestCase):
with self.assertRaises(CommandFailedError):
self._orch_cmd("osd", "create", "notfound:device")
def test_mds_add(self):
    """Smoke test: `mds add` accepts a bare service name."""
    self._orch_cmd("mds", "add", "service_name")
def test_rgw_add(self):
    """Smoke test: `rgw add` accepts a bare service name."""
    self._orch_cmd("rgw", "add", "service_name")
def test_nfs_add(self):
    """`nfs add` works both with and without the optional --namespace."""
    self._orch_cmd("nfs", "add", "service_name", "pool", "--namespace", "ns")
    self._orch_cmd("nfs", "add", "service_name", "pool")
def test_osd_rm(self):
    """Smoke test: `osd rm` accepts a single OSD id."""
    self._orch_cmd("osd", "rm", "osd.0")
def test_mds_rm(self):
    """Smoke test: `mds rm` accepts a service id."""
    self._orch_cmd("mds", "rm", "foo")
def test_rgw_rm(self):
    """Smoke test: `rgw rm` accepts a service id."""
    self._orch_cmd("rgw", "rm", "foo")
def test_nfs_rm(self):
    """Smoke test: `nfs rm` accepts a service id."""
    self._orch_cmd("nfs", "rm", "service_name")

View File

@ -7,7 +7,7 @@ Please see the ceph-mgr module developer's guide for more information.
import six
try:
from typing import TypeVar, Generic, List, Optional, Union
from typing import TypeVar, Generic, List, Optional, Union, Tuple
T = TypeVar('T')
G = Generic[T]
except ImportError:
@ -151,6 +151,7 @@ class Orchestrator(object):
return True
def available(self):
# type: () -> Tuple[Optional[bool], Optional[str]]
"""
Report whether we can talk to the orchestrator. This is the
place to give the user a meaningful message if the orchestrator
@ -670,9 +671,10 @@ class InventoryFilter(object):
in e.g. OSD servers.
"""
def __init__(self):
self.labels = None # Optional: get info about nodes matching labels
self.nodes = None # Optional: get info about certain named nodes only
def __init__(self, labels=None, nodes=None):
# type: (List[str], List[str]) -> None
self.labels = labels # Optional: get info about nodes matching labels
self.nodes = nodes # Optional: get info about certain named nodes only
class InventoryDevice(object):
@ -775,6 +777,7 @@ class OrchestratorClientMixin(Orchestrator):
return self.remote(o, meth, *args, **kwargs)
def _orchestrator_wait(self, completions):
# type: (List[_Completion]) -> None
"""
Helper to wait for completions to complete (reads) or
become persistent (writes).

View File

@ -6,13 +6,29 @@ try:
except ImportError:
pass # just for type checking.
from mgr_module import MgrModule, HandleCommandResult
from functools import wraps
from typing import List
from mgr_module import MgrModule, HandleCommandResult, CLIWriteCommand, CLIReadCommand
import orchestrator
class NoOrchestrator(Exception):
    """Raised when no orchestrator backend has been configured.

    Carries a fixed, user-facing message telling the admin how to select
    a backend. (Removed the leftover `pass` that preceded __init__.)
    """
    def __init__(self):
        super(NoOrchestrator, self).__init__("No orchestrator configured (try "
                                             "`ceph orchestrator set backend`)")
def handle_exceptions(func):
    """Decorator translating orchestrator failures into CLI error results.

    NOTE(review): the handle_command() code this replaces returned
    -errno.ENODEV for NoOrchestrator and -errno.ENOENT for ImportError;
    collapsing both to ENOENT changed the error code users/scripts see.
    Restore the distinct codes for backward compatibility.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except NoOrchestrator as e:
            # "No such device" — no backend configured (pre-existing code).
            return HandleCommandResult(-errno.ENODEV, stderr=str(e))
        except ImportError as e:
            # Backend module failed to import.
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
    return inner
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
@ -20,113 +36,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
{'name': 'orchestrator'}
]
COMMANDS = [
{
'cmd': "orchestrator device ls "
"name=host,type=CephString,req=false "
"name=format,type=CephChoices,strings=json|plain,req=false ",
"desc": "List devices on a node",
"perm": "r"
},
{
'cmd': "orchestrator service ls "
"name=host,type=CephString,req=false "
"name=svc_type,type=CephString,req=false "
"name=svc_id,type=CephString,req=false "
"name=format,type=CephChoices,strings=json|plain,req=false ",
"desc": "List services known to orchestrator" ,
"perm": "r"
},
{
'cmd': "orchestrator service status "
"name=host,type=CephString,req=false "
"name=svc_type,type=CephString "
"name=svc_id,type=CephString "
"name=format,type=CephChoices,strings=json|plain,req=false ",
"desc": "Get orchestrator state for Ceph service",
"perm": "r"
},
{
'cmd': "orchestrator osd create "
"name=svc_arg,type=CephString,req=false ",
"desc": "Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>",
"perm": "rw"
},
{
'cmd': "orchestrator osd rm "
"name=svc_id,type=CephString,n=N ",
"desc": "Remove an OSD service",
"perm": "rw"
},
{
'cmd': "orchestrator mds add "
"name=svc_arg,type=CephString ",
"desc": "Create an MDS service",
"perm": "rw"
},
{
'cmd': "orchestrator mds rm "
"name=svc_id,type=CephString ",
"desc": "Remove an MDS service",
"perm": "rw"
},
{
'cmd': "orchestrator rgw add "
"name=svc_arg,type=CephString ",
"desc": "Create an RGW service",
"perm": "rw"
},
{
'cmd': "orchestrator rgw rm "
"name=svc_id,type=CephString ",
"desc": "Remove an RGW service",
"perm": "rw"
},
{
'cmd': "orchestrator nfs add "
"name=svc_arg,type=CephString "
"name=pool,type=CephString "
"name=namespace,type=CephString,req=false ",
"desc": "Create an NFS service",
"perm": "rw"
},
{
'cmd': "orchestrator nfs rm "
"name=svc_id,type=CephString ",
"desc": "Remove an NFS service",
"perm": "rw"
},
{
'cmd': "orchestrator service "
"name=action,type=CephChoices,"
"strings=start|stop|reload "
"name=svc_type,type=CephString "
"name=svc_name,type=CephString",
"desc": "Start, stop or reload an entire service (i.e. all daemons)",
"perm": "rw"
},
{
'cmd': "orchestrator service-instance "
"name=action,type=CephChoices,"
"strings=start|stop|reload "
"name=svc_type,type=CephString "
"name=svc_id,type=CephString",
"desc": "Start, stop or reload a specific service instance",
"perm": "rw"
},
{
'cmd': "orchestrator set backend "
"name=module,type=CephString,req=true",
"desc": "Select orchestrator module backend",
"perm": "rw"
},
{
"cmd": "orchestrator status",
"desc": "Report configured backend and its status",
"perm": "r"
}
]
def _select_orchestrator(self):
o = self.get_module_option("orchestrator")
if o is None:
@ -134,8 +43,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return o
def _list_devices(self, cmd):
@CLIReadCommand('orchestrator device ls',
"name=host,type=CephString,n=N,req=false "
"name=format,type=CephChoices,strings=json|plain,req=false",
'List devices on a node')
@handle_exceptions
def _list_devices(self, host=None, format='plain'):
# type: (List[str], str) -> HandleCommandResult
"""
Provide information about storage devices present in cluster hosts
@ -143,19 +57,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
date hardware inventory is fine as long as hardware ultimately appears
in the output of this command.
"""
host = cmd.get('host', None)
if host:
nf = orchestrator.InventoryFilter()
nf.nodes = [host]
else:
nf = None
nf = orchestrator.InventoryFilter(nodes=host) if host else None
completion = self.get_inventory(node_filter=nf)
self._orchestrator_wait([completion])
if cmd.get('format', 'plain') == 'json':
if format == 'json':
data = [n.to_json() for n in completion.result]
return HandleCommandResult(stdout=json.dumps(data))
else:
@ -176,14 +84,18 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return HandleCommandResult(stdout=result)
def _list_services(self, cmd):
hostname = cmd.get('host', None)
svc_id = cmd.get('svc_id', None)
svc_type = cmd.get('svc_type', None)
@CLIReadCommand('orchestrator service ls',
"name=host,type=CephString,req=false "
"name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
"name=svc_id,type=CephString,req=false "
"name=format,type=CephChoices,strings=json|plain,req=false",
'List services known to orchestrator')
@handle_exceptions
def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain'):
# XXX this is kind of confusing for people because in the orchestrator
# context the service ID for MDS is the filesystem ID, not the daemon ID
completion = self.describe_service(svc_type, svc_id, hostname)
completion = self.describe_service(svc_type, svc_id, host)
self._orchestrator_wait([completion])
services = completion.result
@ -192,7 +104,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
if len(services) == 0:
return HandleCommandResult(stdout="No services reported")
elif cmd.get('format', 'plain') == 'json':
elif format == 'json':
data = [s.to_json() for s in services]
return HandleCommandResult(stdout=json.dumps(data))
else:
@ -213,8 +125,12 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return HandleCommandResult(stdout="\n".join(lines))
def _create_osd(self, inbuf, cmd):
# type: (str, Dict[str, str]) -> HandleCommandResult
@CLIWriteCommand('orchestrator osd create',
"name=svc_arg,type=CephString,req=false",
'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
@handle_exceptions
def _create_osd(self, svc_arg=None, inbuf=None):
# type: (str, str) -> HandleCommandResult
"""Create one or more OSDs"""
usage = """
@ -230,16 +146,18 @@ Usage:
msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
else:
elif svc_arg:
try:
node_name, block_device = cmd['svc_arg'].split(":")
node_name, block_device = svc_arg.split(":")
block_devices = block_device.split(',')
except (TypeError, KeyError, ValueError):
msg = "Invalid host:device spec: '{}'".format(cmd['svc_arg']) + usage
msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
devs = orchestrator.DeviceSelection(paths=block_devices)
drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
else:
return HandleCommandResult(-errno.EINVAL, stderr=usage)
# TODO: Remove this and make the orchestrator composable
# Like a future or so.
@ -257,12 +175,17 @@ Usage:
self.log.warning(str(completion.result))
return HandleCommandResult(stdout=str(completion.result))
def _osd_rm(self, cmd):
@CLIWriteCommand('orchestrator osd rm',
"name=svc_id,type=CephString,n=N",
'Remove OSD services')
@handle_exceptions
def _osd_rm(self, svc_id):
# type: (List[str]) -> HandleCommandResult
"""
Remove OSD's
:cmd : Arguments for remove the osd
"""
completion = self.remove_osds(cmd["svc_id"])
completion = self.remove_osds(svc_id)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=str(completion.result))
@ -271,26 +194,36 @@ Usage:
self._orchestrator_wait([completion])
return HandleCommandResult()
def _mds_add(self, cmd):
@CLIWriteCommand('orchestrator mds add',
"name=svc_arg,type=CephString",
'Create an MDS service')
@handle_exceptions
def _mds_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
spec.name = cmd['svc_arg']
spec.name = svc_arg
return self._add_stateless_svc("mds", spec)
def _rgw_add(self, cmd):
@CLIWriteCommand('orchestrator rgw add',
"name=svc_arg,type=CephString",
'Create an RGW service')
@handle_exceptions
def _rgw_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
spec.name = cmd['svc_arg']
spec.name = svc_arg
return self._add_stateless_svc("rgw", spec)
def _nfs_add(self, cmd):
cluster_name = cmd['svc_arg']
pool = cmd['pool']
ns = cmd.get('namespace', None)
@CLIWriteCommand('orchestrator nfs add',
"name=svc_arg,type=CephString "
"name=pool,type=CephString "
"name=namespace,type=CephString,req=false",
'Create an NFS service')
@handle_exceptions
def _nfs_add(self, svc_arg, pool, namespace=None):
spec = orchestrator.StatelessServiceSpec()
spec.name = cluster_name
spec.name = svc_arg
spec.extended = { "pool":pool }
if ns != None:
spec.extended["namespace"] = ns
if namespace is not None:
spec.extended["namespace"] = namespace
return self._add_stateless_svc("nfs", spec)
def _rm_stateless_svc(self, svc_type, svc_id):
@ -298,36 +231,53 @@ Usage:
self._orchestrator_wait([completion])
return HandleCommandResult()
def _mds_rm(self, cmd):
return self._rm_stateless_svc("mds", cmd['svc_id'])
@CLIWriteCommand('orchestrator mds rm',
                 "name=svc_id,type=CephString",
                 'Remove an MDS service')
@handle_exceptions
def _mds_rm(self, svc_id):
    """Remove an MDS service by its service id.

    Added the @handle_exceptions decorator for consistency: every other
    add/rm command handler has it, and without it a NoOrchestrator or
    ImportError here would escape as an unhandled exception.
    """
    return self._rm_stateless_svc("mds", svc_id)
def _rgw_rm(self, cmd):
return self._rm_stateless_svc("rgw", cmd['svc_id'])
@CLIWriteCommand('orchestrator rgw rm',
                 "name=svc_id,type=CephString",
                 'Remove an RGW service')
@handle_exceptions
def _rgw_rm(self, svc_id):
    """Remove an RGW service by its service id.

    Decorator order fixed to match every sibling handler
    (@CLIWriteCommand listed first, @handle_exceptions below it):
    decorators apply bottom-up, so with @handle_exceptions on top,
    CLIWriteCommand would register the function *without* the
    exception-handling wrapper.
    """
    return self._rm_stateless_svc("rgw", svc_id)
def _nfs_rm(self, cmd):
return self._rm_stateless_svc("nfs", cmd['svc_id'])
def _service_action(self, cmd):
action = cmd['action']
svc_type = cmd['svc_type']
svc_name = cmd['svc_name']
@CLIWriteCommand('orchestrator nfs rm',
                 "name=svc_id,type=CephString",
                 'Remove an NFS service')
@handle_exceptions
def _nfs_rm(self, svc_id):
    """Remove an NFS service by its service id."""
    return self._rm_stateless_svc("nfs", svc_id)
@CLIWriteCommand('orchestrator service',
                 "name=action,type=CephChoices,strings=start|stop|reload "
                 "name=svc_type,type=CephString "
                 "name=svc_name,type=CephString",
                 'Start, stop or reload an entire service (i.e. all daemons)')
@handle_exceptions
def _service_action(self, action, svc_type, svc_name):
    """Apply *action* to every daemon of the named service.

    Addresses the whole service via service_name= (contrast with
    _service_instance_action, which targets one instance by service_id=).
    """
    completion = self.service_action(action, svc_type, service_name=svc_name)
    self._orchestrator_wait([completion])
    return HandleCommandResult()
def _service_instance_action(self, cmd):
action = cmd['action']
svc_type = cmd['svc_type']
svc_id = cmd['svc_id']
@CLIWriteCommand('orchestrator service-instance',
                 "name=action,type=CephChoices,strings=start|stop|reload "
                 "name=svc_type,type=CephString "
                 "name=svc_id,type=CephString",
                 'Start, stop or reload a specific service instance')
@handle_exceptions
def _service_instance_action(self, action, svc_type, svc_id):
    """Apply *action* to one specific service instance (by service_id=)."""
    completion = self.service_action(action, svc_type, service_id=svc_id)
    self._orchestrator_wait([completion])
    return HandleCommandResult()
def _set_backend(self, cmd):
@CLIWriteCommand('orchestrator set backend',
"name=module_name,type=CephString,req=true",
'Select orchestrator module backend')
@handle_exceptions
def _set_backend(self, module_name):
"""
We implement a setter command instead of just having the user
modify the setting directly, so that we can validate they're setting
@ -336,9 +286,7 @@ Usage:
There isn't a mechanism for ensuring they don't *disable* the module
later, but this is better than nothing.
"""
mgr_map = self.get("mgr_map")
module_name = cmd['module']
if module_name == "":
self.set_module_option("orchestrator", None)
@ -374,13 +322,11 @@ Usage:
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
@CLIReadCommand('orchestrator status',
desc='Report configured backend and its status')
@handle_exceptions
def _status(self):
try:
avail, why = self.available()
except NoOrchestrator:
return HandleCommandResult(-errno.ENODEV,
stderr="No orchestrator configured (try "
"`ceph orchestrator set backend`)")
avail, why = self.available()
if avail is None:
# The module does not report its availability
@ -391,49 +337,3 @@ Usage:
avail,
" ({0})".format(why) if not avail else ""
))
def handle_command(self, inbuf, cmd):
try:
return self._handle_command(inbuf, cmd)
except NoOrchestrator:
return HandleCommandResult(-errno.ENODEV, stderr="No orchestrator configured")
except ImportError as e:
return HandleCommandResult(-errno.ENOENT, stderr=str(e))
except NotImplementedError:
return HandleCommandResult(-errno.EINVAL, stderr="Command not found")
def _handle_command(self, inbuf, cmd):
if cmd['prefix'] == "orchestrator device ls":
return self._list_devices(cmd)
elif cmd['prefix'] == "orchestrator service ls":
return self._list_services(cmd)
elif cmd['prefix'] == "orchestrator service status":
return self._list_services(cmd) # TODO: create more detailed output
elif cmd['prefix'] == "orchestrator osd rm":
return self._osd_rm(cmd)
elif cmd['prefix'] == "orchestrator mds add":
return self._mds_add(cmd)
elif cmd['prefix'] == "orchestrator mds rm":
return self._mds_rm(cmd)
elif cmd['prefix'] == "orchestrator rgw add":
return self._rgw_add(cmd)
elif cmd['prefix'] == "orchestrator rgw rm":
return self._rgw_rm(cmd)
elif cmd['prefix'] == "orchestrator nfs add":
return self._nfs_add(cmd)
elif cmd['prefix'] == "orchestrator nfs rm":
return self._nfs_rm(cmd)
elif cmd['prefix'] == "orchestrator service":
return self._service_action(cmd)
elif cmd['prefix'] == "orchestrator service-instance":
return self._service_instance_action(cmd)
elif cmd['prefix'] == "orchestrator set backend":
return self._set_backend(cmd)
elif cmd['prefix'] == "orchestrator status":
return self._status()
elif cmd['prefix'] == "orchestrator osd create":
return self._create_osd(inbuf, cmd)
elif cmd['prefix'] == "orchestrator osd remove":
return self._remove_osd(cmd)
else:
raise NotImplementedError()

View File

@ -97,7 +97,6 @@ def deferred_write(message):
def wrapper(f):
@functools.wraps(f)
def inner(*args, **kwargs):
args[0].log.warning('message' + message)
return TestWriteCompletion(lambda: f(*args, **kwargs),
'{}, args={}, kwargs={}'.format(message, args, kwargs))
return inner
@ -208,11 +207,13 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
"""
There is no guarantee which devices are returned by get_inventory.
"""
if node_filter and node_filter.nodes is not None:
assert isinstance(node_filter.nodes, list)
try:
c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
except OSError:
cmd = """
. {}/ceph-volume-virtualenv/bin/activate
. {tmpdir}/ceph-volume-virtualenv/bin/activate
ceph-volume inventory --format json
""".format(tmpdir=os.environ.get('TMPDIR', '/tmp'))
c_v_out = check_output(cmd, shell=True)
@ -259,15 +260,22 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
return result
@deferred_write("Adding stateless service")
def add_stateless_service(self, service_type, spec):
    # Accept any spec without doing work; the previous
    # `raise NotImplementedError(service_type)` line (removed by this
    # change) made the method fail before deferred_write could record it.
    pass
@deferred_write("create_osds")
def create_osds(self, drive_group, all_hosts):
    # Only validates the drive group against the host list; the test
    # orchestrator performs no actual OSD creation.
    drive_group.validate(all_hosts)
@deferred_write("remove_osds")
def remove_osds(self, osd_ids):
    # The CLI command now declares svc_id with n=N, so a list of ids
    # (not a single string) must arrive here.
    assert isinstance(osd_ids, list)
@deferred_write("service_action")
def service_action(self, action, service_type, service_name=None, service_id=None):
    # No-op body: the deferred_write wrapper supplies the write
    # completion; nothing is actually started/stopped/reloaded.
    pass
@deferred_write("remove_stateless_service")
def remove_stateless_service(self, service_type, id_):
    # No-op body: the deferred_write wrapper supplies the write completion.
    pass