mgr/orchestrator: add test for default implementation for apply()

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
Sebastian Wagner 2020-03-18 13:02:12 +01:00
parent 8fd7087042
commit 096542e1c0
3 changed files with 36 additions and 9 deletions

View File

@ -23,7 +23,7 @@ from mgr_module import MgrModule, CLICommand, HandleCommandResult
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
Type, Sequence, Dict
Type, Sequence, Dict, cast
except ImportError:
pass
@ -223,8 +223,11 @@ class _Promise(object):
Call ``on_complete`` as soon as this promise is finalized.
"""
assert self._state in (self.INITIALIZED, self.RUNNING)
if self._next_promise is not None:
return self._next_promise.then(on_complete)
if self._on_complete is not None:
assert self._next_promise is None
self._set_next_promise(self.__class__(
_first_promise=self._first_promise,
on_complete=on_complete
@ -848,25 +851,33 @@ class Orchestrator(object):
"""
Applies any spec
"""
fns = {
fns: Dict[str, Callable[[ServiceSpec], Completion]] = {
'alertmanager': self.apply_alertmanager,
'crash': self.apply_crash,
'grafana': self.apply_grafana,
'mds': self.apply_mds,
'mgr': self.apply_mgr,
'mon': self.apply_mon,
'nfs': self.apply_nfs,
'nfs': cast(Callable[[ServiceSpec], Completion], self.apply_nfs),
'node-exporter': self.apply_node_exporter,
'osd': self.apply_drivegroups,
'osd': cast(Callable[[ServiceSpec], Completion], lambda dg: self.apply_drivegroups([dg])),
'prometheus': self.apply_prometheus,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'rgw': cast(Callable[[ServiceSpec], Completion], self.apply_rgw),
}
spec, [specs] = specs
completion = fns[spec.service_name](spec)
def merge(ls, r):
if isinstance(ls, list):
return ls + [r]
return [ls, r]
spec, *specs = specs
completion = fns[spec.service_type](spec)
for s in specs:
completion.then(fns[spec.service_name](spec))
def next(ls):
return fns[s.service_type](s).then(lambda r: merge(ls, r))
completion = completion.then(next)
return completion
def remove_daemons(self, names):

View File

@ -22,9 +22,14 @@ if 'UNITTEST' in os.environ:
self._ceph_get_version = mock.Mock()
self._ceph_get = mock.MagicMock()
self._ceph_get_module_option = mock.MagicMock()
self._ceph_get_option = mock.MagicMock()
self._validate_module_option = lambda _: True
self._configure_logging = lambda *_: None
self._unconfigure_logging = mock.MagicMock()
self._ceph_log = mock.MagicMock()
self._ceph_get_store = lambda _: ''
self._ceph_get_store_prefix = lambda _: {}
self._ceph_dispatch_remote = lambda *_: None
cm = mock.Mock()

View File

@ -1,5 +1,7 @@
from __future__ import absolute_import
from ceph.deployment.service_spec import ServiceSpec
from test_orchestrator import TestOrchestrator as _TestOrchestrator
from tests import mock
import pytest
@ -226,3 +228,12 @@ def test_pretty_print():
assert p.result == 5
def test_apply():
to = _TestOrchestrator('', 0, 0)
completion = to.apply([
ServiceSpec(service_type='nfs'),
ServiceSpec(service_type='nfs'),
ServiceSpec(service_type='nfs'),
])
completion.finalize(42)
assert completion.result == [None, None, None]