mirror of
https://github.com/ceph/ceph
synced 2025-01-19 09:32:00 +00:00
mgr/orchestrator: Add error handling to interface
Also: * Small test_orchestrator refactorization * Improved Docstring in MgrModule.remote * Added `raise_if_exception` that raises Exceptions * Added `OrchestratorError` and `OrchestratorValidationError` * `_orchestrator_wait` no longer raises anything * `volumes` model also calls `raise_if_exception` Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
parent
476d88ae6b
commit
61d53ed70e
@ -135,6 +135,9 @@ effect. Second, the completion becomes *effective*, meaning that the operation
|
||||
|
||||
.. automethod:: Orchestrator.wait
|
||||
|
||||
.. autoclass:: _Completion
|
||||
:members:
|
||||
|
||||
.. autoclass:: ReadCompletion
|
||||
.. autoclass:: WriteCompletion
|
||||
|
||||
@ -150,6 +153,58 @@ specify a location when creating a stateless service.
|
||||
OSD services generally require a specific placement choice, as this
|
||||
will determine which storage devices are used.
|
||||
|
||||
Error Handling
|
||||
--------------
|
||||
|
||||
The main goal of error handling within orchestrator modules is to provide debug information to
|
||||
assist users when dealing with deployment errors.
|
||||
|
||||
.. autoclass:: OrchestratorError
|
||||
.. autoclass:: NoOrchestrator
|
||||
.. autoclass:: OrchestratorValidationError
|
||||
|
||||
|
||||
In detail, orchestrators need to explicitly deal with different kinds of errors:
|
||||
|
||||
1. No orchestrator configured
|
||||
|
||||
See :class:`NoOrchestrator`.
|
||||
|
||||
2. An orchestrator doesn't implement a specific method.
|
||||
|
||||
For example, an Orchestrator doesn't support ``add_host``.
|
||||
|
||||
In this case, a ``NotImplementedError`` is raised.
|
||||
|
||||
3. Missing features within implemented methods.
|
||||
|
||||
E.g. optional parameters to a command that are not supported by the
|
||||
backend (e.g. the hosts field in :func:`Orchestrator.update_mons` command with the rook backend).
|
||||
|
||||
See :class:`OrchestratorValidationError`.
|
||||
|
||||
4. Input validation errors
|
||||
|
||||
The ``orchestrator_cli`` module and other calling modules are supposed to
|
||||
provide meaningful error messages.
|
||||
|
||||
See :class:`OrchestratorValidationError`.
|
||||
|
||||
5. Errors when actually executing commands
|
||||
|
||||
The resulting Completion should contain an error string that assists in understanding the
|
||||
problem. In addition, :func:`_Completion.is_errored` is set to ``True``
|
||||
|
||||
6. Invalid configuration in the orchestrator modules
|
||||
|
||||
This can be tackled similar to 5.
|
||||
|
||||
|
||||
All other errors are unexpected orchestrator issues and thus should raise an exception that are then
|
||||
logged into the mgr log file. If there is a completion object at that point,
|
||||
:func:`_Completion.result` may contain an error message.
|
||||
|
||||
|
||||
Excluded functionality
|
||||
----------------------
|
||||
|
||||
@ -220,3 +275,8 @@ Utility
|
||||
|
||||
.. automethod:: Orchestrator.available
|
||||
|
||||
Client Modules
|
||||
--------------
|
||||
|
||||
.. autoclass:: OrchestratorClientMixin
|
||||
:members:
|
||||
|
@ -1,3 +1,4 @@
|
||||
import errno
|
||||
import json
|
||||
import logging
|
||||
from tempfile import NamedTemporaryFile
|
||||
@ -18,11 +19,9 @@ class TestOrchestratorCli(MgrTestCase):
|
||||
|
||||
def _orch_cmd_result(self, *args, **kwargs):
|
||||
"""
|
||||
superfluous, but raw_cluster_cmd doesn't support kwargs.
|
||||
raw_cluster_cmd doesn't support kwargs.
|
||||
"""
|
||||
res = self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
|
||||
self.assertEqual(res, 0)
|
||||
|
||||
return self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
|
||||
|
||||
def setUp(self):
|
||||
super(TestOrchestratorCli, self).setUp()
|
||||
@ -82,7 +81,8 @@ class TestOrchestratorCli(MgrTestCase):
|
||||
"data_devices": {"paths": ["/dev/sda"]}
|
||||
}
|
||||
|
||||
self._orch_cmd_result("osd", "create", "-i", "-", stdin=json.dumps(drive_group))
|
||||
res = self._orch_cmd_result("osd", "create", "-i", "-", stdin=json.dumps(drive_group))
|
||||
self.assertEqual(res, 0)
|
||||
|
||||
with self.assertRaises(CommandFailedError):
|
||||
self._orch_cmd("osd", "create", "notfound:device")
|
||||
@ -129,3 +129,15 @@ class TestOrchestratorCli(MgrTestCase):
|
||||
|
||||
def test_nfs_update(self):
|
||||
self._orch_cmd("nfs", "update", "service_name", "2")
|
||||
|
||||
def test_error(self):
|
||||
ret = self._orch_cmd_result("host", "add", "raise_no_support")
|
||||
self.assertEqual(ret, errno.ENOENT)
|
||||
ret = self._orch_cmd_result("host", "add", "raise_bug")
|
||||
self.assertEqual(ret, errno.EINVAL)
|
||||
ret = self._orch_cmd_result("host", "add", "raise_not_implemented")
|
||||
self.assertEqual(ret, errno.ENOENT)
|
||||
ret = self._orch_cmd_result("host", "add", "raise_no_orchestrator")
|
||||
self.assertEqual(ret, errno.ENOENT)
|
||||
ret = self._orch_cmd_result("host", "add", "raise_import_error")
|
||||
self.assertEqual(ret, errno.ENOENT)
|
||||
|
@ -1214,7 +1214,8 @@ class MgrModule(ceph_module.BaseMgrModule):
|
||||
exception is raised.
|
||||
:param args: Argument tuple
|
||||
:param kwargs: Keyword argument dict
|
||||
:return:
|
||||
:raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
|
||||
:raises ImportError: No such module
|
||||
"""
|
||||
return self._ceph_dispatch_remote(module_name, method_name,
|
||||
args, kwargs)
|
||||
|
@ -4,9 +4,9 @@ ceph-mgr orchestrator interface
|
||||
|
||||
Please see the ceph-mgr module developer's guide for more information.
|
||||
"""
|
||||
import six
|
||||
|
||||
from mgr_util import format_bytes
|
||||
import sys
|
||||
import time
|
||||
import fnmatch
|
||||
|
||||
try:
|
||||
from typing import TypeVar, Generic, List, Optional, Union, Tuple
|
||||
@ -15,13 +15,34 @@ try:
|
||||
except ImportError:
|
||||
T, G = object, object
|
||||
|
||||
import time
|
||||
import fnmatch
|
||||
import six
|
||||
|
||||
from mgr_util import format_bytes
|
||||
|
||||
|
||||
class OrchestratorError(Exception):
|
||||
"""
|
||||
General orchestrator specific error.
|
||||
|
||||
Used for deployment, configuration or user errors.
|
||||
|
||||
It's not intended for programming errors or orchestrator internal errors.
|
||||
"""
|
||||
|
||||
|
||||
class NoOrchestrator(OrchestratorError):
|
||||
"""
|
||||
No orchestrator in configured.
|
||||
"""
|
||||
def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
|
||||
super(NoOrchestrator, self).__init__(msg)
|
||||
|
||||
|
||||
class OrchestratorValidationError(OrchestratorError):
|
||||
"""
|
||||
Raised when an orchestrator doesn't support a specific feature.
|
||||
"""
|
||||
|
||||
class NoOrchestrator(Exception):
|
||||
def __init__(self):
|
||||
super(NoOrchestrator, self).__init__("No orchestrator configured (try "
|
||||
"`ceph orchestrator set backend`)")
|
||||
|
||||
class _Completion(G):
|
||||
@property
|
||||
@ -34,6 +55,21 @@ class _Completion(G):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def exception(self):
|
||||
# type: () -> Optional[Exception]
|
||||
"""
|
||||
Holds an exception object.
|
||||
"""
|
||||
try:
|
||||
return self.__exception
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
@exception.setter
|
||||
def exception(self, value):
|
||||
self.__exception = value
|
||||
|
||||
@property
|
||||
def is_read(self):
|
||||
# type: () -> bool
|
||||
@ -47,7 +83,11 @@ class _Completion(G):
|
||||
@property
|
||||
def is_errored(self):
|
||||
# type: () -> bool
|
||||
raise NotImplementedError()
|
||||
"""
|
||||
Has the completion failed. Default implementation looks for
|
||||
self.exception. Can be overwritten.
|
||||
"""
|
||||
return self.exception is not None
|
||||
|
||||
@property
|
||||
def should_wait(self):
|
||||
@ -55,6 +95,30 @@ class _Completion(G):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def raise_if_exception(c):
|
||||
# type: (_Completion) -> None
|
||||
"""
|
||||
:raises OrchestratorError: Some user error or a config error.
|
||||
:raises Exception: Some internal error
|
||||
"""
|
||||
def copy_to_this_subinterpreter(r_obj):
|
||||
# This is something like `return pickle.loads(pickle.dumps(r_obj))`
|
||||
# Without importing anything.
|
||||
r_cls = r_obj.__class__
|
||||
if r_cls.__module__ == '__builtin__':
|
||||
return r_obj
|
||||
my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__)
|
||||
if id(my_cls) == id(r_cls):
|
||||
return r_obj
|
||||
my_obj = my_cls.__new__(my_cls)
|
||||
for k,v in r_obj.__dict__.items():
|
||||
setattr(my_obj, k, copy_to_this_subinterpreter(v))
|
||||
return my_obj
|
||||
|
||||
if c.exception is not None:
|
||||
raise copy_to_this_subinterpreter(c.exception)
|
||||
|
||||
|
||||
class ReadCompletion(_Completion):
|
||||
"""
|
||||
``Orchestrator`` implementations should inherit from this
|
||||
@ -814,9 +878,27 @@ def _mk_orch_methods(cls):
|
||||
|
||||
@_mk_orch_methods
|
||||
class OrchestratorClientMixin(Orchestrator):
|
||||
"""
|
||||
A module that inherents from `OrchestratorClientMixin` can directly call
|
||||
all :class:`Orchestrator` methods without manually calling remote.
|
||||
|
||||
Every interface method from ``Orchestrator`` is converted into a stub method that internally
|
||||
calls :func:`OrchestratorClientMixin._oremote`
|
||||
|
||||
>>> class MyModule(OrchestratorClientMixin):
|
||||
... def func(self):
|
||||
... completion = self.add_host('somehost') # calls `_oremote()`
|
||||
... self._orchestrator_wait([completion])
|
||||
... self.log.debug(completion.result)
|
||||
|
||||
"""
|
||||
def _oremote(self, meth, args, kwargs):
|
||||
"""
|
||||
Helper for invoking `remote` on whichever orchestrator is enabled
|
||||
|
||||
:raises RuntimeError: If the remote method failed.
|
||||
:raises NoOrchestrator:
|
||||
:raises ImportError: no `orchestrator_cli` module or backend not found.
|
||||
"""
|
||||
try:
|
||||
o = self._select_orchestrator()
|
||||
@ -832,16 +914,17 @@ class OrchestratorClientMixin(Orchestrator):
|
||||
def _orchestrator_wait(self, completions):
|
||||
# type: (List[_Completion]) -> None
|
||||
"""
|
||||
Helper to wait for completions to complete (reads) or
|
||||
Wait for completions to complete (reads) or
|
||||
become persistent (writes).
|
||||
|
||||
Waits for writes to be *persistent* but not *effective*.
|
||||
|
||||
:param completions: List of Completions
|
||||
:raises NoOrchestrator:
|
||||
:raises ImportError: no `orchestrator_cli` module or backend not found.
|
||||
"""
|
||||
while not self.wait(completions):
|
||||
if any(c.should_wait for c in completions):
|
||||
time.sleep(5)
|
||||
else:
|
||||
break
|
||||
|
||||
if all(hasattr(c, 'error') and getattr(c, 'error') for c in completions):
|
||||
raise Exception([getattr(c, 'error') for c in completions])
|
||||
|
@ -8,21 +8,34 @@ except ImportError:
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from mgr_module import MgrModule, HandleCommandResult, CLIWriteCommand, CLIReadCommand
|
||||
from mgr_module import MgrModule, HandleCommandResult, CLICommand
|
||||
|
||||
import orchestrator
|
||||
|
||||
|
||||
def handle_exceptions(func):
|
||||
|
||||
def handle_exception(prefix, cmd_args, desc, perm, func):
|
||||
@wraps(func)
|
||||
def inner(*args, **kwargs):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except (orchestrator.NoOrchestrator, ImportError) as e:
|
||||
except (orchestrator.OrchestratorError, ImportError) as e:
|
||||
# Do not print Traceback for expected errors.
|
||||
return HandleCommandResult(-errno.ENOENT, stderr=str(e))
|
||||
return inner
|
||||
except NotImplementedError:
|
||||
msg = 'This Orchestrator does not support `{}`'.format(prefix)
|
||||
return HandleCommandResult(-errno.ENOENT, stderr=msg)
|
||||
|
||||
return CLICommand(prefix, cmd_args, desc, perm)(wrapper)
|
||||
|
||||
|
||||
def _cli_command(perm):
|
||||
def inner_cli_command(prefix, cmd_args="", desc=""):
|
||||
return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
|
||||
return inner_cli_command
|
||||
|
||||
|
||||
_read_cli = _cli_command('r')
|
||||
_write_cli = _cli_command('rw')
|
||||
|
||||
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
MODULE_OPTIONS = [
|
||||
@ -32,39 +45,38 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
def _select_orchestrator(self):
|
||||
return self.get_module_option("orchestrator")
|
||||
|
||||
@CLIWriteCommand('orchestrator host add',
|
||||
"name=host,type=CephString,req=true",
|
||||
'Add a host')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator host add',
|
||||
"name=host,type=CephString,req=true",
|
||||
'Add a host')
|
||||
def _add_host(self, host):
|
||||
completion = self.add_host(host)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
@CLIWriteCommand('orchestrator host rm',
|
||||
"name=host,type=CephString,req=true",
|
||||
'Remove a host')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator host rm',
|
||||
"name=host,type=CephString,req=true",
|
||||
'Remove a host')
|
||||
def _remove_host(self, host):
|
||||
completion = self.remove_host(host)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
@CLIReadCommand('orchestrator host ls',
|
||||
desc='List hosts')
|
||||
@handle_exceptions
|
||||
@_read_cli('orchestrator host ls',
|
||||
desc='List hosts')
|
||||
def _get_hosts(self):
|
||||
completion = self.get_hosts()
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
result = "\n".join(map(lambda node: node.name, completion.result))
|
||||
return HandleCommandResult(stdout=result)
|
||||
|
||||
@CLIReadCommand('orchestrator device ls',
|
||||
"name=host,type=CephString,n=N,req=false "
|
||||
"name=format,type=CephChoices,strings=json|plain,req=false "
|
||||
"name=refresh,type=CephBool,req=false",
|
||||
'List devices on a node')
|
||||
@handle_exceptions
|
||||
@_read_cli('orchestrator device ls',
|
||||
"name=host,type=CephString,n=N,req=false "
|
||||
"name=format,type=CephChoices,strings=json|plain,req=false "
|
||||
"name=refresh,type=CephBool,req=false",
|
||||
'List devices on a node')
|
||||
def _list_devices(self, host=None, format='plain', refresh=False):
|
||||
# type: (List[str], str, bool) -> HandleCommandResult
|
||||
"""
|
||||
@ -79,6 +91,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
completion = self.get_inventory(node_filter=nf, refresh=refresh)
|
||||
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
|
||||
if format == 'json':
|
||||
data = [n.to_json() for n in completion.result]
|
||||
@ -101,19 +114,19 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
|
||||
return HandleCommandResult(stdout=result)
|
||||
|
||||
@CLIReadCommand('orchestrator service ls',
|
||||
"name=host,type=CephString,req=false "
|
||||
"name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
|
||||
"name=svc_id,type=CephString,req=false "
|
||||
"name=format,type=CephChoices,strings=json|plain,req=false",
|
||||
'List services known to orchestrator')
|
||||
@handle_exceptions
|
||||
@_read_cli('orchestrator service ls',
|
||||
"name=host,type=CephString,req=false "
|
||||
"name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
|
||||
"name=svc_id,type=CephString,req=false "
|
||||
"name=format,type=CephChoices,strings=json|plain,req=false",
|
||||
'List services known to orchestrator')
|
||||
def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain'):
|
||||
# XXX this is kind of confusing for people because in the orchestrator
|
||||
# context the service ID for MDS is the filesystem ID, not the daemon ID
|
||||
|
||||
completion = self.describe_service(svc_type, svc_id, host)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
services = completion.result
|
||||
|
||||
# Sort the list for display
|
||||
@ -142,10 +155,9 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
|
||||
return HandleCommandResult(stdout="\n".join(lines))
|
||||
|
||||
@CLIWriteCommand('orchestrator osd create',
|
||||
"name=svc_arg,type=CephString,req=false",
|
||||
'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator osd create',
|
||||
"name=svc_arg,type=CephString,req=false",
|
||||
'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
|
||||
def _create_osd(self, svc_arg=None, inbuf=None):
|
||||
# type: (str, str) -> HandleCommandResult
|
||||
"""Create one or more OSDs"""
|
||||
@ -180,6 +192,7 @@ Usage:
|
||||
# Like a future or so.
|
||||
host_completion = self.get_hosts()
|
||||
self._orchestrator_wait([host_completion])
|
||||
orchestrator.raise_if_exception(host_completion)
|
||||
all_hosts = [h.name for h in host_completion.result]
|
||||
|
||||
try:
|
||||
@ -189,13 +202,13 @@ Usage:
|
||||
|
||||
completion = self.create_osds(drive_group, all_hosts)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
self.log.warning(str(completion.result))
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
@CLIWriteCommand('orchestrator osd rm',
|
||||
"name=svc_id,type=CephString,n=N",
|
||||
'Remove OSD services')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator osd rm',
|
||||
"name=svc_id,type=CephString,n=N",
|
||||
'Remove OSD services')
|
||||
def _osd_rm(self, svc_id):
|
||||
# type: (List[str]) -> HandleCommandResult
|
||||
"""
|
||||
@ -204,37 +217,36 @@ Usage:
|
||||
"""
|
||||
completion = self.remove_osds(svc_id)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
def _add_stateless_svc(self, svc_type, spec):
|
||||
completion = self.add_stateless_service(svc_type, spec)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult()
|
||||
|
||||
@CLIWriteCommand('orchestrator mds add',
|
||||
"name=svc_arg,type=CephString",
|
||||
'Create an MDS service')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator mds add',
|
||||
"name=svc_arg,type=CephString",
|
||||
'Create an MDS service')
|
||||
def _mds_add(self, svc_arg):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = svc_arg
|
||||
return self._add_stateless_svc("mds", spec)
|
||||
|
||||
@CLIWriteCommand('orchestrator rgw add',
|
||||
"name=svc_arg,type=CephString",
|
||||
'Create an RGW service')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator rgw add',
|
||||
"name=svc_arg,type=CephString",
|
||||
'Create an RGW service')
|
||||
def _rgw_add(self, svc_arg):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = svc_arg
|
||||
return self._add_stateless_svc("rgw", spec)
|
||||
|
||||
@CLIWriteCommand('orchestrator nfs add',
|
||||
"name=svc_arg,type=CephString "
|
||||
"name=pool,type=CephString "
|
||||
"name=namespace,type=CephString,req=false",
|
||||
'Create an NFS service')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator nfs add',
|
||||
"name=svc_arg,type=CephString "
|
||||
"name=pool,type=CephString "
|
||||
"name=namespace,type=CephString,req=false",
|
||||
'Create an NFS service')
|
||||
def _nfs_add(self, svc_arg, pool, namespace=None):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = svc_arg
|
||||
@ -246,33 +258,31 @@ Usage:
|
||||
def _rm_stateless_svc(self, svc_type, svc_id):
|
||||
completion = self.remove_stateless_service(svc_type, svc_id)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult()
|
||||
|
||||
@CLIWriteCommand('orchestrator mds rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an MDS service')
|
||||
@_write_cli('orchestrator mds rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an MDS service')
|
||||
def _mds_rm(self, svc_id):
|
||||
return self._rm_stateless_svc("mds", svc_id)
|
||||
|
||||
@handle_exceptions
|
||||
@CLIWriteCommand('orchestrator rgw rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an RGW service')
|
||||
@_write_cli('orchestrator rgw rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an RGW service')
|
||||
def _rgw_rm(self, svc_id):
|
||||
return self._rm_stateless_svc("rgw", svc_id)
|
||||
|
||||
@CLIWriteCommand('orchestrator nfs rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an NFS service')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator nfs rm',
|
||||
"name=svc_id,type=CephString",
|
||||
'Remove an NFS service')
|
||||
def _nfs_rm(self, svc_id):
|
||||
return self._rm_stateless_svc("nfs", svc_id)
|
||||
|
||||
@CLIWriteCommand('orchestrator nfs update',
|
||||
"name=svc_id,type=CephString "
|
||||
"name=num,type=CephInt",
|
||||
'Scale an NFS service')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator nfs update',
|
||||
"name=svc_id,type=CephString "
|
||||
"name=num,type=CephInt",
|
||||
'Scale an NFS service')
|
||||
def _nfs_update(self, svc_id, num):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = svc_id
|
||||
@ -281,33 +291,32 @@ Usage:
|
||||
self._orchestrator_wait([completion])
|
||||
return HandleCommandResult()
|
||||
|
||||
@CLIWriteCommand('orchestrator service',
|
||||
"name=action,type=CephChoices,strings=start|stop|reload "
|
||||
"name=svc_type,type=CephString "
|
||||
"name=svc_name,type=CephString",
|
||||
'Start, stop or reload an entire service (i.e. all daemons)')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator service',
|
||||
"name=action,type=CephChoices,strings=start|stop|reload "
|
||||
"name=svc_type,type=CephString "
|
||||
"name=svc_name,type=CephString",
|
||||
'Start, stop or reload an entire service (i.e. all daemons)')
|
||||
def _service_action(self, action, svc_type, svc_name):
|
||||
completion = self.service_action(action, svc_type, service_name=svc_name)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult()
|
||||
|
||||
@CLIWriteCommand('orchestrator service-instance',
|
||||
"name=action,type=CephChoices,strings=start|stop|reload "
|
||||
"name=svc_type,type=CephString "
|
||||
"name=svc_id,type=CephString",
|
||||
'Start, stop or reload a specific service instance')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator service-instance',
|
||||
"name=action,type=CephChoices,strings=start|stop|reload "
|
||||
"name=svc_type,type=CephString "
|
||||
"name=svc_id,type=CephString",
|
||||
'Start, stop or reload a specific service instance')
|
||||
def _service_instance_action(self, action, svc_type, svc_id):
|
||||
completion = self.service_action(action, svc_type, service_id=svc_id)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult()
|
||||
|
||||
@CLIWriteCommand('orchestrator mgr update',
|
||||
"name=num,type=CephInt,req=true "
|
||||
"name=hosts,type=CephString,n=N,req=false",
|
||||
'Update the number of manager instances')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator mgr update',
|
||||
"name=num,type=CephInt,req=true "
|
||||
"name=hosts,type=CephString,n=N,req=false",
|
||||
'Update the number of manager instances')
|
||||
def _update_mgrs(self, num, hosts=None):
|
||||
hosts = hosts if hosts is not None else []
|
||||
|
||||
@ -317,13 +326,13 @@ Usage:
|
||||
|
||||
completion = self.update_mgrs(num, hosts)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
@CLIWriteCommand('orchestrator mon update',
|
||||
"name=num,type=CephInt,req=true "
|
||||
"name=hosts,type=CephString,n=N,req=false",
|
||||
'Update the number of monitor instances')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator mon update',
|
||||
"name=num,type=CephInt,req=true "
|
||||
"name=hosts,type=CephString,n=N,req=false",
|
||||
'Update the number of monitor instances')
|
||||
def _update_mons(self, num, hosts=None):
|
||||
hosts = hosts if hosts is not None else []
|
||||
|
||||
@ -352,12 +361,12 @@ Usage:
|
||||
|
||||
completion = self.update_mons(num, hosts)
|
||||
self._orchestrator_wait([completion])
|
||||
orchestrator.raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=str(completion.result))
|
||||
|
||||
@CLIWriteCommand('orchestrator set backend',
|
||||
"name=module_name,type=CephString,req=true",
|
||||
'Select orchestrator module backend')
|
||||
@handle_exceptions
|
||||
@_write_cli('orchestrator set backend',
|
||||
"name=module_name,type=CephString,req=true",
|
||||
'Select orchestrator module backend')
|
||||
def _set_backend(self, module_name):
|
||||
"""
|
||||
We implement a setter command instead of just having the user
|
||||
@ -403,9 +412,8 @@ Usage:
|
||||
|
||||
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
|
||||
|
||||
@CLIReadCommand('orchestrator status',
|
||||
desc='Report configured backend and its status')
|
||||
@handle_exceptions
|
||||
@_read_cli('orchestrator status',
|
||||
desc='Report configured backend and its status')
|
||||
def _status(self):
|
||||
o = self._select_orchestrator()
|
||||
if o is None:
|
||||
|
@ -2,7 +2,8 @@ from __future__ import absolute_import
|
||||
import pytest
|
||||
|
||||
|
||||
from orchestrator import DriveGroupSpec, DeviceSelection, DriveGroupValidationError, InventoryDevice
|
||||
from orchestrator import DriveGroupSpec, DeviceSelection, DriveGroupValidationError, \
|
||||
InventoryDevice, ReadCompletion, raise_if_exception
|
||||
|
||||
|
||||
def test_DriveGroup():
|
||||
@ -38,3 +39,10 @@ def test_inventory_device():
|
||||
i_d = InventoryDevice()
|
||||
s = i_d.pretty_print()
|
||||
assert len(s)
|
||||
|
||||
|
||||
def test_raise():
|
||||
c = ReadCompletion()
|
||||
c.exception = ZeroDivisionError()
|
||||
with pytest.raises(ZeroDivisionError):
|
||||
raise_if_exception(c)
|
||||
|
@ -4,32 +4,27 @@ import os
|
||||
import threading
|
||||
import functools
|
||||
import uuid
|
||||
from subprocess import check_output
|
||||
from subprocess import check_output, CalledProcessError
|
||||
|
||||
from mgr_module import MgrModule
|
||||
|
||||
import orchestrator
|
||||
|
||||
|
||||
all_completions = []
|
||||
|
||||
|
||||
class TestReadCompletion(orchestrator.ReadCompletion):
|
||||
class TestCompletionMixin(object):
|
||||
all_completions = [] # Hacky global
|
||||
|
||||
def __init__(self, cb):
|
||||
super(TestReadCompletion, self).__init__()
|
||||
def __init__(self, cb, message, *args, **kwargs):
|
||||
super(TestCompletionMixin, self).__init__(*args, **kwargs)
|
||||
self.cb = cb
|
||||
self._result = None
|
||||
self._complete = False
|
||||
|
||||
self.message = "<read op>"
|
||||
|
||||
global all_completions
|
||||
all_completions.append(self)
|
||||
|
||||
def __str__(self):
|
||||
return "TestReadCompletion(result={} message={})".format(self.result, self.message)
|
||||
self.message = message
|
||||
|
||||
TestCompletionMixin.all_completions.append(self)
|
||||
|
||||
@property
|
||||
def result(self):
|
||||
@ -41,38 +36,23 @@ class TestReadCompletion(orchestrator.ReadCompletion):
|
||||
|
||||
def execute(self):
|
||||
self._result = self.cb()
|
||||
self.executed = True
|
||||
self._complete = True
|
||||
|
||||
|
||||
class TestWriteCompletion(orchestrator.WriteCompletion):
|
||||
def __init__(self, execute_cb, message):
|
||||
super(TestWriteCompletion, self).__init__()
|
||||
self.execute_cb = execute_cb
|
||||
|
||||
# Executed means I executed my API call, it may or may
|
||||
# not have succeeded
|
||||
self.executed = False
|
||||
|
||||
self._result = None
|
||||
|
||||
self.effective = False
|
||||
|
||||
self.id = str(uuid.uuid4())
|
||||
|
||||
self.message = message
|
||||
|
||||
self.error = None
|
||||
|
||||
# XXX hacky global
|
||||
global all_completions
|
||||
all_completions.append(self)
|
||||
|
||||
def __str__(self):
|
||||
return "TestWriteCompletion(executed={} result={} id={} message={} error={})".format(self.executed, self._result, self.id, self.message, self.error)
|
||||
return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
|
||||
self.message, self.exception)
|
||||
|
||||
@property
|
||||
def result(self):
|
||||
return self._result
|
||||
|
||||
class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
|
||||
def __init__(self, cb):
|
||||
super(TestReadCompletion, self).__init__(cb, "<read op>")
|
||||
|
||||
|
||||
class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
|
||||
def __init__(self, cb, message):
|
||||
super(TestWriteCompletion, self).__init__(cb, message)
|
||||
self.id = str(uuid.uuid4())
|
||||
|
||||
@property
|
||||
def is_persistent(self):
|
||||
@ -80,17 +60,7 @@ class TestWriteCompletion(orchestrator.WriteCompletion):
|
||||
|
||||
@property
|
||||
def is_effective(self):
|
||||
return self.effective
|
||||
|
||||
@property
|
||||
def is_errored(self):
|
||||
return self.error is not None
|
||||
|
||||
def execute(self):
|
||||
if not self.executed:
|
||||
self._result = self.execute_cb()
|
||||
self.executed = True
|
||||
self.effective = True
|
||||
return self._complete
|
||||
|
||||
|
||||
def deferred_write(message):
|
||||
@ -160,7 +130,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
self.log.exception("Completion {0} threw an exception:".format(
|
||||
c.message
|
||||
))
|
||||
c.error = e
|
||||
c.exception = e
|
||||
c._complete = True
|
||||
if not c.is_read:
|
||||
self._progress("complete", c.id)
|
||||
@ -195,9 +165,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
# in case we had a caller that wait()'ed on them long enough
|
||||
# to get persistence but not long enough to get completion
|
||||
|
||||
global all_completions
|
||||
self.wait(all_completions)
|
||||
all_completions = [c for c in all_completions if not c.is_complete]
|
||||
self.wait(TestCompletionMixin.all_completions)
|
||||
TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
|
||||
not c.is_complete]
|
||||
|
||||
self._shutdown.wait(5)
|
||||
|
||||
@ -214,8 +184,11 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
cmd = """
|
||||
. {tmpdir}/ceph-volume-virtualenv/bin/activate
|
||||
ceph-volume inventory --format json
|
||||
""".format(tmpdir=os.environ.get('TMPDIR', '/tmp'))
|
||||
c_v_out = check_output(cmd, shell=True)
|
||||
"""
|
||||
try:
|
||||
c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
|
||||
except (OSError, CalledProcessError):
|
||||
c_v_out = check_output(cmd.format(tmpdir='.'),shell=True)
|
||||
|
||||
for out in c_v_out.splitlines():
|
||||
if not out.startswith(b'-->') and not out.startswith(b' stderr'):
|
||||
@ -280,6 +253,16 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
@deferred_write("add_host")
|
||||
def add_host(self, host):
|
||||
if host == 'raise_no_support':
|
||||
raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
|
||||
if host == 'raise_bug':
|
||||
raise ZeroDivisionError()
|
||||
if host == 'raise_not_implemented':
|
||||
raise NotImplementedError()
|
||||
if host == 'raise_no_orchestrator':
|
||||
raise orchestrator.NoOrchestrator()
|
||||
if host == 'raise_import_error':
|
||||
raise ImportError("test_orchestrator not enabled")
|
||||
assert isinstance(host, str)
|
||||
|
||||
@deferred_write("remove_host")
|
||||
|
@ -168,7 +168,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
try:
|
||||
completion = self.add_stateless_service("mds", spec)
|
||||
self._orchestrator_wait([completion])
|
||||
except (ImportError, orchestrator.NoOrchestrator):
|
||||
orchestrator.raise_if_exception(completion)
|
||||
except (ImportError, orchestrator.OrchestratorError):
|
||||
return 0, "", "Volume created successfully (no MDS daemons created)"
|
||||
except Exception as e:
|
||||
# Don't let detailed orchestrator exceptions (python backtraces)
|
||||
@ -249,8 +250,9 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
try:
|
||||
completion = self.remove_stateless_service("mds", vol_name)
|
||||
self._orchestrator_wait([completion])
|
||||
except (ImportError, orchestrator.NoOrchestrator):
|
||||
self.log.warning("No orchestrator, not tearing down MDS daemons")
|
||||
orchestrator.raise_if_exception(completion)
|
||||
except (ImportError, orchestrator.OrchestratorError):
|
||||
self.log.warning("OrchestratorError, not tearing down MDS daemons")
|
||||
except Exception as e:
|
||||
# Don't let detailed orchestrator exceptions (python backtraces)
|
||||
# bubble out to the user
|
||||
|
Loading…
Reference in New Issue
Block a user