mgr/cephadm: cleanup async_map_completion etc.

This breaks the progress integration, but we haven't
used that anyway till now.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
Sebastian Wagner 2020-03-20 17:39:08 +01:00
parent 29aa87c570
commit d8ced206e2
2 changed files with 3 additions and 216 deletions

View File

@ -395,12 +395,10 @@ class AsyncCompletion(orchestrator.Completion):
value=orchestrator._Promise.NO_RESULT, # type: Any
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
many=False, # type: bool
update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
self.many = many
self.update_progress = update_progress
if name is None and on_complete is not None:
name = getattr(on_complete, '__name__', None)
@ -442,33 +440,14 @@ class AsyncCompletion(orchestrator.Completion):
assert self._on_complete_ is not None
try:
res = self._on_complete_(*args, **kwargs)
if self.update_progress and self.many:
assert self.progress_reference
self.progress_reference.progress += 1.0 / len(value)
return res
except Exception as e:
self.fail(e)
raise
assert CephadmOrchestrator.instance
if self.many:
if not value:
logger.info('calling map_async without values')
callback([])
if six.PY3:
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback,
error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback)
else:
if six.PY3:
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback)
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
return self.ASYNC_RESULT
return run
@ -479,41 +458,6 @@ class AsyncCompletion(orchestrator.Completion):
self._on_complete_ = inner
def ssh_completion(cls=AsyncCompletion, **c_kwargs):
# type: (Type[orchestrator.Completion], Any) -> Callable
"""
See ./HACKING.rst for a how-to
"""
def decorator(f):
@wraps(f)
def wrapper(*args):
name = f.__name__
many = c_kwargs.get('many', False)
# Some weired logic to make calling functions with multiple arguments work.
if len(args) == 1:
[value] = args
if many and value and isinstance(value[0], tuple):
return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
else:
return cls(on_complete=f, value=value, name=name, **c_kwargs)
else:
if many:
self, value = args
def call_self(inner_args):
if not isinstance(inner_args, tuple):
inner_args = (inner_args, )
return f(self, *inner_args)
return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
else:
return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
return wrapper
return decorator
def forall_hosts(f):
@wraps(f)
def forall_hosts_wrapper(*args) -> list:
@ -545,32 +489,6 @@ def forall_hosts(f):
return forall_hosts_wrapper
def async_completion(f):
# type: (Callable) -> Callable[..., AsyncCompletion]
"""
See ./HACKING.rst for a how-to
:param f: wrapped function
"""
return ssh_completion()(f)
def async_map_completion(f):
# type: (Callable) -> Callable[..., AsyncCompletion]
"""
See ./HACKING.rst for a how-to
:param f: wrapped function
kind of similar to
>>> def sync_map(f):
... return lambda x: map(f, x)
"""
return ssh_completion(many=True)(f)
def trivial_completion(f):
# type: (Callable) -> Callable[..., orchestrator.Completion]
@wraps(f)

View File

@ -12,7 +12,7 @@ import pytest
from tests import mock
from .fixtures import cephadm_module, wait
from ..module import trivial_completion, async_completion, async_map_completion, forall_hosts
from ..module import trivial_completion, forall_hosts
class TestCompletion(object):
@ -23,19 +23,6 @@ class TestCompletion(object):
return x+1
assert wait(cephadm_module, run(1)) == 2
@pytest.mark.parametrize("input", [
((1, ), ),
((1, 2), ),
(("hallo", ), ),
(("hallo", "foo"), ),
])
def test_async(self, input, cephadm_module):
@async_completion
def run(*args):
return str(args)
assert wait(cephadm_module, run(*input)) == str(input)
@pytest.mark.parametrize("input,expected", [
([], []),
([1], ["(1,)"]),
@ -45,30 +32,11 @@ class TestCompletion(object):
([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
])
def test_async_map(self, input, expected, cephadm_module):
@async_map_completion
def run(*args):
return str(args)
c = run(input)
wait(cephadm_module, c)
assert c.result == expected
@forall_hosts
def run_forall(*args):
return str(args)
assert run_forall(input) == expected
def test_async_self(self, cephadm_module):
class Run(object):
def __init__(self):
self.attr = 1
@async_completion
def run(self, x):
assert self.attr == 1
return x + 1
assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
@ -83,108 +51,9 @@ class TestCompletion(object):
def __init__(self):
self.attr = 1
@async_map_completion
def run(self, *args):
assert self.attr == 1
return str(args)
@forall_hosts
def run_forall(self, *args):
assert self.attr == 1
return str(args)
c = Run().run(input)
wait(cephadm_module, c)
assert c.result == expected
assert Run().run_forall(input) == expected
def test_then1(self, cephadm_module):
@async_map_completion
def run(x):
return x+1
assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
def test_then2(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
@async_completion
def async_str(results):
return str(results)
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then3(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
def async_str(results):
return async_completion(str)(results)
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then4(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return x+1
def async_str(results):
return async_completion(str)(results).then(lambda x: x + "hello")
c = run([1,2]).then(async_str)
wait(cephadm_module, c)
assert c.result == '[2, 3]hello'
@pytest.mark.skip(reason="see limitation of async_map_completion")
def test_then5(self, cephadm_module):
@async_map_completion
def run(x):
time.sleep(0.1)
return async_completion(str)(x+1)
c = run([1,2])
wait(cephadm_module, c)
assert c.result == "['2', '3']"
def test_raise(self, cephadm_module):
@async_completion
def run(x):
raise ZeroDivisionError()
with pytest.raises(ZeroDivisionError):
wait(cephadm_module, run(1))
def test_progress(self, cephadm_module):
@async_map_completion
def run(*args):
return str(args)
c = run(list(range(2)))
c.update_progress = True
c.add_progress(
mgr=cephadm_module,
message="my progress"
)
wait(cephadm_module, c)
assert c.result == [str((x,)) for x in range(2)]
assert cephadm_module.remote.mock_calls == [
mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
for i in range(2+1)] + [
mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
mock.call('progress', 'complete', mock.ANY),
]