Merge pull request #32372 from sebastian-philipp/cephadm-progress

mgr/cephadm: Add progress to update_mgr()

Reviewed-by: Joshua Schmid <jschmid@suse.de>
Sebastian Wagner 2020-01-16 09:55:41 +01:00 committed by GitHub
commit 7c15378120
6 changed files with 130 additions and 103 deletions

View File

@ -86,12 +86,14 @@ class AsyncCompletion(orchestrator.Completion):
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
many=False, # type: bool
update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
self.many = many
self.update_progress = update_progress
if name is None and on_complete is not None:
name = on_complete.__name__
name = getattr(on_complete, '__name__', None)
super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
@property
@ -109,6 +111,9 @@ class AsyncCompletion(orchestrator.Completion):
def callback(result):
try:
if self.update_progress:
assert self.progress_reference
self.progress_reference.progress = 1.0
self._on_complete_ = None
self._finalize(result)
except Exception as e:
@ -117,35 +122,40 @@ class AsyncCompletion(orchestrator.Completion):
def error_callback(e):
self.fail(e)
if six.PY3:
_callback = self._on_complete_
else:
def _callback(*args, **kwargs):
# Py2 only: _worker_pool doesn't call error_callback
try:
return self._on_complete_(*args, **kwargs)
except Exception as e:
self.fail(e)
def run(value):
def do_work(*args, **kwargs):
assert self._on_complete_ is not None
try:
res = self._on_complete_(*args, **kwargs)
if self.update_progress and self.many:
assert self.progress_reference
self.progress_reference.progress += 1.0 / len(value)
return res
except Exception as e:
if six.PY3:
raise
else:
# Py2 only: _worker_pool doesn't call error_callback
self.fail(e)
assert CephadmOrchestrator.instance
if self.many:
if not value:
logger.info('calling map_async without values')
callback([])
if six.PY3:
CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback,
error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback)
else:
if six.PY3:
CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
else:
CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback)
return self.ASYNC_RESULT
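
The do_work()/callback() pair above is what drives the new progress reporting: each finished item bumps the bar by 1/len(value), and the final callback snaps it to 1.0. A minimal, self-contained sketch of that pattern under those assumptions (ProgressRef and run_with_progress are illustrative names, not cephadm classes):

from multiprocessing.pool import ThreadPool

class ProgressRef(object):
    def __init__(self, message):
        self.message = message
        self.progress = 0.0                    # 0.0 .. 1.0, like ProgressReference.progress

def run_with_progress(items, work, ref):
    pool = ThreadPool(4)

    def do_work(item):
        res = work(item)
        ref.progress += 1.0 / len(items)       # one incremental step per finished item
        return res

    def callback(results):
        ref.progress = 1.0                     # snap to 100% once the whole batch is done

    pool.map_async(do_work, items, callback=callback)
    pool.close()
    pool.join()                                # callbacks have run once join() returns
    return ref

print(run_with_progress([1, 2, 3], lambda x: x * 2, ProgressRef('Creating MGRs')).progress)
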
@ -1212,17 +1222,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._create_daemon('mgr', name, host, keyring)
def update_mgrs(self, spec):
# type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
@with_services('mgr')
def update_mgrs(self, spec, services):
# type: (orchestrator.StatefulServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion
"""
Adjust the number of cluster managers.
"""
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
def _update_mgrs(self, spec, daemons):
# type: (orchestrator.StatefulServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion
num_mgrs = len(daemons)
num_mgrs = len(services)
if spec.count == num_mgrs:
return orchestrator.Completion(value="The requested number of managers exist.")
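
The hunk above drops the explicit self._get_services('mgr').then(...) chaining in favour of a @with_services('mgr') decorator that hands the resolved daemons to update_mgrs() as its trailing services argument. The decorator's implementation is not part of this diff; the following is only a sketch of the shape such a helper could have (an assumption, not the actual cephadm code):

from functools import wraps

def with_services(service_type):
    def decorator(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            # resolve the current daemons of `service_type`, then continue the
            # completion chain by calling the wrapped method with them appended
            return self._get_services(service_type).then(
                lambda services: f(self, *args, services=services, **kwargs))
        return wrapper
    return decorator
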
@ -1242,7 +1250,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
for standby in mgr_map.get('standbys', []):
connected.append(standby.get('name', ''))
to_remove_damons = []
for d in daemons:
for d in services:
if d.service_instance not in connected:
to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance),
d.nodename))
@ -1252,12 +1260,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
# otherwise, remove *any* mgr
if num_to_remove > 0:
for d in daemons:
for d in services:
to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance), d.nodename))
num_to_remove -= 1
if num_to_remove == 0:
break
return self._remove_daemon(to_remove_damons)
c = self._remove_daemon(to_remove_damons)
c.add_progress('Removing MGRs', self)
c.update_progress = True
return c
else:
# we assume explicit placement by which there are the same number of
@ -1269,11 +1280,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
len(spec.placement.hosts), num_new_mgrs))
for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
if host_spec.name and len([d for d in services if d.service_instance == host_spec.name]):
raise RuntimeError('name %s alrady exists', host_spec.name)
for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
if host_spec.name and len([d for d in services if d.service_instance == host_spec.name]):
raise RuntimeError('name %s alrady exists', host_spec.name)
self.log.info("creating {} managers on hosts: '{}'".format(
@ -1281,10 +1292,13 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
args = []
for host_spec in spec.placement.hosts:
name = host_spec.name or self.get_unique_name(daemons)
name = host_spec.name or self.get_unique_name(services)
host = host_spec.hostname
args.append((host, name))
return self._create_mgr(args)
c = self._create_mgr(args)
c.add_progress('Creating MGRs', self)
c.update_progress = True
return c
def add_mds(self, spec):
if not spec.placement.hosts or len(spec.placement.hosts) < spec.placement.count:
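
Both the removal and the creation branch now hand back the completion wrapped in a progress bar rather than the bare completion. Spelled out, the attach pattern used above is (comments are explanatory, referring to the AsyncCompletion and ProgressReference hunks in this PR):

c = self._create_mgr(args)               # AsyncCompletion over the (host, name) pairs
c.add_progress('Creating MGRs', self)    # attaches a ProgressReference for this message
c.update_progress = True                 # lets do_work()/callback() advance that reference
return c
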

View File

@ -1,8 +1,12 @@
from contextlib import contextmanager
import time
try:
from typing import Any
except ImportError:
pass
import pytest
from cephadm import CephadmOrchestrator
from orchestrator import raise_if_exception, Completion
from tests import mock
@ -30,6 +34,7 @@ def get_ceph_option(_, key):
def cephadm_module():
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.module.CephadmOrchestrator._configure_logging", lambda *args: None),\
mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store),\
mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\
mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix):
@ -44,3 +49,25 @@ def cephadm_module():
}
m.__init__('cephadm', 0, 0)
yield m
def wait(m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])
try:
import pydevd # if in debugger
while True: # don't timeout
if c.is_finished:
raise_if_exception(c)
return c.result
time.sleep(0.1)
except ImportError: # not in debugger
for i in range(30):
if i % 10 == 0:
m.process([c])
if c.is_finished:
raise_if_exception(c)
return c.result
time.sleep(0.1)
assert False, "timeout" + str(c._state)

View File

@ -9,11 +9,10 @@ try:
except ImportError:
pass
from orchestrator import ServiceDescription, raise_if_exception, Completion, InventoryNode, \
StatelessServiceSpec, PlacementSpec, RGWSpec, parse_host_specs, StatefulServiceSpec
from ..module import CephadmOrchestrator
from orchestrator import ServiceDescription, InventoryNode, \
StatelessServiceSpec, PlacementSpec, RGWSpec, StatefulServiceSpec
from tests import mock
from .fixtures import cephadm_module
from .fixtures import cephadm_module, wait
"""
@ -35,35 +34,12 @@ def mon_command(*args, **kwargs):
class TestCephadm(object):
def _wait(self, m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])
try:
import pydevd # if in debugger
while True: # don't timeout
if c.is_finished:
raise_if_exception(c)
return c.result
time.sleep(0.1)
except ImportError: # not in debugger
for i in range(30):
if i % 10 == 0:
m.process([c])
if c.is_finished:
raise_if_exception(c)
return c.result
time.sleep(0.1)
assert False, "timeout" + str(c._state)
m.process([c])
assert False, "timeout" + str(c._state)
@contextmanager
def _with_host(self, m, name):
self._wait(m, m.add_host(name))
wait(m, m.add_host(name))
yield
self._wait(m, m.remove_host(name))
wait(m, m.remove_host(name))
def test_get_unique_name(self, cephadm_module):
existing = [
@ -75,21 +51,21 @@ class TestCephadm(object):
def test_host(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
assert self._wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
c = cephadm_module.get_hosts()
assert self._wait(cephadm_module, c) == []
assert wait(cephadm_module, c) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_service_ls(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.describe_service()
assert self._wait(cephadm_module, c) == []
assert wait(cephadm_module, c) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_device_ls(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.get_inventory()
assert self._wait(cephadm_module, c) == [InventoryNode('test')]
assert wait(cephadm_module, c) == [InventoryNode('test')]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
@ -111,11 +87,11 @@ class TestCephadm(object):
cephadm_module.service_cache_timeout = 10
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.service_action('redeploy', 'rgw', service_id='myrgw.foobar')
assert self._wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
for what in ('start', 'stop', 'restart'):
c = cephadm_module.service_action(what, 'rgw', service_id='myrgw.foobar')
assert self._wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@ -126,7 +102,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps))
assert self._wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@ -136,7 +112,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps))
[out] = self._wait(cephadm_module, c)
[out] = wait(cephadm_module, c)
assert "Deployed mgr." in out
assert " on host 'test'" in out
@ -148,7 +124,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
dg = DriveGroupSpec('test', DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
assert self._wait(cephadm_module, c) == "Created osd(s) on host 'test'"
assert wait(cephadm_module, c) == "Created osd(s) on host 'test'"
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
@ -166,7 +142,7 @@ class TestCephadm(object):
cephadm_module._cluster_fsid = "fsid"
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.remove_osds(['0'])
out = self._wait(cephadm_module, c)
out = wait(cephadm_module, c)
assert out == ["Removed osd.0 from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@ -177,7 +153,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps))
[out] = self._wait(cephadm_module, c)
[out] = wait(cephadm_module, c)
assert "Deployed mds.name." in out
assert " on host 'test'" in out
@ -189,7 +165,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
[out] = self._wait(cephadm_module, c)
[out] = wait(cephadm_module, c)
assert "Deployed rgw.realm.zone." in out
assert " on host 'test'" in out
@ -209,7 +185,7 @@ class TestCephadm(object):
cephadm_module._cluster_fsid = "fsid"
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.remove_rgw('myrgw')
out = self._wait(cephadm_module, c)
out = wait(cephadm_module, c)
assert out == ["Removed rgw.myrgw.foobar from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@ -220,7 +196,7 @@ class TestCephadm(object):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps))
[out] = self._wait(cephadm_module, c)
[out] = wait(cephadm_module, c)
assert "Deployed rbd-mirror." in out
assert " on host 'test'" in out
@ -231,5 +207,5 @@ class TestCephadm(object):
def test_blink_device_light(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
assert self._wait(cephadm_module, c) == ['Set ident light for test: on']
assert wait(cephadm_module, c) == ['Set ident light for test: on']

View File

@ -10,29 +10,18 @@ except ImportError:
import pytest
from orchestrator import raise_if_exception, Completion
from .fixtures import cephadm_module
from ..module import trivial_completion, async_completion, async_map_completion, CephadmOrchestrator
from tests import mock
from .fixtures import cephadm_module, wait
from ..module import trivial_completion, async_completion, async_map_completion
class TestCompletion(object):
def _wait(self, m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])
m.process([c])
for _ in range(30):
if c.is_finished:
raise_if_exception(c)
return c.result
time.sleep(0.1)
assert False, "timeout" + str(c._state)
def test_trivial(self, cephadm_module):
@trivial_completion
def run(x):
return x+1
assert self._wait(cephadm_module, run(1)) == 2
assert wait(cephadm_module, run(1)) == 2
@pytest.mark.parametrize("input", [
((1, ), ),
@ -45,7 +34,7 @@ class TestCompletion(object):
def run(*args):
return str(args)
assert self._wait(cephadm_module, run(*input)) == str(input)
assert wait(cephadm_module, run(*input)) == str(input)
@pytest.mark.parametrize("input,expected", [
([], []),
@ -61,7 +50,7 @@ class TestCompletion(object):
return str(args)
c = run(input)
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == expected
def test_async_self(self, cephadm_module):
@ -74,7 +63,7 @@ class TestCompletion(object):
assert self.attr == 1
return x + 1
assert self._wait(cephadm_module, Run().run(1)) == 2
assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
@ -95,7 +84,7 @@ class TestCompletion(object):
return str(args)
c = Run().run(input)
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == expected
def test_then1(self, cephadm_module):
@ -103,7 +92,7 @@ class TestCompletion(object):
def run(x):
return x+1
assert self._wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
def test_then2(self, cephadm_module):
@async_map_completion
@ -117,7 +106,7 @@ class TestCompletion(object):
c = run([1,2]).then(async_str)
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then3(self, cephadm_module):
@ -131,7 +120,7 @@ class TestCompletion(object):
c = run([1,2]).then(async_str)
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then4(self, cephadm_module):
@ -145,7 +134,7 @@ class TestCompletion(object):
c = run([1,2]).then(async_str)
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == '[2, 3]hello'
@pytest.mark.skip(reason="see limitation of async_map_completion")
@ -157,7 +146,7 @@ class TestCompletion(object):
c = run([1,2])
self._wait(cephadm_module, c)
wait(cephadm_module, c)
assert c.result == "['2', '3']"
def test_raise(self, cephadm_module):
@ -166,4 +155,24 @@ class TestCompletion(object):
raise ZeroDivisionError()
with pytest.raises(ZeroDivisionError):
self._wait(cephadm_module, run(1))
wait(cephadm_module, run(1))
def test_progress(self, cephadm_module):
@async_map_completion
def run(*args):
return str(args)
c = run(list(range(2)))
c.update_progress = True
c.add_progress(
mgr=cephadm_module,
message="my progress"
)
wait(cephadm_module, c)
assert c.result == [str((x,)) for x in range(2)]
assert cephadm_module.remote.mock_calls == [
mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
for i in range(2+1)] + [
mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
mock.call('progress', 'complete', mock.ANY),
]
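
For the two mapped items the asserted sequence is: an initial 'update' at 0.0, one increment of 1/2 per finished item (0.5, then 1.0), a final 1.0 when the map's completion callback fires, and then the 'complete' event. The arithmetic behind the expected list, as a quick check:

n = 2
per_item = [float(i) / n for i in range(n + 1)]      # [0.0, 0.5, 1.0]
expected = per_item + [1.0]                          # plus the snap-to-1.0 from callback()
assert expected == [0.0, 0.5, 1.0, 1.0]
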

View File

@ -273,7 +273,8 @@ class _Promise(object):
:param value: new value.
"""
assert self._state in (self.INITIALIZED, self.RUNNING)
if self._state not in (self.INITIALIZED, self.RUNNING):
raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
self._state = self.RUNNING
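
The assert in _Promise._finalize becomes an explicit ValueError, so finalizing an already-finished promise fails with a descriptive message (and still fails when Python runs with -O, which strips asserts). A stripped-down stand-in showing the guard, not the orchestrator class itself:

class FakePromise(object):
    INITIALIZED, RUNNING, FINISHED = range(3)

    def __init__(self):
        self._state = self.INITIALIZED

    def finalize(self, value=None):
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
        self._state = self.FINISHED

p = FakePromise()
p.finalize()
try:
    p.finalize('again')        # second call now raises instead of passing silently under -O
except ValueError as e:
    print(e)
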
@ -402,8 +403,7 @@ class ProgressReference(object):
def __call__(self, arg):
self._completion_has_result = True
if self.progress == 0.0:
self.progress = 0.5
self.progress = 1.0
return arg
@property
@ -412,6 +412,7 @@ class ProgressReference(object):
@progress.setter
def progress(self, progress):
assert progress <= 1.0
self._progress = progress
try:
if self.effective:
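
Together with the __call__ change above (the reference now jumps straight to 1.0 once the wrapped result arrives), the setter gains a guard that progress never exceeds 1.0 before the value is forwarded to the progress module. A reduced sketch of that setter pattern, with the effective/completion bookkeeping omitted; the mgr.remote arguments follow the calls asserted in the tests above:

class FakeProgressReference(object):
    def __init__(self, mgr, message, progress_id):
        self.mgr, self.message, self.progress_id = mgr, message, progress_id
        self._progress = 0.0

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0          # never report more than 100%
        self._progress = progress
        self.mgr.remote('progress', 'update', self.progress_id, self.message,
                        progress, [('origin', 'orchestrator')])
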

View File

@ -177,7 +177,7 @@ def test_progress():
mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, [('origin', 'orchestrator')])
c.finalize()
mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, [('origin', 'orchestrator')])
mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id)
c.progress_reference.update()
mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, [('origin', 'orchestrator')])