Merge pull request #26654 from sebastian-philipp/orchestrator-progress

mgr/orchestrator: add progress events to all orchestrators

Reviewed-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
This commit is contained in:
Sebastian Wagner 2019-05-09 10:41:53 +02:00 committed by GitHub
commit 615b11b9c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 50 deletions

View File

@ -139,7 +139,10 @@ effect. Second, the completion becomes *effective*, meaning that the operation
:members:
.. autoclass:: ReadCompletion
:members:
.. autoclass:: WriteCompletion
:members:
Placement
---------

View File

@ -17,6 +17,9 @@ class TestOrchestratorCli(MgrTestCase):
def _orch_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
def _progress_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("progress", *args)
def _orch_cmd_result(self, *args, **kwargs):
"""
raw_cluster_cmd doesn't support kwargs.
@ -141,3 +144,12 @@ class TestOrchestratorCli(MgrTestCase):
self.assertEqual(ret, errno.ENOENT)
ret = self._orch_cmd_result("host", "add", "raise_import_error")
self.assertEqual(ret, errno.ENOENT)
def test_progress(self):
self._progress_cmd('clear')
evs = json.loads(self._progress_cmd('json'))['completed']
self.assertEqual(len(evs), 0)
self._orch_cmd("mgr", "update", "4")
evs = json.loads(self._progress_cmd('json'))['completed']
self.assertEqual(len(evs), 1)
self.assertIn('update_mgrs', evs[0]['message'])

View File

@ -7,6 +7,11 @@ Please see the ceph-mgr module developer's guide for more information.
import sys
import time
import fnmatch
import uuid
import six
from mgr_util import format_bytes
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple
@ -15,10 +20,6 @@ try:
except ImportError:
T, G = object, object
import six
from mgr_util import format_bytes
class OrchestratorError(Exception):
"""
@ -150,7 +151,17 @@ class WriteCompletion(_Completion):
"""
def __init__(self):
pass
self.progress_id = str(uuid.uuid4())
#: if a orchestrator module can provide a more detailed
#: progress information, it needs to also call ``progress.update()``.
self.progress = 0.5
def __str__(self):
"""
``__str__()`` is used for determining the message for progress events.
"""
return super(WriteCompletion, self).__str__()
@property
def is_persistent(self):
@ -835,7 +846,9 @@ def _mk_orch_methods(cls):
# Otherwise meth is always bound to last key
def shim(method_name):
def inner(self, *args, **kwargs):
return self._oremote(method_name, args, kwargs)
completion = self._oremote(method_name, args, kwargs)
self._update_completion_progress(completion, 0)
return completion
return inner
for meth in Orchestrator.__dict__:
@ -879,6 +892,23 @@ class OrchestratorClientMixin(Orchestrator):
self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
return self.remote(o, meth, *args, **kwargs)
def _update_completion_progress(self, completion, force_progress=None):
# type: (WriteCompletion, Optional[float]) -> None
try:
progress = force_progress if force_progress is not None else completion.progress
if completion.is_complete:
self.remote("progress", "complete", completion.progress_id)
else:
self.remote("progress", "update", completion.progress_id, str(completion), progress,
["orchestrator"])
except AttributeError:
# No WriteCompletion. Ignore.
pass
except ImportError:
# If the progress module is disabled that's fine,
# they just won't see the output.
pass
def _orchestrator_wait(self, completions):
# type: (List[_Completion]) -> None
"""
@ -891,8 +921,12 @@ class OrchestratorClientMixin(Orchestrator):
:raises NoOrchestrator:
:raises ImportError: no `orchestrator_cli` module or backend not found.
"""
for c in completions:
self._update_completion_progress(c)
while not self.wait(completions):
if any(c.should_wait for c in completions):
time.sleep(5)
else:
break
for c in completions:
self._update_completion_progress(c)

View File

@ -30,8 +30,8 @@ class Event(object):
def _refresh(self):
global _module
_module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
self._progress))
_module.update_progress_event(self.id, self._message, self._progress)
self.progress))
_module.update_progress_event(self.id, self._message, self.progress)
@property
def message(self):
@ -46,7 +46,7 @@ class Event(object):
raise NotImplementedError()
def summary(self):
return "{0} {1}".format(self.progress, self._message)
return "{0} {1}".format(self.progress, self.message)
def _progress_str(self, width):
inner_width = width - 2
@ -477,14 +477,16 @@ class Module(MgrModule):
self._shutdown.set()
self.clear_all_progress_events()
def update(self, ev_id, ev_msg, ev_progress):
def update(self, ev_id, ev_msg, ev_progress, refs=None):
"""
For calling from other mgr modules
"""
if refs is None:
refs = []
try:
ev = self._events[ev_id]
except KeyError:
ev = RemoteEvent(ev_id, ev_msg, [])
ev = RemoteEvent(ev_id, ev_msg, refs)
self._events[ev_id] = ev
self.log.info("update: starting ev {0} ({1})".format(
ev_id, ev_msg))

View File

@ -92,6 +92,9 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
global all_completions
all_completions.append(self)
def __str__(self):
return self.message
@property
def result(self):
return self._result
@ -158,14 +161,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
# TODO: configure k8s API addr instead of assuming local
]
def _progress(self, *args, **kwargs):
try:
self.remote("progress", *args, **kwargs)
except ImportError:
# If the progress module is disabled that's fine,
# they just won't see the output.
pass
def wait(self, completions):
self.log.info("wait: completions={0}".format(completions))
@ -184,9 +179,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
if c.is_complete:
continue
if not c.is_read:
self._progress("update", c.id, c.message, 0.5)
try:
c.execute()
except Exception as e:
@ -195,12 +187,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
))
c.error = e
c._complete = True
if not c.is_read:
self._progress("complete", c.id)
else:
if c.is_complete:
if not c.is_read:
self._progress("complete", c.id)
if not c.is_complete:
incomplete = True

View File

@ -54,6 +54,7 @@ class SSHReadCompletionReady(SSHReadCompletion):
class SSHWriteCompletion(orchestrator.WriteCompletion):
def __init__(self, result):
super(SSHWriteCompletion, self).__init__()
if isinstance(result, multiprocessing.pool.AsyncResult):
self._result = [result]
else:
@ -83,6 +84,7 @@ class SSHWriteCompletion(orchestrator.WriteCompletion):
class SSHWriteCompletionReady(SSHWriteCompletion):
def __init__(self, result):
orchestrator.WriteCompletion.__init__(self)
self._result = result
@property

View File

@ -95,19 +95,10 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
The implementation is similar to the Rook orchestrator, but simpler.
"""
def _progress(self, *args, **kwargs):
try:
self.remote("progress", *args, **kwargs)
except ImportError:
# If the progress module is disabled that's fine,
# they just won't see the output.
pass
def wait(self, completions):
self.log.info("wait: completions={0}".format(completions))
incomplete = False
# Our `wait` implementation is very simple because everything's
# just an API call.
for c in completions:
@ -121,9 +112,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
if c.is_complete:
continue
if not c.is_read:
self._progress("update", c.id, c.message, 0.5)
try:
c.execute()
except Exception as e:
@ -132,17 +120,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
))
c.exception = e
c._complete = True
if not c.is_read:
self._progress("complete", c.id)
else:
if c.is_complete:
if not c.is_read:
self._progress("complete", c.id)
if not c.is_complete:
incomplete = True
return not incomplete
return all(c.is_complete for c in completions)
def available(self):
return True, ""