1
0
mirror of https://github.com/ceph/ceph synced 2025-03-21 01:38:15 +00:00

Revert "Merge PR into master"

This reverts commit f865f3e0a0, reversing
changes made to 7ef5458e26.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2020-03-27 09:00:25 -05:00
parent 7dc02fd0ad
commit af8fa11a1f
2 changed files with 213 additions and 56 deletions
src/pybind/mgr/cephadm

View File

@ -386,10 +386,12 @@ class AsyncCompletion(orchestrator.Completion):
value=orchestrator._Promise.NO_RESULT, # type: Any
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
many=False, # type: bool
update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
self.many = many
self.update_progress = update_progress
if name is None and on_complete is not None:
name = getattr(on_complete, '__name__', None)
@ -431,14 +433,33 @@ class AsyncCompletion(orchestrator.Completion):
assert self._on_complete_ is not None
try:
res = self._on_complete_(*args, **kwargs)
if self.update_progress and self.many:
assert self.progress_reference
self.progress_reference.progress += 1.0 / len(value)
return res
except Exception as e:
self.fail(e)
raise
assert CephadmOrchestrator.instance
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
if self.many:
if not value:
logger.info('calling map_async without values')
callback([])
if six.PY3:
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback,
error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback)
else:
if six.PY3:
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback)
return self.ASYNC_RESULT
return run
@ -449,35 +470,66 @@ class AsyncCompletion(orchestrator.Completion):
self._on_complete_ = inner
def forall_hosts(f):
@wraps(f)
def forall_hosts_wrapper(*args) -> list:
def ssh_completion(cls=AsyncCompletion, **c_kwargs):
# type: (Type[orchestrator.Completion], Any) -> Callable
"""
See ./HACKING.rst for a how-to
"""
def decorator(f):
@wraps(f)
def wrapper(*args):
# Some weired logic to make calling functions with multiple arguments work.
if len(args) == 1:
vals = args[0]
self = None
elif len(args) == 2:
self, vals = args
else:
assert 'either f([...]) or self.f([...])'
name = f.__name__
many = c_kwargs.get('many', False)
def do_work(arg):
if not isinstance(arg, tuple):
arg = (arg, )
try:
if self:
return f(self, *arg)
return f(*arg)
except Exception as e:
logger.exception(f'executing {f.__name__}({args}) failed.')
raise
# Some weired logic to make calling functions with multiple arguments work.
if len(args) == 1:
[value] = args
if many and value and isinstance(value[0], tuple):
return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
else:
return cls(on_complete=f, value=value, name=name, **c_kwargs)
else:
if many:
self, value = args
assert CephadmOrchestrator.instance is not None
return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
def call_self(inner_args):
if not isinstance(inner_args, tuple):
inner_args = (inner_args, )
return f(self, *inner_args)
return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
else:
return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
return wrapper
return decorator
return forall_hosts_wrapper
def async_completion(f):
# type: (Callable) -> Callable[..., AsyncCompletion]
"""
See ./HACKING.rst for a how-to
:param f: wrapped function
"""
return ssh_completion()(f)
def async_map_completion(f):
# type: (Callable) -> Callable[..., AsyncCompletion]
"""
See ./HACKING.rst for a how-to
:param f: wrapped function
kind of similar to
>>> def sync_map(f):
... return lambda x: map(f, x)
"""
return ssh_completion(many=True)(f)
def trivial_completion(f):
@ -1608,7 +1660,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
r.append(h)
return r
@trivial_completion
@async_completion
def add_host(self, spec):
# type: (HostSpec) -> str
"""
@ -1632,7 +1684,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}'".format(spec.hostname)
@trivial_completion
@async_completion
def remove_host(self, host):
# type: (str) -> str
"""
@ -1648,7 +1700,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.log.info('Removed host %s' % host)
return "Removed host '{}'".format(host)
@trivial_completion
@async_completion
def update_host_addr(self, host, addr):
if host not in self.inventory:
raise OrchestratorError('host %s not registered' % host)
@ -1678,7 +1730,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
))
return r
@trivial_completion
@async_completion
def add_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
@ -1691,7 +1743,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
@trivial_completion
@async_completion
def remove_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
@ -1879,7 +1931,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
result.append(dd)
return result
@trivial_completion
def service_action(self, action, service_name):
args = []
for host, dm in self.cache.daemons.items():
@ -1890,7 +1941,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.log.info('%s service %s' % (action.capitalize(), service_name))
return self._daemon_actions(args)
@forall_hosts
@async_map_completion
def _daemon_actions(self, daemon_type, daemon_id, host, action):
return self._daemon_action(daemon_type, daemon_id, host, action)
@ -1916,7 +1967,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.cache.invalidate_host_daemons(host)
return "{} {} from host '{}'".format(action, name, host)
@trivial_completion
def daemon_action(self, action, daemon_type, daemon_id):
args = []
for host, dm in self.cache.daemons.items():
@ -1933,7 +1983,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
','.join(['%s.%s' % (a[0], a[1]) for a in args])))
return self._daemon_actions(args)
@trivial_completion
def remove_daemons(self, names):
# type: (List[str]) -> orchestrator.Completion
args = []
@ -1992,9 +2041,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
@trivial_completion
def blink_device_light(self, ident_fault, on, locs):
@forall_hosts
@async_map_completion
def blink(host, dev, path):
cmd = [
'lsmcli',
@ -2260,7 +2308,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
@forall_hosts
@async_map_completion
def _remove_daemons(self, name, host):
return self._remove_daemon(name, host)
@ -2535,7 +2583,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
)
daemons.append(sd)
@forall_hosts
@async_map_completion
def create_func_map(*args):
return create_func(*args)
@ -2586,7 +2634,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
keyring=keyring,
extra_config=extra_config)
@trivial_completion
def add_mon(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mon', spec, self._create_mon)
@ -2606,7 +2653,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
@trivial_completion
def add_mgr(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mgr', spec, self._create_mgr)
@ -2654,7 +2700,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
def apply_mgr(self, spec):
return self._apply(spec)
@trivial_completion
def add_mds(self, spec: ServiceSpec):
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
@ -2683,7 +2728,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
})
return self._create_daemon('mds', mds_id, host, keyring=keyring)
@trivial_completion
def add_rgw(self, spec):
return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
@ -2726,7 +2770,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
def apply_rgw(self, spec):
return self._apply(spec)
@trivial_completion
def add_rbd_mirror(self, spec):
return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
@ -3047,7 +3090,6 @@ receivers:
"peers": peers
}, sorted(deps)
@trivial_completion
def add_prometheus(self, spec):
return self._add_daemon('prometheus', spec, self._create_prometheus)
@ -3058,7 +3100,6 @@ receivers:
def apply_prometheus(self, spec):
return self._apply(spec)
@trivial_completion
def add_node_exporter(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('node-exporter', spec,
@ -3071,7 +3112,6 @@ receivers:
def _create_node_exporter(self, daemon_id, host):
return self._create_daemon('node-exporter', daemon_id, host)
@trivial_completion
def add_crash(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('crash', spec,
@ -3090,7 +3130,6 @@ receivers:
})
return self._create_daemon('crash', daemon_id, host, keyring=keyring)
@trivial_completion
def add_grafana(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('grafana', spec, self._create_grafana)
@ -3103,7 +3142,6 @@ receivers:
# type: (str, str) -> str
return self._create_daemon('grafana', daemon_id, host)
@trivial_completion
def add_alertmanager(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('alertmanager', spec, self._create_alertmanager)

View File

@ -12,7 +12,7 @@ import pytest
from tests import mock
from .fixtures import cephadm_module, wait
from ..module import trivial_completion, forall_hosts
from ..module import trivial_completion, async_completion, async_map_completion
class TestCompletion(object):
@ -23,6 +23,19 @@ class TestCompletion(object):
return x+1
assert wait(cephadm_module, run(1)) == 2
@pytest.mark.parametrize("input", [
((1, ), ),
((1, 2), ),
(("hallo", ), ),
(("hallo", "foo"), ),
])
def test_async(self, input, cephadm_module):
@async_completion
def run(*args):
return str(args)
assert wait(cephadm_module, run(*input)) == str(input)
@pytest.mark.parametrize("input,expected", [
([], []),
([1], ["(1,)"]),
@ -32,11 +45,25 @@ class TestCompletion(object):
([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
])
def test_async_map(self, input, expected, cephadm_module):
@forall_hosts
def run_forall(*args):
@async_map_completion
def run(*args):
return str(args)
assert run_forall(input) == expected
c = run(input)
wait(cephadm_module, c)
assert c.result == expected
def test_async_self(self, cephadm_module):
class Run(object):
def __init__(self):
self.attr = 1
@async_completion
def run(self, x):
assert self.attr == 1
return x + 1
assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
@ -51,9 +78,101 @@ class TestCompletion(object):
def __init__(self):
self.attr = 1
@forall_hosts
def run_forall(self, *args):
@async_map_completion
def run(self, *args):
assert self.attr == 1
return str(args)
assert Run().run_forall(input) == expected
c = Run().run(input)
wait(cephadm_module, c)
assert c.result == expected
def test_then1(self, cephadm_module):
@async_map_completion
def run(x):
return x+1
assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
def test_then2(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
@async_completion
def async_str(results):
return str(results)
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then3(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
def async_str(results):
return async_completion(str)(results)
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then4(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
def async_str(results):
return async_completion(str)(results).then(lambda x: x + "hello")
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]hello'
@pytest.mark.skip(reason="see limitation of async_map_completion")
def test_then5(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return async_completion(str)(x+1)
c = run([1,2])
wait(cephadm_module, c)
assert c.result == "['2', '3']"
def test_raise(self, cephadm_module):
@async_completion
def run(x):
raise ZeroDivisionError()
with pytest.raises(ZeroDivisionError):
wait(cephadm_module, run(1))
def test_progress(self, cephadm_module):
@async_map_completion
def run(*args):
return str(args)
c = run(list(range(2)))
c.update_progress = True
c.add_progress(
mgr=cephadm_module,
message="my progress"
)
wait(cephadm_module, c)
assert c.result == [str((x,)) for x in range(2)]
assert cephadm_module.remote.mock_calls == [
mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
for i in range(2+1)] + [
mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
mock.call('progress', 'complete', mock.ANY),
]