diff --git a/src/pybind/mgr/orchestrator/__init__.py b/src/pybind/mgr/orchestrator/__init__.py index 374d68d3256..3842a84a3c3 100644 --- a/src/pybind/mgr/orchestrator/__init__.py +++ b/src/pybind/mgr/orchestrator/__init__.py @@ -4,7 +4,7 @@ from .module import OrchestratorCli # usage: E.g. `from orchestrator import StatelessServiceSpec` from ._interface import \ - Completion, TrivialReadCompletion, raise_if_exception, ProgressReference, pretty_print, _Promise, \ + OrchResult, raise_if_exception, \ CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \ Orchestrator, OrchestratorClientMixin, \ OrchestratorValidationError, OrchestratorError, NoOrchestrator, \ diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 798f70f83c3..353b8e77651 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -12,12 +12,10 @@ import errno import logging import pickle import re -import time -import uuid from collections import namedtuple, OrderedDict from contextlib import contextmanager -from functools import wraps +from functools import wraps, reduce from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ Sequence, Dict, cast @@ -115,6 +113,22 @@ def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT: return cast(FuncT, wrapper_copy) +def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']: + """ + Decorator to make Orchestrator methods return + an OrchResult. + """ + + @wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]: + try: + return OrchResult(f(*args, **kwargs)) + except Exception as e: + return OrchResult(None, exception=e) + + return cast(Callable[..., OrchResult[T]], wrapper) + + class InnerCliCommandCallable(Protocol): def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]: ... @@ -156,57 +170,30 @@ class CLICommandMeta(type): cls.handle_command = handle_command -def _no_result() -> None: - return object() # type: ignore - - -class _Promise(object): +class OrchResult(Generic[T]): """ - A completion may need multiple promises to be fulfilled. `_Promise` is one - step. - - Typically ``Orchestrator`` implementations inherit from this class to - build their own way of finishing a step to fulfil a future. - - They are not exposed in the orchestrator interface and can be seen as a - helper to build orchestrator modules. + Stores a result and an exception. Mainly to circumvent the + MgrModule.remote() method that hides all exceptions and for + handling different sub-interpreters. """ - INITIALIZED = 1 # We have a parent completion and a next completion - RUNNING = 2 - FINISHED = 3 # we have a final result - NO_RESULT: None = _no_result() # type: ignore - ASYNC_RESULT = object() + def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None: + self.result = result + self.serialized_exception: Optional[bytes] = None + self.exception_str: str = '' + self.set_exception(exception) - def __init__(self, - _first_promise: Optional["_Promise"] = None, - value: Optional[Any] = NO_RESULT, - on_complete: Optional[Callable] = None, - name: Optional[str] = None, - ): - self._on_complete_ = on_complete - self._name = name - self._next_promise: Optional[_Promise] = None + __slots__ = 'result', 'serialized_exception', 'exception_str' - self._state = self.INITIALIZED - self._exception: Optional[Exception] = None + def set_exception(self, e: Optional[Exception]) -> None: + if e is None: + self.serialized_exception = None + self.exception_str = '' + return - # Value of this _Promise. may be an intermediate result. - self._value = value - - # _Promise is not a continuation monad, as `_result` is of type - # T instead of (T -> r) -> r. Therefore we need to store the first promise here. - self._first_promise: '_Promise' = _first_promise or self - - @property - def _exception(self) -> Optional[Exception]: - return getattr(self, '_exception_', None) - - @_exception.setter - def _exception(self, e: Exception) -> None: - self._exception_ = e + self.exception_str = f'{type(e)}: {str(e)}' try: - self._serialized_exception_ = pickle.dumps(e) if e is not None else None + self.serialized_exception = pickle.dumps(e) except pickle.PicklingError: logger.error(f"failed to pickle {e}") if isinstance(e, Exception): @@ -214,365 +201,7 @@ class _Promise(object): else: e = Exception(str(e)) # degenerate to a plain Exception - self._serialized_exception_ = pickle.dumps(e) - - @property - def _serialized_exception(self) -> Optional[bytes]: - return getattr(self, '_serialized_exception_', None) - - @property - def _on_complete(self) -> Optional[Callable]: - # https://github.com/python/mypy/issues/4125 - return self._on_complete_ - - @_on_complete.setter - def _on_complete(self, val: Optional[Callable]) -> None: - self._on_complete_ = val - - def __repr__(self) -> str: - name = self._name or getattr(self._on_complete, '__name__', - '??') if self._on_complete else 'None' - val = repr(self._value) if self._value is not self.NO_RESULT else 'NA' - return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format( - self.__class__, self._state, val, self._on_complete, id(self), name, getattr( - next, '_progress_reference', 'NA'), repr(self._next_promise) - ) - - def pretty_print_1(self) -> str: - if self._name: - name = self._name - elif self._on_complete is None: - name = 'lambda x: x' - elif hasattr(self._on_complete, '__name__'): - name = getattr(self._on_complete, '__name__') - else: - name = self._on_complete.__class__.__name__ - val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...' - prefix = { - self.INITIALIZED: ' ', - self.RUNNING: ' >>>', # noqa: E241 - self.FINISHED: '(done)', # noqa: E241 - }[self._state] - return '{} {}({}),'.format(prefix, name, val) - - def then(self: Any, on_complete: Callable) -> Any: - """ - Call ``on_complete`` as soon as this promise is finalized. - """ - assert self._state in (self.INITIALIZED, self.RUNNING) - - if self._next_promise is not None: - return self._next_promise.then(on_complete) - - if self._on_complete is not None: - self._set_next_promise(self.__class__( - _first_promise=self._first_promise, - on_complete=on_complete - )) - return self._next_promise - - else: - self._on_complete = on_complete - self._set_next_promise(self.__class__(_first_promise=self._first_promise)) - return self._next_promise - - def _set_next_promise(self, next: '_Promise') -> None: - assert self is not next - assert self._state in (self.INITIALIZED, self.RUNNING) - - self._next_promise = next - assert self._next_promise is not None - for p in iter(self._next_promise): - p._first_promise = self._first_promise - - def _finalize(self, value: Optional[T] = NO_RESULT) -> None: - """ - Sets this promise to complete. - - Orchestrators may choose to use this helper function. - - :param value: new value. - """ - if self._state not in (self.INITIALIZED, self.RUNNING): - raise ValueError('finalize: {} already finished. {}'.format(repr(self), value)) - - self._state = self.RUNNING - - if value is not self.NO_RESULT: - self._value = value - assert self._value is not self.NO_RESULT, repr(self) - - if self._on_complete: - try: - next_result = self._on_complete(self._value) - except Exception as e: - self.fail(e) - return - else: - next_result = self._value - - if isinstance(next_result, _Promise): - # hack: _Promise is not a continuation monad. - next_result = next_result._first_promise # type: ignore - assert next_result not in self, repr(self._first_promise) + repr(next_result) - assert self not in next_result - next_result._append_promise(self._next_promise) - self._set_next_promise(next_result) - assert self._next_promise - if self._next_promise._value is self.NO_RESULT: - self._next_promise._value = self._value - self.propagate_to_next() - elif next_result is not self.ASYNC_RESULT: - # simple map. simply forward - if self._next_promise: - self._next_promise._value = next_result - else: - # Hack: next_result is of type U, _value is of type T - self._value = next_result # type: ignore - self.propagate_to_next() - else: - # asynchronous promise - pass - - def propagate_to_next(self) -> None: - self._state = self.FINISHED - logger.debug('finalized {}'.format(repr(self))) - if self._next_promise: - self._next_promise._finalize() - - def fail(self, e: Exception) -> None: - """ - Sets the whole completion to be faild with this exception and end the - evaluation. - """ - if self._state == self.FINISHED: - raise ValueError( - 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e))) - assert self._state in (self.INITIALIZED, self.RUNNING) - logger.exception('_Promise failed') - self._exception = e - self._value = f'_exception: {e}' - if self._next_promise: - self._next_promise.fail(e) - self._state = self.FINISHED - - def __contains__(self, item: '_Promise') -> bool: - return any(item is p for p in iter(self._first_promise)) - - def __iter__(self) -> Iterator['_Promise']: - yield self - elem = self._next_promise - while elem is not None: - yield elem - elem = elem._next_promise - - def _append_promise(self, other: Optional['_Promise']) -> None: - if other is not None: - assert self not in other - assert other not in self - self._last_promise()._set_next_promise(other) - - def _last_promise(self) -> '_Promise': - return list(iter(self))[-1] - - -class ProgressReference(object): - def __init__(self, - message: str, - mgr: Any, - completion: Optional[Callable[[], 'Completion']] = None - ): - """ - ProgressReference can be used within Completions:: - - +---------------+ +---------------------------------+ - | | then | | - | My Completion | +--> | on_complete=ProgressReference() | - | | | | - +---------------+ +---------------------------------+ - - See :func:`Completion.with_progress` for an easy way to create - a progress reference - - """ - super(ProgressReference, self).__init__() - self.progress_id = str(uuid.uuid4()) - self.message = message - self.mgr = mgr - - #: The completion can already have a result, before the write - #: operation is effective. progress == 1 means, the services are - #: created / removed. - self.completion: Optional[Callable[[], Completion]] = completion - - #: if a orchestrator module can provide a more detailed - #: progress information, it needs to also call ``progress.update()``. - self.progress = 0.0 - - self._completion_has_result = False - self.mgr.all_progress_references.append(self) - - def __str__(self) -> str: - """ - ``__str__()`` is used for determining the message for progress events. - """ - return self.message or super(ProgressReference, self).__str__() - - def __call__(self, arg: T) -> T: - self._completion_has_result = True - self.progress = 1.0 - return arg - - @property - def progress(self) -> float: - return self._progress - - @progress.setter - def progress(self, progress: float) -> None: - assert progress <= 1.0 - self._progress = progress - try: - if self.effective: - self.mgr.remote("progress", "complete", self.progress_id) - self.mgr.all_progress_references = [ - p for p in self.mgr.all_progress_references if p is not self] - else: - self.mgr.remote("progress", "update", self.progress_id, self.message, - progress, - [("origin", "orchestrator")]) - except ImportError: - # If the progress module is disabled that's fine, - # they just won't see the output. - pass - - @property - def effective(self) -> bool: - return self.progress == 1 and self._completion_has_result - - def update(self) -> None: - def progress_run(progress: float) -> None: - self.progress = progress - if self.completion: - c = self.completion().then(progress_run) - self.mgr.process([c._first_promise]) - else: - self.progress = 1 - - def fail(self) -> None: - self._completion_has_result = True - self.progress = 1 - - -class Completion(_Promise, Generic[T]): - """ - Combines multiple promises into one overall operation. - - Completions are composable by being able to - call one completion from another completion. I.e. making them re-usable - using Promises E.g.:: - - >>> #doctest: +SKIP - ... return Orchestrator().get_hosts().then(self._create_osd) - - where ``get_hosts`` returns a Completion of list of hosts and - ``_create_osd`` takes a list of hosts. - - The concept behind this is to store the computation steps - explicit and then explicitly evaluate the chain: - - >>> #doctest: +SKIP - ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x)) - ... p.finalize(2) - ... assert p.result = "4" - - or graphically:: - - +---------------+ +-----------------+ - | | then | | - | lambda x: x*x | +--> | lambda x: str(x)| - | | | | - +---------------+ +-----------------+ - - """ - - def __init__(self, - _first_promise: Optional["Completion"] = None, - value: Any = _Promise.NO_RESULT, - on_complete: Optional[Callable] = None, - name: Optional[str] = None, - ) -> None: - super(Completion, self).__init__(_first_promise, value, on_complete, name) - - @property - def _progress_reference(self) -> Optional[ProgressReference]: - if hasattr(self._on_complete, 'progress_id'): - return self._on_complete # type: ignore - return None - - @property - def progress_reference(self) -> Optional[ProgressReference]: - """ - ProgressReference. Marks this completion - as a write completeion. - """ - - references = [c._progress_reference for c in iter( # type: ignore - self) if c._progress_reference is not None] # type: ignore - if references: - assert len(references) == 1 - return references[0] - return None - - @classmethod - def with_progress(cls: Any, - message: str, - mgr: Any, - _first_promise: Optional["Completion"] = None, - value: Any = _Promise.NO_RESULT, - on_complete: Optional[Callable] = None, - calc_percent: Optional[Callable[[], Any]] = None - ) -> Any: - - c = cls( - _first_promise=_first_promise, - value=value, - on_complete=on_complete - ).add_progress(message, mgr, calc_percent) - - return c._first_promise - - def add_progress(self, - message: str, - mgr: Any, - calc_percent: Optional[Callable[[], Any]] = None - ) -> Any: - return self.then( - on_complete=ProgressReference( - message=message, - mgr=mgr, - completion=calc_percent - ) - ) - - def fail(self, e: Exception) -> None: - super(Completion, self).fail(e) - if self._progress_reference: - self._progress_reference.fail() - - def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT) -> None: - if self._first_promise._state == self.INITIALIZED: - self._first_promise._finalize(result) - - @property - def result(self) -> T: - """ - The result of the operation that we were waited - for. Only valid after calling Orchestrator.process() on this - completion. - """ - last = self._last_promise() - assert last._state == _Promise.FINISHED - return cast(T, last._value) + self.serialized_exception = pickle.dumps(e) def result_str(self) -> str: """Force a string.""" @@ -582,88 +211,19 @@ class Completion(_Promise, Generic[T]): return '\n'.join(str(x) for x in self.result) return str(self.result) - @property - def exception(self) -> Optional[Exception]: - return self._last_promise()._exception - @property - def serialized_exception(self) -> Optional[bytes]: - return self._last_promise()._serialized_exception - - @property - def has_result(self) -> bool: - """ - Has the operation already a result? - - For Write operations, it can already have a - result, if the orchestrator's configuration is - persistently written. Typically this would - indicate that an update had been written to - a manifest, but that the update had not - necessarily been pushed out to the cluster. - - :return: - """ - return self._last_promise()._state == _Promise.FINISHED - - @property - def is_errored(self) -> bool: - """ - Has the completion failed. Default implementation looks for - self.exception. Can be overwritten. - """ - return self.exception is not None - - @property - def needs_result(self) -> bool: - """ - Could the external operation be deemed as complete, - or should we wait? - We must wait for a read operation only if it is not complete. - """ - return not self.is_errored and not self.has_result - - @property - def is_finished(self) -> bool: - """ - Could the external operation be deemed as complete, - or should we wait? - We must wait for a read operation only if it is not complete. - """ - return self.is_errored or (self.has_result) - - def pretty_print(self) -> str: - - reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise)) - return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs) - - -def pretty_print(completions: Sequence[Completion]) -> str: - return ', '.join(c.pretty_print() for c in completions) - - -def raise_if_exception(c: Completion) -> None: +def raise_if_exception(c: OrchResult[T]) -> T: """ - :raises OrchestratorError: Some user error or a config error. - :raises Exception: Some internal error + Due to different sub-interpreters, this MUST not be in the `OrchResult` class. """ if c.serialized_exception is not None: try: e = pickle.loads(c.serialized_exception) except (KeyError, AttributeError): - raise Exception('{}: {}'.format(type(c.exception), c.exception)) + raise Exception(c.exception_str) raise e - - -class TrivialReadCompletion(Completion[T]): - """ - This is the trivial completion simply wrapping a result. - """ - - def __init__(self, result: T): - super(TrivialReadCompletion, self).__init__() - if result: - self.finalize(result) + assert c.result is not None, 'OrchResult should either have an exception or a result' + return c.result def _hide_in_features(f: FuncT) -> FuncT: @@ -732,21 +292,6 @@ class Orchestrator(object): """ raise NotImplementedError() - @_hide_in_features - def process(self, completions: List[Completion]) -> None: - """ - Given a list of Completion instances, process any which are - incomplete. - - Callers should inspect the detail of each completion to identify - partial completion/progress information, and present that information - to the user. - - This method should not block, as this would make it slow to query - a status, while other long running operations are in progress. - """ - raise NotImplementedError() - @_hide_in_features def get_feature_set(self) -> Dict[str, dict]: """Describes which methods this orchestrator implements @@ -790,7 +335,7 @@ class Orchestrator(object): def resume(self) -> None: raise NotImplementedError() - def add_host(self, host_spec: HostSpec) -> Completion[str]: + def add_host(self, host_spec: HostSpec) -> OrchResult[str]: """ Add a host to the orchestrator inventory. @@ -798,7 +343,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_host(self, host: str) -> Completion[str]: + def remove_host(self, host: str) -> OrchResult[str]: """ Remove a host from the orchestrator inventory. @@ -806,7 +351,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def update_host_addr(self, host: str, addr: str) -> Completion[str]: + def update_host_addr(self, host: str, addr: str) -> OrchResult[str]: """ Update a host's address @@ -815,7 +360,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def get_hosts(self) -> Completion[List[HostSpec]]: + def get_hosts(self) -> OrchResult[List[HostSpec]]: """ Report the hosts in the cluster. @@ -823,19 +368,19 @@ class Orchestrator(object): """ raise NotImplementedError() - def add_host_label(self, host: str, label: str) -> Completion[str]: + def add_host_label(self, host: str, label: str) -> OrchResult[str]: """ Add a host label """ raise NotImplementedError() - def remove_host_label(self, host: str, label: str) -> Completion[str]: + def remove_host_label(self, host: str, label: str) -> OrchResult[str]: """ Remove a host label """ raise NotImplementedError() - def host_ok_to_stop(self, hostname: str) -> Completion: + def host_ok_to_stop(self, hostname: str) -> OrchResult: """ Check if the specified host can be safely stopped without reducing availability @@ -843,19 +388,19 @@ class Orchestrator(object): """ raise NotImplementedError() - def enter_host_maintenance(self, hostname: str, force: bool = False) -> Completion: + def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult: """ Place a host in maintenance, stopping daemons and disabling it's systemd target """ raise NotImplementedError() - def exit_host_maintenance(self, hostname: str) -> Completion: + def exit_host_maintenance(self, hostname: str) -> OrchResult: """ Return a host from maintenance, restarting the clusters systemd target """ raise NotImplementedError() - def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]: + def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]: """ Returns something that was created by `ceph-volume inventory`. @@ -863,7 +408,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]: + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]: """ Describe a service (of any kind) that is already configured in the orchestrator. For example, when viewing an OSD in the dashboard @@ -877,7 +422,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]: + def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]: """ Describe a daemon (of any kind) that is already configured in the orchestrator. @@ -886,11 +431,12 @@ class Orchestrator(object): """ raise NotImplementedError() - def apply(self, specs: Sequence["GenericSpec"]) -> Completion[List[str]]: + @handle_orch_error + def apply(self, specs: Sequence["GenericSpec"]) -> List[str]: """ Applies any spec """ - fns: Dict[str, Callable] = { + fns: Dict[str, Callable[..., OrchResult[str]]] = { 'alertmanager': self.apply_alertmanager, 'crash': self.apply_crash, 'grafana': self.apply_grafana, @@ -900,7 +446,7 @@ class Orchestrator(object): 'mon': self.apply_mon, 'nfs': self.apply_nfs, 'node-exporter': self.apply_node_exporter, - 'osd': lambda dg: self.apply_drivegroups([dg]), + 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore 'prometheus': self.apply_prometheus, 'rbd-mirror': self.apply_rbd_mirror, 'rgw': self.apply_rgw, @@ -909,29 +455,20 @@ class Orchestrator(object): 'cephadm-exporter': self.apply_cephadm_exporter, } - def merge(ls: Union[List[T], T], r: Union[List[T], T]) -> List[T]: - if isinstance(ls, list): - return ls + [r] # type: ignore - return [ls, r] # type: ignore + def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741 + l_res = raise_if_exception(l) + r_res = raise_if_exception(r) + l_res.append(r_res) + return OrchResult(l_res) + return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([]))) - spec, *specs = specs - - fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) - completion = fn(spec) - for s in specs: - def next(ls: list) -> Any: - fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) - return fn(s).then(lambda r: merge(ls, r)) - completion = completion.then(next) - return completion - - def plan(self, spec: Sequence["GenericSpec"]) -> Completion[List]: + def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]: """ Plan (Dry-run, Preview) a List of Specs. """ raise NotImplementedError() - def remove_daemons(self, names: List[str]) -> Completion[List[str]]: + def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]: """ Remove specific daemon(s). @@ -939,7 +476,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_service(self, service_name: str) -> Completion[str]: + def remove_service(self, service_name: str) -> OrchResult[str]: """ Remove a service (a collection of daemons). @@ -947,7 +484,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def service_action(self, action: str, service_name: str) -> Completion[List[str]]: + def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]: """ Perform an action (start/stop/reload) on a service (i.e., all daemons providing the logical service). @@ -955,24 +492,24 @@ class Orchestrator(object): :param action: one of "start", "stop", "restart", "redeploy", "reconfig" :param service_name: service_type + '.' + service_id (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...) - :rtype: Completion + :rtype: OrchResult """ # assert action in ["start", "stop", "reload, "restart", "redeploy"] raise NotImplementedError() - def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]: + def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]: """ Perform an action (start/stop/reload) on a daemon. :param action: one of "start", "stop", "restart", "redeploy", "reconfig" :param daemon_name: name of daemon :param image: Container image when redeploying that daemon - :rtype: Completion + :rtype: OrchResult """ # assert action in ["start", "stop", "reload, "restart", "redeploy"] raise NotImplementedError() - def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]: + def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]: """ Create one or more OSDs within a single Drive Group. @@ -983,7 +520,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]: + def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: """ Update OSD cluster """ raise NotImplementedError() @@ -997,13 +534,13 @@ class Orchestrator(object): def preview_osdspecs(self, osdspec_name: Optional[str] = 'osd', osdspecs: Optional[List[DriveGroupSpec]] = None - ) -> Completion[str]: + ) -> OrchResult[str]: """ Get a preview for OSD deployments """ raise NotImplementedError() def remove_osds(self, osd_ids: List[str], replace: bool = False, - force: bool = False) -> Completion[str]: + force: bool = False) -> OrchResult[str]: """ :param osd_ids: list of OSD IDs :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` @@ -1013,19 +550,19 @@ class Orchestrator(object): """ raise NotImplementedError() - def stop_remove_osds(self, osd_ids: List[str]) -> Completion: + def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult: """ TODO """ raise NotImplementedError() - def remove_osds_status(self) -> Completion: + def remove_osds_status(self) -> OrchResult: """ Returns a status of the ongoing OSD removal operations. """ raise NotImplementedError() - def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]: + def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]: """ Instructs the orchestrator to enable or disable either the ident or the fault LED. @@ -1035,134 +572,134 @@ class Orchestrator(object): """ raise NotImplementedError() - def zap_device(self, host: str, path: str) -> Completion[str]: + def zap_device(self, host: str, path: str) -> OrchResult[str]: """Zap/Erase a device (DESTROYS DATA)""" raise NotImplementedError() - def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_mon(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create mon daemon(s)""" raise NotImplementedError() - def apply_mon(self, spec: ServiceSpec) -> Completion[str]: + def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]: """Update mon cluster""" raise NotImplementedError() - def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_mgr(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create mgr daemon(s)""" raise NotImplementedError() - def apply_mgr(self, spec: ServiceSpec) -> Completion[str]: + def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]: """Update mgr cluster""" raise NotImplementedError() - def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_mds(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create MDS daemon(s)""" raise NotImplementedError() - def apply_mds(self, spec: ServiceSpec) -> Completion[str]: + def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]: """Update MDS cluster""" raise NotImplementedError() - def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]: + def add_rgw(self, spec: RGWSpec) -> OrchResult[List[str]]: """Create RGW daemon(s)""" raise NotImplementedError() - def apply_rgw(self, spec: RGWSpec) -> Completion[str]: + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: """Update RGW cluster""" raise NotImplementedError() - def apply_ha_rgw(self, spec: HA_RGWSpec) -> Completion[str]: + def apply_ha_rgw(self, spec: HA_RGWSpec) -> OrchResult[str]: """Update ha-rgw daemons""" raise NotImplementedError() - def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create rbd-mirror daemon(s)""" raise NotImplementedError() - def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]: + def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]: """Update rbd-mirror cluster""" raise NotImplementedError() - def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]: + def add_nfs(self, spec: NFSServiceSpec) -> OrchResult[List[str]]: """Create NFS daemon(s)""" raise NotImplementedError() - def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]: + def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]: """Update NFS cluster""" raise NotImplementedError() - def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]: + def add_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[List[str]]: """Create iscsi daemon(s)""" raise NotImplementedError() - def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]: + def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]: """Update iscsi cluster""" raise NotImplementedError() - def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_prometheus(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create new prometheus daemon""" raise NotImplementedError() - def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]: + def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]: """Update prometheus cluster""" raise NotImplementedError() - def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_node_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create a new Node-Exporter service""" raise NotImplementedError() - def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]: + def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]: """Update existing a Node-Exporter daemon(s)""" raise NotImplementedError() - def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_crash(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create a new crash service""" raise NotImplementedError() - def apply_crash(self, spec: ServiceSpec) -> Completion[str]: + def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]: """Update existing a crash daemon(s)""" raise NotImplementedError() - def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_grafana(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create a new grafana service""" raise NotImplementedError() - def apply_grafana(self, spec: ServiceSpec) -> Completion[str]: + def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]: """Update existing a grafana service""" raise NotImplementedError() - def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_alertmanager(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create a new AlertManager service""" raise NotImplementedError() - def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]: + def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]: """Update an existing AlertManager daemon(s)""" raise NotImplementedError() - def add_cephadm_exporter(self, spec: ServiceSpec) -> Completion[List[str]]: + def add_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]: """Create a new cephadm exporter daemon""" raise NotImplementedError() - def apply_cephadm_exporter(self, spec: ServiceSpec) -> Completion[str]: + def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]: """Update an existing cephadm exporter daemon""" raise NotImplementedError() - def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]: + def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: raise NotImplementedError() - def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]: + def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: raise NotImplementedError() - def upgrade_pause(self) -> Completion[str]: + def upgrade_pause(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_resume(self) -> Completion[str]: + def upgrade_resume(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_stop(self) -> Completion[str]: + def upgrade_stop(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_status(self) -> Completion['UpgradeStatusSpec']: + def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']: """ If an upgrade is currently underway, report on where we are in the process, or if some error has occurred. @@ -1172,7 +709,7 @@ class Orchestrator(object): raise NotImplementedError() @_hide_in_features - def upgrade_available(self) -> Completion: + def upgrade_available(self) -> OrchResult: """ Report on what versions are available to upgrade to @@ -1778,7 +1315,6 @@ class OrchestratorClientMixin(Orchestrator): >>> class MyModule(OrchestratorClientMixin): ... def func(self): ... completion = self.add_host('somehost') # calls `_oremote()` - ... self._orchestrator_wait([completion]) ... self.log.debug(completion.result) .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`. @@ -1835,24 +1371,3 @@ class OrchestratorClientMixin(Orchestrator): if meth not in f_set or not f_set[meth]['available']: raise NotImplementedError(f'{o} does not implement {meth}') from e raise - - def _orchestrator_wait(self, completions: List[Completion]) -> None: - """ - Wait for completions to complete (reads) or - become persistent (writes). - - Waits for writes to be *persistent* but not *effective*. - - :param completions: List of Completions - :raises NoOrchestrator: - :raises RuntimeError: something went wrong while calling the process method. - :raises ImportError: no `orchestrator` module or backend not found. - """ - while any(not c.has_result for c in completions): - self.process(completions) - self.__get_mgr().log.info("Operations pending: %s", - sum(1 for c in completions if not c.has_result)) - if any(c.needs_result for c in completions): - time.sleep(1) - else: - break