diff --git a/src/pybind/mgr/progress/__init__.py b/src/pybind/mgr/progress/__init__.py new file mode 100644 index 00000000000..bd1113bf1b2 --- /dev/null +++ b/src/pybind/mgr/progress/__init__.py @@ -0,0 +1,2 @@ + +from module import * diff --git a/src/pybind/mgr/progress/module.py b/src/pybind/mgr/progress/module.py new file mode 100644 index 00000000000..cd57d12dc71 --- /dev/null +++ b/src/pybind/mgr/progress/module.py @@ -0,0 +1,426 @@ +from mgr_module import MgrModule +import threading +import datetime +import uuid + +import json + + +# How often to potentially write back any dirty events (event history +# persistence is best effort! +PERSIST_PERIOD = 5 + +ENCODING_VERSION = 1 + + +class Event(object): + """ + A generic "event" that has a start time, completion percentage, + and a list of "refs" that are (type, id) tuples describing which + objects (osds, pools) this relates to. + """ + + def __init__(self, message, refs): + self._message = message + self._refs = refs + + self.started_at = datetime.datetime.utcnow() + + self.id = None + + @property + def message(self): + return self._message + + @property + def refs(self): + return self._refs + + @property + def progress(self): + raise NotImplementedError() + + def summary(self): + return "{0} {1}".format(self.progress, self._message) + + def _progress_str(self, width): + inner_width = width - 2 + out = "[" + done_chars = int(self.progress * inner_width) + out += done_chars * '=' + out += (inner_width - done_chars) * '.' + out += "]" + + return out + + def twoline_progress(self): + """ + e.g. + + - Eating my delicious strudel + [===============..............] + + """ + return "{0}\n {1}".format( + self._message, self._progress_str(30)) + + +class GhostEvent(Event): + """ + The ghost of a completed event: these are the fields that we persist + after the event is complete. + """ + + def __init__(self, my_id, message, refs): + super(GhostEvent, self).__init__(message, refs) + self.id = my_id + + @property + def progress(self): + return 1.0 + + def encode(self): + return { + "id": self.id, + "message": self.message, + "refs": self._refs + } + + +class RemoteEvent(Event): + """ + An event that was published by another module: we know nothing about + this, rely on the other module to continuously update us with + progress information as it emerges. + """ + + def __init__(self, my_id, message, refs): + super(RemoteEvent, self).__init__(message, refs) + self.id = my_id + self._progress = 0.0 + + def set_progress(self, progress): + self._progress = progress + + @property + def progress(self): + return self._progress + + +class PgRecoveryEvent(Event): + """ + An event whose completion is determined by the recovery of a set of + PGs to a healthy state. + + Always call update() immediately after construction. + """ + + def __init__(self, message, refs, which_pgs, evactuate_osds): + super(PgRecoveryEvent, self).__init__(message, refs) + + self._pgs = which_pgs + + self._evacuate_osds = evactuate_osds + + self._original_pg_count = len(self._pgs) + + self._original_bytes_recovered = None + + self._progress = 0.0 + + self.id = str(uuid.uuid4()) + + def pg_update(self, pg_dump, log): + # FIXME: O(pg_num) in python + # FIXME: far more fields getting pythonized than we really care about + pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']]) + + if self._original_bytes_recovered is None: + self._original_bytes_recovered = {} + for pg in self._pgs: + pg_str = str(pg) + log.debug(json.dumps(pg_to_state[pg_str], indent=2)) + self._original_bytes_recovered[pg] = \ + pg_to_state[pg_str]['stat_sum']['num_bytes_recovered'] + + complete_accumulate = 0.0 + + # Calculating progress as the number of PGs recovered divided by the + # original where partially completed PGs count for something + # between 0.0-1.0. This is perhaps less faithful than lookign at the + # total number of bytes recovered, but it does a better job of + # representing the work still to do if there are a number of very + # few-bytes PGs that still need the housekeeping of their recovery + # to be done. This is subjective... + + complete = set() + for pg in self._pgs: + pg_str = str(pg) + info = pg_to_state[pg_str] + state = info['state'] + + states = state.split("+") + + unmoved = bool(set(self._evacuate_osds) & ( + set(info['up']) | set(info['acting']))) + + if "active" in states and "clean" in states and not unmoved: + complete.add(pg) + else: + # FIXME: handle apparent negative progress + # (e.g. if num_bytes_recovered goes backwards) + # FIXME: handle apparent >1.0 progress + # (num_bytes_recovered incremented by more than num_bytes) + if info['stat_sum']['num_bytes'] == 0: + # Empty PGs are considered 0% done until they are + # in the correct state. + pass + else: + recovered = info['stat_sum']['num_bytes_recovered'] + total_bytes = info['stat_sum']['num_bytes'] + ratio = float(recovered - + self._original_bytes_recovered[pg]) / \ + total_bytes + + complete_accumulate += ratio + + self._pgs = list(set(self._pgs) ^ complete) + completed_pgs = self._original_pg_count - len(self._pgs) + self._progress = (completed_pgs + complete_accumulate)\ + / self._original_pg_count + + log.info("Updated progress to {0} ({1})".format( + self._progress, self._message + )) + + @property + def progress(self): + return self._progress + + +class PgId(object): + def __init__(self, pool_id, ps): + self.pool_id = pool_id + self.ps = ps + + def __cmp__(self, other): + return (self.pool_id, self.ps) == (other.pool_id, other.ps) + + def __lt__(self, other): + return (self.pool_id, self.ps) < (other.pool_id, other.ps) + + def __str__(self): + return "{0}.{1:x}".format(self.pool_id, self.ps) + + +class Module(MgrModule): + COMMANDS = [ + {"cmd": "progress", + "desc": "Show progress of recovery operations", + "perm": "r"}, + {"cmd": "progress clear", + "desc": "Reset progress tracking", + "perm": "rw"} + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + + self._events = {} + self._completed_events = [] + + self._old_osd_map = None + + self._ready = threading.Event() + self._shutdown = threading.Event() + + self._latest_osdmap = None + + self._dirty = False + + def _osd_out(self, old_map, old_dump, new_map, osd_id): + affected_pgs = [] + for pool in old_dump['pools']: + pool_id = pool['pool'] + for ps in range(0, pool['pg_num']): + self.log.debug("pool_id, ps = {0}, {1}".format( + pool_id, ps + )) + up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps) + self.log.debug( + "up_acting: {0}".format(json.dumps(up_acting, indent=2))) + if osd_id in up_acting['up'] or osd_id in up_acting['acting']: + affected_pgs.append(PgId(pool_id, ps)) + + self.log.warn("{0} PGs affected by osd.{1} going out".format( + len(affected_pgs), osd_id)) + + if len(affected_pgs) == 0: + # Don't emit events if there were no PGs + return + + # TODO: reconcile with existing events referring to this OSD going out + ev = PgRecoveryEvent( + "Rebalancing after OSD {0} marked out".format(osd_id), + refs=[("osd", osd_id)], + which_pgs=affected_pgs, + evactuate_osds=[osd_id] + ) + ev.pg_update(self.get("pg_dump"), self.log) + self._events[ev.id] = ev + + def _osdmap_changed(self, old_osdmap, new_osdmap): + old_dump = old_osdmap.dump() + new_dump = new_osdmap.dump() + + old_osds = dict([(o['osd'], o) for o in old_dump['osds']]) + + for osd in new_dump['osds']: + osd_id = osd['osd'] + new_weight = osd['in'] + if osd_id in old_osds and new_weight == 0.0: + old_weight = old_osds[osd_id]['in'] + + if old_weight > new_weight: + self.log.warn("osd.{0} marked out".format(osd_id)) + self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id) + + def notify(self, notify_type, notify_data): + self._ready.wait() + + if notify_type == "osd_map": + old_osdmap = self._latest_osdmap + self._latest_osdmap = self.get_osdmap() + + self.log.info("Processing OSDMap change {0}..{1}".format( + old_osdmap.get_epoch(), self._latest_osdmap.get_epoch() + )) + self._osdmap_changed(old_osdmap, self._latest_osdmap) + elif notify_type == "pg_summary": + data = self.get("pg_dump") + for ev_id, ev in self._events.items(): + if isinstance(ev, PgRecoveryEvent): + ev.pg_update(data, self.log) + else: + self.log.debug("Ignore notification '{0}'".format(notify_type)) + pass + + def _persist(self): + self.log.info("Writing back {0} completed events".format( + len(self._completed_events) + )) + # TODO: bound the number we store. + encoded = json.dumps({ + "events": [ev.encode() for ev in self._completed_events], + "version": ENCODING_VERSION, + "compat_version": ENCODING_VERSION + }) + self.set_store("completed", encoded) + + def _unpersist(self): + stored = self.get_store("completed") + + if stored is None: + self.log.info("No stored events to load") + return + + decoded = json.loads(stored) + if decoded['compat_version'] > ENCODING_VERSION: + raise RuntimeError("Cannot decode version {0}".format( + decoded['compat_version'])) + + for ev in decoded['events']: + self._completed_events.append(GhostEvent(ev['id'], ev['message'], ev['refs'])) + + def serve(self): + self.log.info("Loading...") + + self._unpersist() + self.log.info("Loaded {0} historic events".format(self._completed_events)) + + self._latest_osdmap = self.get_osdmap() + self.log.info("Loaded OSDMap, ready.") + + self._ready.set() + + while True: + if self._dirty: + self._persist() + self._dirty = False + + self._shutdown.wait(timeout=PERSIST_PERIOD) + + self._shutdown.wait() + + def shutdown(self): + self._shutdown.set() + + def update(self, ev_id, ev_msg, ev_progress): + """ + For calling from other mgr modules + """ + try: + ev = self._events[ev_id] + except KeyError: + ev = RemoteEvent(ev_id, ev_msg, []) + self._events[ev_id] = ev + self.log.info("update: starting ev {0} ({1})".format( + ev_id, ev_msg)) + else: + self.log.debug("update: {0} on {1}".format( + ev_progress, ev_msg)) + + ev.set_progress(ev_progress) + + def complete(self, ev_id): + """ + For calling from other mgr modules + """ + try: + ev = self._events[ev_id] + ev.set_progress(1.0) + self.log.info("complete: finished ev {0} ({1})".format(ev_id, + ev.message)) + self._completed_events.append( + GhostEvent(ev.id, ev.message, ev.refs)) + self._dirty = True + del self._events[ev_id] + + except KeyError: + self.log.warn("complete: ev {0} does not exist".format(ev_id)) + pass + + def _handle_ls(self): + if len(self._events) or len(self._completed_events): + out = "" + chrono_order = sorted(self._events.values(), + key=lambda x: x.started_at, reverse=True) + for ev in chrono_order: + out += ev.twoline_progress() + out += "\n" + + if len(self._completed_events): + # TODO: limit number of completed events to show + out += "\n" + for ev in self._completed_events: + out += "[Complete]: {0}\n".format(ev.twoline_progress()) + + return 0, out, "" + else: + return 0, "", "Nothing in progress" + + def _handle_clear(self): + self._events = {} + return 0, "", "" + + def handle_command(self, inbuf, cmd): + if cmd['prefix'] == "progress": + return self._handle_ls() + if cmd['prefix'] == "progress clear": + # The clear command isn't usually needed - it's to enable + # the admin to "kick" this module if it seems to have done + # something wrong (e.g. we have a bug causing a progress event + # that never finishes) + return self._handle_clear() + else: + raise NotImplementedError(cmd['prefix'])