mirror of
https://github.com/ceph/ceph
synced 2025-01-20 01:51:34 +00:00
Merge pull request #32985 from sebastian-philipp/mgr-progress-mypy
mgr/progress: Add integration to pybind/mgr/tox.ini Reviewed-by: Josh Durgin <jdurgin@redhat.com> Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
commit
38d20f69e3
@ -2,10 +2,6 @@ import os
|
||||
if 'UNITTEST' not in os.environ:
|
||||
from .module import *
|
||||
else:
|
||||
import sys
|
||||
import mock
|
||||
sys.modules['ceph_module'] = mock.Mock()
|
||||
sys.modules['rados'] = mock.Mock()
|
||||
from .module import *
|
||||
import tests
|
||||
|
||||
|
||||
|
@ -1,4 +1,10 @@
|
||||
from mgr_module import MgrModule
|
||||
try:
|
||||
from typing import List, Dict, Union, Any, Optional
|
||||
from typing import TYPE_CHECKING
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
|
||||
from mgr_module import MgrModule, OSDMap
|
||||
import os
|
||||
import threading
|
||||
import datetime
|
||||
@ -11,11 +17,11 @@ import json
|
||||
ENCODING_VERSION = 2
|
||||
|
||||
# keep a global reference to the module so we can use it from Event methods
|
||||
_module = None
|
||||
_module = None # type: Optional["Module"]
|
||||
|
||||
# if unit test we want MgrModule to be blank
|
||||
if 'UNITTEST' in os.environ:
|
||||
MgrModule = object
|
||||
MgrModule = object # type: ignore
|
||||
|
||||
class Event(object):
|
||||
"""
|
||||
@ -25,15 +31,17 @@ class Event(object):
|
||||
"""
|
||||
|
||||
def __init__(self, message, refs, started_at=None):
|
||||
# type: (str, List[str], Optional[float]) -> None
|
||||
self._message = message
|
||||
self._refs = refs
|
||||
self.started_at = started_at if started_at else time.time()
|
||||
self.id = None
|
||||
self.id = None # type: Optional[str]
|
||||
self.update_duration_event()
|
||||
self._time_remaining_str = "(time remaining: N/A)"
|
||||
|
||||
def _refresh(self):
|
||||
global _module
|
||||
assert _module
|
||||
_module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
|
||||
self.progress))
|
||||
self.update_duration_event()
|
||||
@ -43,14 +51,17 @@ class Event(object):
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
# type: () -> str
|
||||
return self._message
|
||||
|
||||
@property
|
||||
def refs(self):
|
||||
# type: () -> List[str]
|
||||
return self._refs
|
||||
|
||||
@property
|
||||
def progress(self):
|
||||
# type: () -> float
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
@ -66,6 +77,7 @@ class Event(object):
|
||||
return None
|
||||
|
||||
def summary(self):
|
||||
# type: () -> str
|
||||
return "{0} {1} {2}".format(self.progress, self.message, self.duration_str)
|
||||
|
||||
def _progress_str(self, width):
|
||||
@ -93,6 +105,7 @@ class Event(object):
|
||||
self._time_remaining_str)
|
||||
|
||||
def to_json(self):
|
||||
# type: () -> Dict[str, Any]
|
||||
return {
|
||||
"id": self.id,
|
||||
"message": self.message,
|
||||
@ -177,6 +190,7 @@ class RemoteEvent(Event):
|
||||
"""
|
||||
|
||||
def __init__(self, my_id, message, refs):
|
||||
# type: (str, str, List[str]) -> None
|
||||
super(RemoteEvent, self).__init__(message, refs)
|
||||
self.id = my_id
|
||||
self._progress = 0.0
|
||||
@ -184,6 +198,7 @@ class RemoteEvent(Event):
|
||||
self._refresh()
|
||||
|
||||
def set_progress(self, progress):
|
||||
# type: (float) -> None
|
||||
self._progress = progress
|
||||
self._refresh()
|
||||
|
||||
@ -219,6 +234,7 @@ class PgRecoveryEvent(Event):
|
||||
"""
|
||||
|
||||
def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
|
||||
# type: (str, List[Any], List[PgId], List[str], int) -> None
|
||||
super(PgRecoveryEvent, self).__init__(message, refs)
|
||||
|
||||
self._pgs = which_pgs
|
||||
@ -227,14 +243,14 @@ class PgRecoveryEvent(Event):
|
||||
|
||||
self._original_pg_count = len(self._pgs)
|
||||
|
||||
self._original_bytes_recovered = None
|
||||
self._original_bytes_recovered = None # type: Optional[Dict[PgId, float]]
|
||||
|
||||
self._progress = 0.0
|
||||
|
||||
# self._start_epoch = _module.get_osdmap().get_epoch()
|
||||
self._start_epoch = start_epoch
|
||||
|
||||
self.id = str(uuid.uuid4())
|
||||
self.id = str(uuid.uuid4()) # type: str
|
||||
self._refresh()
|
||||
|
||||
@property
|
||||
@ -242,11 +258,12 @@ class PgRecoveryEvent(Event):
|
||||
return self. _which_osds
|
||||
|
||||
def pg_update(self, pg_dump, log):
|
||||
# type: (Dict, Any) -> None
|
||||
# FIXME: O(pg_num) in python
|
||||
# FIXME: far more fields getting pythonized than we really care about
|
||||
# Sanity check to see if there are any missing PGs and to assign
|
||||
# empty array and dictionary if there hasn't been any recovery
|
||||
pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])
|
||||
pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']]) # type: Dict[str, Any]
|
||||
if self._original_bytes_recovered is None:
|
||||
self._original_bytes_recovered = {}
|
||||
missing_pgs = []
|
||||
@ -324,11 +341,13 @@ class PgRecoveryEvent(Event):
|
||||
|
||||
@property
|
||||
def progress(self):
|
||||
# type: () -> float
|
||||
return self._progress
|
||||
|
||||
|
||||
class PgId(object):
|
||||
def __init__(self, pool_id, ps):
|
||||
# type: (str, int) -> None
|
||||
self.pool_id = pool_id
|
||||
self.ps = ps
|
||||
|
||||
@ -370,26 +389,31 @@ class Module(MgrModule):
|
||||
'desc': 'how frequently to persist completed events',
|
||||
'runtime': True,
|
||||
},
|
||||
]
|
||||
] # type: List[Dict[str, Any]]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Module, self).__init__(*args, **kwargs)
|
||||
|
||||
self._events = {}
|
||||
self._completed_events = []
|
||||
self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
|
||||
self._completed_events = [] # type: List[GhostEvent]
|
||||
|
||||
self._old_osd_map = None
|
||||
self._old_osd_map = None # type: Optional[OSDMap]
|
||||
|
||||
self._ready = threading.Event()
|
||||
self._shutdown = threading.Event()
|
||||
|
||||
self._latest_osdmap = None
|
||||
self._latest_osdmap = None # type: Optional[OSDMap]
|
||||
|
||||
self._dirty = False
|
||||
|
||||
global _module
|
||||
_module = self
|
||||
|
||||
# only for mypy
|
||||
if TYPE_CHECKING:
|
||||
self.max_completed_events = 0
|
||||
self.persist_interval = 0
|
||||
|
||||
def config_notify(self):
|
||||
for opt in self.MODULE_OPTIONS:
|
||||
setattr(self,
|
||||
@ -398,12 +422,13 @@ class Module(MgrModule):
|
||||
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
|
||||
|
||||
def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
|
||||
# type: (OSDMap, Dict, OSDMap, str, str) -> None
|
||||
# A function that will create or complete an event when an
|
||||
# OSD is marked in or out according to the affected PGs
|
||||
affected_pgs = []
|
||||
unmoved_pgs = []
|
||||
for pool in old_dump['pools']:
|
||||
pool_id = pool['pool']
|
||||
pool_id = pool['pool'] # type: str
|
||||
for ps in range(0, pool['pg_num']):
|
||||
|
||||
# Was this OSD affected by the OSD coming in/out?
|
||||
@ -469,17 +494,18 @@ class Module(MgrModule):
|
||||
self._complete(ev)
|
||||
|
||||
if len(affected_pgs) > 0:
|
||||
ev = PgRecoveryEvent(
|
||||
r_ev = PgRecoveryEvent(
|
||||
"Rebalancing after osd.{0} marked {1}".format(osd_id, marked),
|
||||
refs=[("osd", osd_id)],
|
||||
which_pgs=affected_pgs,
|
||||
which_osds=[osd_id],
|
||||
start_epoch=self.get_osdmap().get_epoch()
|
||||
)
|
||||
ev.pg_update(self.get("pg_dump"), self.log)
|
||||
self._events[ev.id] = ev
|
||||
r_ev.pg_update(self.get("pg_dump"), self.log)
|
||||
self._events[r_ev.id] = r_ev
|
||||
|
||||
def _osdmap_changed(self, old_osdmap, new_osdmap):
|
||||
# type: (OSDMap, OSDMap) -> None
|
||||
old_dump = old_osdmap.dump()
|
||||
new_dump = new_osdmap.dump()
|
||||
|
||||
@ -507,6 +533,8 @@ class Module(MgrModule):
|
||||
if notify_type == "osd_map":
|
||||
old_osdmap = self._latest_osdmap
|
||||
self._latest_osdmap = self.get_osdmap()
|
||||
assert old_osdmap
|
||||
assert self._latest_osdmap
|
||||
|
||||
self.log.info("Processing OSDMap change {0}..{1}".format(
|
||||
old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
|
||||
@ -521,6 +549,7 @@ class Module(MgrModule):
|
||||
self.maybe_complete(ev)
|
||||
|
||||
def maybe_complete(self, event):
|
||||
# type: (Event) -> None
|
||||
if event.progress >= 1.0:
|
||||
self._complete(event)
|
||||
|
||||
@ -597,13 +626,16 @@ class Module(MgrModule):
|
||||
self.clear_all_progress_events()
|
||||
|
||||
def update(self, ev_id, ev_msg, ev_progress, refs=None):
|
||||
# type: (str, str, float, Optional[list]) -> None
|
||||
"""
|
||||
For calling from other mgr modules
|
||||
"""
|
||||
if refs is None:
|
||||
refs = []
|
||||
try:
|
||||
|
||||
ev = self._events[ev_id]
|
||||
assert isinstance(ev, RemoteEvent)
|
||||
except KeyError:
|
||||
ev = RemoteEvent(ev_id, ev_msg, refs)
|
||||
self._events[ev_id] = ev
|
||||
@ -617,6 +649,7 @@ class Module(MgrModule):
|
||||
ev.set_message(ev_msg)
|
||||
|
||||
def _complete(self, ev):
|
||||
# type: (Event) -> None
|
||||
duration = (time.time() - ev.started_at)
|
||||
self.log.info("Completed event {0} ({1}) in {2} seconds".format(
|
||||
ev.id, ev.message, int(round(duration))
|
||||
@ -626,6 +659,7 @@ class Module(MgrModule):
|
||||
self._completed_events.append(
|
||||
GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
|
||||
failed=ev.failed, failure_message=ev.failure_message))
|
||||
assert ev.id
|
||||
del self._events[ev.id]
|
||||
self._prune_completed_events()
|
||||
self._dirty = True
|
||||
@ -636,6 +670,7 @@ class Module(MgrModule):
|
||||
"""
|
||||
try:
|
||||
ev = self._events[ev_id]
|
||||
assert isinstance(ev, RemoteEvent)
|
||||
ev.set_progress(1.0)
|
||||
self.log.info("complete: finished ev {0} ({1})".format(ev_id,
|
||||
ev.message))
|
||||
@ -651,6 +686,7 @@ class Module(MgrModule):
|
||||
"""
|
||||
try:
|
||||
ev = self._events[ev_id]
|
||||
assert isinstance(ev, RemoteEvent)
|
||||
ev.set_failed(message)
|
||||
self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
|
||||
ev.message,
|
||||
@ -671,9 +707,9 @@ class Module(MgrModule):
|
||||
if len(self._completed_events):
|
||||
# TODO: limit number of completed events to show
|
||||
out += "\n"
|
||||
for ev in self._completed_events:
|
||||
out += "[{0}]: {1}\n".format("Complete" if not ev.failed else "Failed",
|
||||
ev.twoline_progress())
|
||||
for ghost_ev in self._completed_events:
|
||||
out += "[{0}]: {1}\n".format("Complete" if not ghost_ev.failed else "Failed",
|
||||
ghost_ev.twoline_progress())
|
||||
|
||||
return 0, out, ""
|
||||
else:
|
||||
|
@ -2,7 +2,8 @@
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
from mock import Mock
|
||||
from tests import mock
|
||||
|
||||
import pytest
|
||||
import json
|
||||
os.environ['UNITTEST'] = "1"
|
||||
@ -15,7 +16,7 @@ class TestPgRecoveryEvent(object):
|
||||
def setup(self):
|
||||
# Creating the class and Mocking
|
||||
# a bunch of attributes for testing
|
||||
module._module = Mock() # just so Event._refresh() works
|
||||
module._module = mock.Mock() # just so Event._refresh() works
|
||||
self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30)
|
||||
|
||||
def test_pg_update(self):
|
||||
@ -76,7 +77,7 @@ class TestPgRecoveryEvent(object):
|
||||
]
|
||||
}
|
||||
|
||||
self.test_event.pg_update(pg_dump, Mock())
|
||||
self.test_event.pg_update(pg_dump, mock.Mock())
|
||||
assert self.test_event._progress == 1.0
|
||||
|
||||
class OSDMap:
|
||||
@ -124,13 +125,13 @@ class TestModule(object):
|
||||
# Creating the class and Mocking a
|
||||
# bunch of attributes for testing
|
||||
|
||||
module.PgRecoveryEvent.pg_update = Mock()
|
||||
module.PgRecoveryEvent.pg_update = mock.Mock()
|
||||
self.test_module = module.Module() # so we can see if an event gets created
|
||||
self.test_module.log = Mock() # we don't need to log anything
|
||||
self.test_module.get = Mock() # so we can call pg_update
|
||||
self.test_module._complete = Mock() # we want just to see if this event gets called
|
||||
self.test_module.get_osdmap = Mock() # so that self.get_osdmap().get_epoch() works
|
||||
module._module = Mock() # so that Event.refresh() works
|
||||
self.test_module.log = mock.Mock() # we don't need to log anything
|
||||
self.test_module.get = mock.Mock() # so we can call pg_update
|
||||
self.test_module._complete = mock.Mock() # we want just to see if this event gets called
|
||||
self.test_module.get_osdmap = mock.Mock() # so that self.get_osdmap().get_epoch() works
|
||||
module._module = mock.Mock() # so that Event.refresh() works
|
||||
|
||||
def test_osd_in_out(self):
|
||||
# test for the correct event being
|
@ -5,7 +5,7 @@ skipsdist = true
|
||||
[testenv]
|
||||
setenv = UNITTEST = true
|
||||
deps = -r requirements.txt
|
||||
commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/}
|
||||
commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/ progress/}
|
||||
|
||||
[testenv:mypy]
|
||||
basepython = python3
|
||||
@ -19,5 +19,6 @@ commands = mypy --config-file=../../mypy.ini \
|
||||
mgr_util.py \
|
||||
orchestrator.py \
|
||||
orchestrator_cli/module.py \
|
||||
progress/module.py \
|
||||
rook/module.py \
|
||||
test_orchestrator/module.py
|
||||
|
Loading…
Reference in New Issue
Block a user