diff --git a/ceph.spec.in b/ceph.spec.in index 498116212ec..4cd0e3e74e2 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -1657,7 +1657,6 @@ fi %{_datadir}/ceph/mgr/localpool %{_datadir}/ceph/mgr/orchestrator %{_datadir}/ceph/mgr/osd_perf_query -%{_datadir}/ceph/mgr/osd_support %{_datadir}/ceph/mgr/pg_autoscaler %{_datadir}/ceph/mgr/progress %{_datadir}/ceph/mgr/prometheus diff --git a/debian/ceph-mgr-modules-core.install b/debian/ceph-mgr-modules-core.install index bdb687a64c9..903a4d8b014 100644 --- a/debian/ceph-mgr-modules-core.install +++ b/debian/ceph-mgr-modules-core.install @@ -8,7 +8,6 @@ usr/share/ceph/mgr/iostat usr/share/ceph/mgr/localpool usr/share/ceph/mgr/orchestrator usr/share/ceph/mgr/osd_perf_query -usr/share/ceph/mgr/osd_support usr/share/ceph/mgr/pg_autoscaler usr/share/ceph/mgr/progress usr/share/ceph/mgr/prometheus diff --git a/doc/mgr/index.rst b/doc/mgr/index.rst index f6c85cec79c..d59a1369233 100644 --- a/doc/mgr/index.rst +++ b/doc/mgr/index.rst @@ -40,7 +40,6 @@ sensible. Telegraf module Telemetry module Iostat module - OSD Support module Crash module Insights module Orchestrator module diff --git a/doc/mgr/orchestrator.rst b/doc/mgr/orchestrator.rst index f5a13ee2ef8..d1d81406df8 100644 --- a/doc/mgr/orchestrator.rst +++ b/doc/mgr/orchestrator.rst @@ -199,16 +199,16 @@ In the case that you have already created the OSD's using the ``all-available-de ceph orch osd spec --service-name osd.all-available-devices --unmanaged Remove an OSD -------------------- +------------- :: - ceph orch osd rm ... [--replace] [--force] + ceph orch osd rm [--replace] [--force] -Removes one or more OSDs from the cluster. +Evacuates PGs from an OSD and removes it from the cluster. Example:: - # ceph orch osd rm 4 + # ceph orch osd rm 0 Scheduled OSD(s) for removal @@ -217,10 +217,10 @@ OSDs that are not safe-to-destroy will be rejected. You can query the state of the operation with:: # ceph orch osd rm status - NAME HOST PGS STARTED_AT - osd.7 node1 55 2020-04-22 19:28:38.785761 - osd.5 node3 3 2020-04-22 19:28:34.201685 - osd.3 node2 0 2020-04-22 19:28:34.201695 + OSD_ID HOST STATE PG_COUNT REPLACE FORCE STARTED_AT + 2 cephadm-dev done, waiting for purge 0 True False 2020-07-17 13:01:43.147684 + 3 cephadm-dev draining 17 False True 2020-07-17 13:01:45.162158 + 4 cephadm-dev started 42 False True 2020-07-17 13:01:45.162158 When no PGs are left on the osd, it will be decommissioned and removed from the cluster. @@ -229,11 +229,28 @@ When no PGs are left on the osd, it will be decommissioned and removed from the After removing an OSD, if you wipe the LVM physical volume in the device used by the removed OSD, a new OSD will be created. Read information about the ``unmanaged`` parameter in :ref:`orchestrator-cli-create-osds`. +Stopping OSD Removal +-------------------- + +You can stop the operation with + +:: + + ceph orch osd rm stop + +Example:: + + # ceph orch osd rm stop 4 + Stopped OSD(s) removal + +This will reset the initial state of the OSD and remove it from the queue. + + Replace an OSD ------------------- :: - orch osd rm ... --replace [--force] + orch osd rm --replace [--force] Example:: diff --git a/doc/mgr/osd_support.rst b/doc/mgr/osd_support.rst deleted file mode 100644 index fb1037c2b74..00000000000 --- a/doc/mgr/osd_support.rst +++ /dev/null @@ -1,61 +0,0 @@ -OSD Support Module -================== -The OSD Support module holds osd specific functionality that -is needed by different components like the orchestrators. 
- -In current scope: - -* osd draining - -Enabling --------- -When an orchestrator is used this should be enabled as a dependency. -(*currently only valid for the cephadm orchestrator*) - -The *osd_support* module is manually enabled with:: - - ceph mgr module enable osd_support - -Commands --------- - -Draining -######## - -This mode is for draining OSDs gracefully. `Draining` in this context means gracefully emptying out OSDs by setting their -weight to zero. An OSD is considered to be drained when no PGs are left. - -:: - - ceph osd drain $osd_id - -Takes a $osd_id and schedules it for draining. Since that process can take -quite some time, the operation will be executed in the background. To query the status -of the operation you can use: - -:: - - ceph osd drain status - -This gives you the status of all running operations in this format:: - - [{'osd_id': 0, 'pgs': 1234}, ..] - -If you wish to stop an OSD from being drained:: - - ceph osd drain stop [$osd_id] - -Stops all **scheduled** osd drain operations (not the operations that have been started already) -if no $osd_ids are given. If $osd_ids are present it only operates on them. -To stop and reset the weight of already started operations we need to save the initial weight -(see 'Ideas for improvement') - - -Ideas for improvement ----------------------- -- add health checks set_health_checks -- use objects to represent OSDs - - allows timestamps, trending information etc -- save osd drain state (at least the osd_ids in the mon store) - - resume after a mgr crash - - save the initial weight of a osd i.e. (set to initial weight on abort) diff --git a/src/mon/MgrMonitor.cc b/src/mon/MgrMonitor.cc index 17852efaf42..7651a054251 100644 --- a/src/mon/MgrMonitor.cc +++ b/src/mon/MgrMonitor.cc @@ -89,7 +89,6 @@ const static std::map> always_on_modules = { "devicehealth", "orchestrator", "rbd_support", - "osd_support", "volumes", "pg_autoscaler", "telemetry", @@ -104,7 +103,6 @@ const static std::map> always_on_modules = { "devicehealth", "orchestrator", "rbd_support", - "osd_support", "volumes", "pg_autoscaler", "telemetry", diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index dd9c56c380d..6c93a254843 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -38,7 +38,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ RbdMirrorService, CrashService, CephadmService from .services.iscsi import IscsiService from .services.nfs import NFSService -from .services.osd import RemoveUtil, OSDRemoval, OSDService +from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService from .schedule import HostAssignment, HostPlacementSpec @@ -318,7 +318,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.cache = HostCache(self) self.cache.load() + self.rm_util = RemoveUtil(self) + self.to_remove_osds = OSDQueue() + self.rm_util.load_from_store() self.spec_store = SpecStore(self) self.spec_store.load() @@ -495,7 +498,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._update_paused_health() if not self.paused: - self.rm_util._remove_osds_bg() + self.rm_util.process_removal_queue() self.migration.migrate() if self.migration.is_migration_ongoing(): @@ -564,6 +567,20 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, monmap['modified'], CEPH_DATEFMT) if self.last_monmap and 
self.last_monmap > datetime.datetime.utcnow(): self.last_monmap = None # just in case clocks are skewed + if notify_type == "pg_summary": + self._trigger_osd_removal() + + def _trigger_osd_removal(self): + data = self.get("osd_stats") + for osd in data.get('osd_stats', []): + if osd.get('num_pgs') == 0: + # if _ANY_ osd that is currently in the queue appears to be empty, + # start the removal process + if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids(): + self.log.debug(f"Found empty osd. Starting removal process") + # if the osd that is now empty is also part of the removal queue + # start the process + self.rm_util.process_removal_queue() def pause(self): if not self.paused: @@ -2461,31 +2478,53 @@ you may want to run: """ Takes a list of OSDs and schedules them for removal. The function that takes care of the actual removal is - _remove_osds_bg(). + process_removal_queue(). """ - daemons = self.cache.get_daemons_by_service('osd') - found: Set[OSDRemoval] = set() + daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_service('osd') + to_remove_daemons = list() for daemon in daemons: - if daemon.daemon_id not in osd_ids: - continue - found.add(OSDRemoval(daemon.daemon_id, replace, force, - daemon.hostname, daemon.name(), - datetime.datetime.utcnow(), -1)) + if daemon.daemon_id in osd_ids: + to_remove_daemons.append(daemon) + if not to_remove_daemons: + return f"Unable to find OSDs: {osd_ids}" - not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} - if not_found: - raise OrchestratorError('Unable to find OSD: %s' % not_found) - - self.rm_util.queue_osds_for_removal(found) + for daemon in to_remove_daemons: + try: + self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), + replace=replace, + force=force, + hostname=daemon.hostname, + fullname=daemon.name(), + process_started_at=datetime.datetime.utcnow(), + remove_util=self.rm_util)) + except NotFoundError: + return f"Unable to find OSDs: {osd_ids}" # trigger the serve loop to initiate the removal self._kick_serve_loop() return "Scheduled OSD(s) for removal" @trivial_completion - def remove_osds_status(self) -> Set[OSDRemoval]: + def stop_remove_osds(self, osd_ids: List[str]): + """ + Stops a `removal` process for a List of OSDs. 
+ This will revert their weight and remove it from the osds_to_remove queue + """ + for osd_id in osd_ids: + try: + self.to_remove_osds.rm(OSD(osd_id=int(osd_id), + remove_util=self.rm_util)) + except (NotFoundError, KeyError): + return f'Unable to find OSD in the queue: {osd_id}' + + # trigger the serve loop to halt the removal + self._kick_serve_loop() + return "Stopped OSD(s) removal" + + @trivial_completion + def remove_osds_status(self): """ The CLI call to retrieve an osd removal report """ - return self.rm_util.report + return self.to_remove_osds.all_osds() diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 21038c467b2..3a7297bdc44 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -1,4 +1,3 @@ -import datetime import json import logging from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional @@ -7,6 +6,7 @@ from ceph.deployment import translate from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.drive_selection import DriveSelection +from datetime import datetime import orchestrator from cephadm.utils import forall_hosts from orchestrator import OrchestratorError @@ -15,6 +15,7 @@ from mgr_module import MonCommandFailed from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec logger = logging.getLogger(__name__) +DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' class OSDService(CephadmService): @@ -280,139 +281,148 @@ class OSDService(CephadmService): return osd_host_map -class OSDRemoval(object): - def __init__(self, - osd_id: str, - replace: bool, - force: bool, - nodename: str, - fullname: str, - start_at: datetime.datetime, - pg_count: int): - self.osd_id = osd_id - self.replace = replace - self.force = force - self.nodename = nodename - self.fullname = fullname - self.started_at = start_at - self.pg_count = pg_count - - # needed due to changing 'started_at' attr - def __eq__(self, other): - return self.osd_id == other.osd_id - - def __hash__(self): - return hash(self.osd_id) - - def __repr__(self): - return ('(osd_id={}, replace={}, force={}, nodename={}' - ', fullname={}, started_at={}, pg_count={})').format( - self.osd_id, self.replace, self.force, self.nodename, - self.fullname, self.started_at, self.pg_count) - - @property - def pg_count_str(self) -> str: - return 'n/a' if self.pg_count < 0 else str(self.pg_count) - - class RemoveUtil(object): def __init__(self, mgr): self.mgr = mgr - self.to_remove_osds: Set[OSDRemoval] = set() - self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict() - @property - def report(self) -> Set[OSDRemoval]: - return self.to_remove_osds.copy() - - def queue_osds_for_removal(self, osds: Set[OSDRemoval]): - self.to_remove_osds.update(osds) - - def _remove_osds_bg(self) -> None: + def process_removal_queue(self) -> None: """ Performs actions in the _serve() loop to remove an OSD when criteria is met. """ + + # make sure that we don't run on OSDs that are not in the cluster anymore. 
+ self.cleanup() + logger.debug( - f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") - self._update_osd_removal_status() - remove_osds: set = self.to_remove_osds.copy() - for osd in remove_osds: + f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled " + f"for removal: {self.mgr.to_remove_osds.all_osds()}") + + # find osds that are ok-to-stop and not yet draining + ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds()) + if ok_to_stop_osds: + # start draining those + _ = [osd.start_draining() for osd in ok_to_stop_osds] + + # Check all osds for their state and take action (remove, purge etc) + to_remove_osds = self.mgr.to_remove_osds.all_osds() + new_queue = set() + for osd in to_remove_osds: if not osd.force: - self.drain_osd(osd.osd_id) # skip criteria - if not self.is_empty(osd.osd_id): + if not osd.is_empty: logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") + new_queue.add(osd) continue - if not self.ok_to_destroy([osd.osd_id]): + if not osd.safe_to_destroy(): logger.info( f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") + new_queue.add(osd) continue # abort criteria - if not self.down_osd([osd.osd_id]): + if not osd.down(): # also remove it from the remove_osd list and set a health_check warning? raise orchestrator.OrchestratorError( f"Could not set OSD <{osd.osd_id}> to 'down'") if osd.replace: - if not self.destroy_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? + if not osd.destroy(): raise orchestrator.OrchestratorError( f"Could not destroy OSD <{osd.osd_id}>") else: - if not self.purge_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? + if not osd.purge(): raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") + if not osd.exists: + continue self.mgr._remove_daemon(osd.fullname, osd.nodename) logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}") logger.debug(f"Removing {osd.osd_id} from the queue.") - self.to_remove_osds.remove(osd) - def _update_osd_removal_status(self): - """ - Generate a OSD report that can be printed to the CLI - """ - logger.debug("Update OSD removal status") - for osd in self.to_remove_osds: - osd.pg_count = self.get_pg_count(str(osd.osd_id)) - logger.debug(f"OSD removal status: {self.to_remove_osds}") + # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI) + # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and + # osds that were added while this method was executed' + self.mgr.to_remove_osds.intersection_update(new_queue) + self.save_to_store() - def drain_osd(self, osd_id: str) -> bool: + def cleanup(self): + # OSDs can always be cleaned up manually. 
This ensures that we run on existing OSDs + not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster() + [self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds] + + def get_osds_in_cluster(self) -> List[str]: + osd_map = self.mgr.get_osdmap() + return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])] + + def osd_df(self) -> dict: + base_cmd = 'osd df' + ret, out, err = self.mgr.mon_command({ + 'prefix': base_cmd, + 'format': 'json' + }) + return json.loads(out) + + def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int: + if not osd_df: + osd_df = self.osd_df() + osd_nodes = osd_df.get('nodes', []) + for osd_node in osd_nodes: + if osd_node.get('id') == int(osd_id): + return osd_node.get('pgs', -1) + return -1 + + def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]: """ - Uses `osd_support` module to schedule a drain operation of an OSD + Cut osd_id list in half until it's ok-to-stop + + :param osds: list of osd_ids + :return: list of ods_ids that can be stopped at once """ + if not osds: + return [] + while not self.ok_to_stop(osds): + if len(osds) <= 1: + # can't even stop one OSD, aborting + self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..") + return [] + + # This potentially prolongs the global wait time. + self.mgr.event.wait(1) + # splitting osd_ids in half until ok_to_stop yields success + # maybe popping ids off one by one is better here..depends on the cluster size I guess.. + # There's a lot of room for micro adjustments here + osds = osds[len(osds) // 2:] + return osds + + # todo start draining + # return all([osd.start_draining() for osd in osds]) + + def ok_to_stop(self, osds: List["OSD"]) -> bool: cmd_args = { - 'prefix': 'osd drain', - 'osd_ids': [int(osd_id)] + 'prefix': "osd ok-to-stop", + 'ids': [str(osd.osd_id) for osd in osds] } return self._run_mon_cmd(cmd_args) - def get_pg_count(self, osd_id: str) -> int: - """ Queries for PG count of an OSD """ - self.mgr.log.debug("Querying for drain status") + def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool: + base_cmd = f"osd {flag}" + self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}") ret, out, err = self.mgr.mon_command({ - 'prefix': 'osd drain status', + 'prefix': base_cmd, + 'ids': [str(osd.osd_id) for osd in osds] }) if ret != 0: - self.mgr.log.error(f"Calling osd drain status failed with {err}") - raise OrchestratorError("Could not query `osd drain status`") - out = json.loads(out) - for o in out: - if str(o.get('osd_id', '')) == str(osd_id): - return int(o.get('pgs', -1)) - return -1 + self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. 
<{err}>") + return False + self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>") + return True - def is_empty(self, osd_id: str) -> bool: - """ Checks if an OSD is empty """ - return self.get_pg_count(osd_id) == 0 - - def ok_to_destroy(self, osd_ids: List[int]) -> bool: + def safe_to_destroy(self, osd_ids: List[int]) -> bool: """ Queries the safe-to-destroy flag for OSDs """ cmd_args = {'prefix': 'osd safe-to-destroy', - 'ids': osd_ids} + 'ids': [str(x) for x in osd_ids]} return self._run_mon_cmd(cmd_args) def destroy_osd(self, osd_id: int) -> bool: @@ -422,14 +432,6 @@ class RemoveUtil(object): 'yes_i_really_mean_it': True} return self._run_mon_cmd(cmd_args) - def down_osd(self, osd_ids: List[int]) -> bool: - """ Sets `out` flag to OSDs """ - cmd_args = { - 'prefix': 'osd down', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - def purge_osd(self, osd_id: int) -> bool: """ Purges an OSD from the cluster (forcefully) """ cmd_args = { @@ -439,14 +441,6 @@ class RemoveUtil(object): } return self._run_mon_cmd(cmd_args) - def out_osd(self, osd_ids: List[int]) -> bool: - """ Sets `down` flag to OSDs """ - cmd_args = { - 'prefix': 'osd out', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - def _run_mon_cmd(self, cmd_args: dict) -> bool: """ Generic command to run mon_command and evaluate/log the results @@ -458,3 +452,237 @@ class RemoveUtil(object): return False self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") return True + + def save_to_store(self): + osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()] + logger.debug(f"Saving {osd_queue} to store") + self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue)) + + def load_from_store(self): + for k, v in self.mgr.get_store_prefix('osd_remove_queue').items(): + for osd in json.loads(v): + logger.debug(f"Loading osd ->{osd} from store") + osd_obj = OSD.from_json(json.loads(osd), ctx=self) + self.mgr.to_remove_osds.add(osd_obj) + + +class NotFoundError(Exception): + pass + + +class OSD: + + def __init__(self, + osd_id: int, + remove_util: RemoveUtil, + drain_started_at: Optional[datetime] = None, + process_started_at: Optional[datetime] = None, + drain_stopped_at: Optional[datetime] = None, + drain_done_at: Optional[datetime] = None, + draining: bool = False, + started: bool = False, + stopped: bool = False, + replace: bool = False, + force: bool = False, + hostname: Optional[str] = None, + fullname: Optional[str] = None, + ): + # the ID of the OSD + self.osd_id = osd_id + + # when did process (not the actual draining) start + self.process_started_at = process_started_at + + # when did the drain start + self.drain_started_at = drain_started_at + + # when did the drain stop + self.drain_stopped_at = drain_stopped_at + + # when did the drain finish + self.drain_done_at = drain_done_at + + # did the draining start + self.draining = draining + + # was the operation started + self.started = started + + # was the operation stopped + self.stopped = stopped + + # If this is a replace or remove operation + self.replace = replace + # If we wait for the osd to be drained + self.force = force + # The name of the node + self.nodename = hostname + # The full name of the osd + self.fullname = fullname + + # mgr obj to make mgr/mon calls + self.rm_util = remove_util + + def start(self) -> None: + if self.started: + logger.debug(f"Already started draining {self}") + return None + self.started = True + self.stopped = False + + def start_draining(self) -> bool: + if self.stopped: + logger.debug(f"Won't 
start draining {self}. OSD draining is stopped.") + return False + self.rm_util.set_osd_flag([self], 'out') + self.drain_started_at = datetime.utcnow() + self.draining = True + logger.debug(f"Started draining {self}.") + return True + + def stop_draining(self) -> bool: + self.rm_util.set_osd_flag([self], 'in') + self.drain_stopped_at = datetime.utcnow() + self.draining = False + logger.debug(f"Stopped draining {self}.") + return True + + def stop(self) -> None: + if self.stopped: + logger.debug(f"Already stopped draining {self}") + return None + self.started = False + self.stopped = True + self.stop_draining() + + @property + def is_draining(self) -> bool: + """ + Consider an OSD draining when it is + actively draining but not yet empty + """ + return self.draining and not self.is_empty + + @property + def is_ok_to_stop(self) -> bool: + return self.rm_util.ok_to_stop([self]) + + @property + def is_empty(self) -> bool: + if self.get_pg_count() == 0: + if not self.drain_done_at: + self.drain_done_at = datetime.utcnow() + self.draining = False + return True + return False + + def safe_to_destroy(self) -> bool: + return self.rm_util.safe_to_destroy([self.osd_id]) + + def down(self) -> bool: + return self.rm_util.set_osd_flag([self], 'down') + + def destroy(self) -> bool: + return self.rm_util.destroy_osd(self.osd_id) + + def purge(self) -> bool: + return self.rm_util.purge_osd(self.osd_id) + + def get_pg_count(self) -> int: + return self.rm_util.get_pg_count(self.osd_id) + + @property + def exists(self) -> bool: + return str(self.osd_id) in self.rm_util.get_osds_in_cluster() + + def drain_status_human(self): + default_status = 'not started' + status = 'started' if self.started and not self.draining else default_status + status = 'draining' if self.draining else status + status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status + return status + + def pg_count_str(self): + return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count()) + + def to_json(self) -> str: + out = dict() + out['osd_id'] = self.osd_id + out['started'] = self.started + out['draining'] = self.draining + out['stopped'] = self.stopped + out['replace'] = self.replace + out['force'] = self.force + out['nodename'] = self.nodename # type: ignore + + for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if getattr(self, k): + out[k] = getattr(self, k).strftime(DATEFMT) + else: + out[k] = getattr(self, k) + return json.dumps(out) + + @classmethod + def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]: + if not inp: + return None + for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if inp.get(date_field): + inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)}) + inp.update({'remove_util': ctx}) + return cls(**inp) + + def __hash__(self): + return hash(self.osd_id) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, OSD): + return NotImplemented + return self.osd_id == other.osd_id + + def __repr__(self) -> str: + return f"(osd_id={self.osd_id}, is_draining={self.is_draining})" + + +class OSDQueue(Set): + + def __init__(self): + super().__init__() + + def as_osd_ids(self) -> List[int]: + return [osd.osd_id for osd in self] + + def queue_size(self) -> int: + return len(self) + + def draining_osds(self) -> List["OSD"]: + return [osd for osd in self if osd.is_draining] + + def idling_osds(self) -> List["OSD"]: + 
return [osd for osd in self if not osd.is_draining and not osd.is_empty] + + def empty_osds(self) -> List["OSD"]: + return [osd for osd in self if osd.is_empty] + + def all_osds(self) -> List["OSD"]: + return [osd for osd in self] + + def not_in_cluster(self) -> List["OSD"]: + return [osd for osd in self if not osd.exists] + + def enqueue(self, osd: "OSD") -> None: + if not osd.exists: + raise NotFoundError() + self.add(osd) + osd.start() + + def rm(self, osd: "OSD") -> None: + if not osd.exists: + raise NotFoundError() + osd.stop() + try: + logger.debug(f'Removing {osd} from the queue.') + self.remove(osd) + except KeyError: + logger.debug(f"Could not find {osd} in queue.") + raise KeyError diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 2eac632a959..a14917595eb 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -12,11 +12,11 @@ except ImportError: import pytest from cephadm import CephadmOrchestrator +from cephadm.services.osd import RemoveUtil, OSD from orchestrator import raise_if_exception, Completion, HostSpec from tests import mock - def get_ceph_option(_, key): return __file__ @@ -43,9 +43,11 @@ def with_cephadm_module(module_options=None, store=None): :param store: Set the store before module.__init__ is called """ with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\ - mock.patch("cephadm.module.CephadmOrchestrator.remote"),\ - mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \ - mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command): + mock.patch("cephadm.module.CephadmOrchestrator.remote"), \ + mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \ + mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \ + mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \ + mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command): m = CephadmOrchestrator.__new__ (CephadmOrchestrator) if module_options is not None: @@ -72,6 +74,21 @@ def cephadm_module(): yield m +@pytest.yield_fixture() +def rm_util(): + with with_cephadm_module({}) as m: + r = RemoveUtil.__new__(RemoveUtil) + r.__init__(m) + yield r + + +@pytest.yield_fixture() +def osd_obj(): + with mock.patch("cephadm.services.osd.RemoveUtil"): + o = OSD(0, mock.MagicMock()) + yield o + + def wait(m, c): # type: (CephadmOrchestrator, Completion) -> Any m.process([c]) diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 77068d2a910..dc60f199656 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -6,7 +6,7 @@ from unittest.mock import ANY import pytest from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection -from cephadm.services.osd import OSDRemoval +from cephadm.services.osd import OSD, OSDQueue try: from typing import Any, List @@ -362,6 +362,7 @@ class TestCephadm(object): ) ]) )) + @mock.patch("cephadm.services.osd.OSD.exists", True) @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0) def test_remove_osds(self, cephadm_module): with with_host(cephadm_module, 'test'): @@ -372,14 +373,20 @@ class TestCephadm(object): out = wait(cephadm_module, c) assert out == ["Removed osd.0 from host 'test'"] - osd_removal_op = OSDRemoval(0, False, False, 'test', 'osd.0', datetime.datetime.utcnow(), -1) - cephadm_module.rm_util.queue_osds_for_removal({osd_removal_op}) - 
cephadm_module.rm_util._remove_osds_bg() - assert cephadm_module.rm_util.to_remove_osds == set() + cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0, + replace=False, + force=False, + hostname='test', + fullname='osd.0', + process_started_at=datetime.datetime.utcnow(), + remove_util=cephadm_module.rm_util + )) + cephadm_module.rm_util.process_removal_queue() + assert cephadm_module.to_remove_osds == OSDQueue() c = cephadm_module.remove_osds_status() out = wait(cephadm_module, c) - assert out == set() + assert out == [] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None) diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py new file mode 100644 index 00000000000..0b4b0cd506c --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_osd_removal.py @@ -0,0 +1,219 @@ +from cephadm.services.osd import RemoveUtil, OSDQueue, OSD +import pytest +from .fixtures import rm_util, osd_obj +from tests import mock +from datetime import datetime + + +class MockOSD: + + def __init__(self, osd_id): + self.osd_id = osd_id + +class TestOSDRemoval: + + @pytest.mark.parametrize( + "osd_id, osd_df, expected", + [ + # missing 'nodes' key + (1, dict(nodes=[]), -1), + # missing 'pgs' key + (1, dict(nodes=[dict(id=1)]), -1), + # id != osd_id + (1, dict(nodes=[dict(id=999, pgs=1)]), -1), + # valid + (1, dict(nodes=[dict(id=1, pgs=1)]), 1), + ] + ) + def test_get_pg_count(self, rm_util, osd_id, osd_df, expected): + with mock.patch("cephadm.services.osd.RemoveUtil.osd_df", return_value=osd_df): + assert rm_util.get_pg_count(osd_id) == expected + + @pytest.mark.parametrize( + "osds, ok_to_stop, expected", + [ + # no osd_ids provided + ([], [False], []), + # all osds are ok_to_stop + ([1, 2], [True], [1, 2]), + # osds are ok_to_stop after the second iteration + ([1, 2], [False, True], [2]), + # osds are never ok_to_stop, (taking the sample size `(len(osd_ids))` into account), + # expected to get False + ([1, 2], [False, False], []), + ] + ) + def test_find_stop_threshold(self, rm_util, osds, ok_to_stop, expected): + with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop): + assert rm_util.find_osd_stop_threshold(osds) == expected + + def test_process_removal_queue(self, rm_util): + # TODO: ! 
+ # rm_util.process_removal_queue() + pass + + def test_ok_to_stop(self, rm_util): + rm_util.ok_to_stop([MockOSD(1)]) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']}) + + def test_safe_to_destroy(self, rm_util): + rm_util.safe_to_destroy([1]) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy', 'ids': ['1']}) + + def test_destroy_osd(self, rm_util): + rm_util.destroy_osd(1) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd destroy-actual', 'id': 1, 'yes_i_really_mean_it': True}) + + def test_purge_osd(self, rm_util): + rm_util.purge_osd(1) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True}) + + +class TestOSD: + + def test_start(self, osd_obj): + assert osd_obj.started is False + osd_obj.start() + assert osd_obj.started is True + assert osd_obj.stopped is False + + def test_start_draining(self, osd_obj): + assert osd_obj.draining is False + assert osd_obj.drain_started_at is None + ret = osd_obj.start_draining() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'out') + assert isinstance(osd_obj.drain_started_at, datetime) + assert osd_obj.draining is True + assert ret is True + + def test_start_draining_stopped(self, osd_obj): + osd_obj.stopped = True + ret = osd_obj.start_draining() + assert osd_obj.drain_started_at is None + assert ret is False + assert osd_obj.draining is False + + def test_stop_draining(self, osd_obj): + ret = osd_obj.stop_draining() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'in') + assert isinstance(osd_obj.drain_stopped_at, datetime) + assert osd_obj.draining is False + assert ret is True + + @mock.patch('cephadm.services.osd.OSD.stop_draining') + def test_stop(self, stop_draining_mock, osd_obj): + ret = osd_obj.stop() + assert osd_obj.started is False + assert osd_obj.stopped is True + stop_draining_mock.assert_called_once() + + @pytest.mark.parametrize( + "draining, empty, expected", + [ + # must be !draining! and !not empty! 
to yield True + (True, not True, True), + # not draining and not empty + (False, not True, False), + # not draining and empty + (False, True, False), + # draining and empty + (True, True, False), + ] + ) + def test_is_draining(self, osd_obj, draining, empty, expected): + with mock.patch("cephadm.services.osd.OSD.is_empty", new_callable=mock.PropertyMock(return_value=empty)): + osd_obj.draining = draining + assert osd_obj.is_draining is expected + + @mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop") + def test_is_ok_to_stop(self, _, osd_obj): + ret = osd_obj.is_ok_to_stop + osd_obj.rm_util.ok_to_stop.assert_called_once() + + @pytest.mark.parametrize( + "pg_count, expected", + [ + (0, True), + (1, False), + (9999, False), + (-1, False), + ] + ) + def test_is_empty(self, osd_obj, pg_count, expected): + with mock.patch("cephadm.services.osd.OSD.get_pg_count", return_value=pg_count): + assert osd_obj.is_empty is expected + + @mock.patch("cephadm.services.osd.RemoveUtil.safe_to_destroy") + def test_safe_to_destroy(self, _, osd_obj): + ret = osd_obj.safe_to_destroy() + osd_obj.rm_util.safe_to_destroy.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.set_osd_flag") + def test_down(self, _, osd_obj): + ret = osd_obj.down() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'down') + + @mock.patch("cephadm.services.osd.RemoveUtil.destroy_osd") + def test_destroy_osd(self, _, osd_obj): + ret = osd_obj.destroy() + osd_obj.rm_util.destroy_osd.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.purge_osd") + def test_purge(self, _, osd_obj): + ret = osd_obj.purge() + osd_obj.rm_util.purge_osd.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count") + def test_pg_count(self, _, osd_obj): + ret = osd_obj.get_pg_count() + osd_obj.rm_util.get_pg_count.assert_called_once() + + def test_drain_status_human_not_started(self, osd_obj): + assert osd_obj.drain_status_human() == 'not started' + + def test_drain_status_human_started(self, osd_obj): + osd_obj.started = True + assert osd_obj.drain_status_human() == 'started' + + def test_drain_status_human_draining(self, osd_obj): + osd_obj.started = True + osd_obj.draining = True + assert osd_obj.drain_status_human() == 'draining' + + def test_drain_status_human_done(self, osd_obj): + osd_obj.started = True + osd_obj.draining = False + osd_obj.drain_done_at = datetime.utcnow() + assert osd_obj.drain_status_human() == 'done, waiting for purge' + + +class TestOSDQueue: + + def test_queue_size(self, osd_obj): + q = OSDQueue() + assert q.queue_size() == 0 + q.add(osd_obj) + assert q.queue_size() == 1 + + @mock.patch("cephadm.services.osd.OSD.start") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_enqueue(self, exist, start, osd_obj): + q = OSDQueue() + q.enqueue(osd_obj) + osd_obj.start.assert_called_once() + + @mock.patch("cephadm.services.osd.OSD.stop") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_rm_raise(self, exist, stop, osd_obj): + q = OSDQueue() + with pytest.raises(KeyError): + q.rm(osd_obj) + osd_obj.stop.assert_called_once() + + @mock.patch("cephadm.services.osd.OSD.stop") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_rm(self, exist, stop, osd_obj): + q = OSDQueue() + q.add(osd_obj) + q.rm(osd_obj) + osd_obj.stop.assert_called_once() diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 667cb7bc659..ef6325a30e8 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ 
b/src/pybind/mgr/orchestrator/_interface.py @@ -1017,6 +1017,12 @@ class Orchestrator(object): """ raise NotImplementedError() + def stop_remove_osds(self, osd_ids: List[str]) -> Completion: + """ + TODO + """ + raise NotImplementedError() + def remove_osds_status(self): # type: () -> Completion """ diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index f1000fe18d1..024a3a8c533 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -856,35 +856,53 @@ Usage: "name=replace,type=CephBool,req=false " "name=force,type=CephBool,req=false", 'Remove OSD services') - def _osd_rm(self, svc_id: List[str], - replace: bool = False, - force: bool = False) -> HandleCommandResult: - completion = self.remove_osds(svc_id, replace, force) + def _osd_rm_start(self, + svc_id: List[str], + replace: bool = False, + force: bool = False) -> HandleCommandResult: + completion = self.remove_osds(svc_id, replace=replace, force=force) + self._orchestrator_wait([completion]) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch osd rm stop', + "name=svc_id,type=CephString,n=N", + 'Remove OSD services') + def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult: + completion = self.stop_remove_osds(svc_id) self._orchestrator_wait([completion]) raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str()) @_cli_write_command( 'orch osd rm status', + "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false", desc='status of OSD removal operation') - def _osd_rm_status(self) -> HandleCommandResult: + def _osd_rm_status(self, format='plain') -> HandleCommandResult: completion = self.remove_osds_status() self._orchestrator_wait([completion]) raise_if_exception(completion) report = completion.result + if not report: return HandleCommandResult(stdout="No OSD remove/replace operations reported") - table = PrettyTable( - ['NAME', 'HOST', 'PGS', 'STARTED_AT'], - border=False) - table.align = 'l' - table.left_padding_width = 0 - table.right_padding_width = 1 - # TODO: re-add sorted and sort by pg_count - for osd in report: - table.add_row((osd.fullname, osd.nodename, osd.pg_count_str, osd.started_at)) - return HandleCommandResult(stdout=table.get_string()) + if format != 'plain': + out = to_format(report, format, many=True, cls=None) + else: + table = PrettyTable( + ['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'], + border=False) + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for osd in sorted(report, key=lambda o: o.osd_id): + table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(), + osd.get_pg_count(), osd.replace, osd.replace, osd.drain_started_at]) + out = table.get_string() + + return HandleCommandResult(stdout=out) @_cli_write_command( 'orch daemon add', diff --git a/src/pybind/mgr/osd_support/__init__.py b/src/pybind/mgr/osd_support/__init__.py deleted file mode 100644 index f4f9e26e064..00000000000 --- a/src/pybind/mgr/osd_support/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -import os - -if 'UNITTEST' in os.environ: - import tests - tests.mock_ceph_modules() # type: ignore - -from .module import OSDSupport diff --git a/src/pybind/mgr/osd_support/module.py b/src/pybind/mgr/osd_support/module.py deleted file mode 100644 index a219ca868cc..00000000000 --- a/src/pybind/mgr/osd_support/module.py +++ /dev/null @@ -1,306 +0,0 @@ -from typing 
import List, Set, Optional -from mgr_module import MgrModule, HandleCommandResult -from threading import Event -import json -import errno - - -class OSDSupport(MgrModule): - # these are CLI commands we implement - COMMANDS = [ - { - "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N", - "desc": "drain osd ids", - "perm": "r" - }, - { - "cmd": "osd drain status", - "desc": "show status", - "perm": "r" - }, - { - "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N", - "desc": "show status for osds. Stopping all if osd_ids are omitted", - "perm": "r" - }, - ] - - MODULE_OPTIONS: List[dict] = [] - - # These are "native" Ceph options that this module cares about. - NATIVE_OPTIONS: List[str] = [] - - osd_ids: Set[int] = set() - emptying_osds: Set[int] = set() - check_osds: Set[int] = set() - empty: Set[int] = set() - - def __init__(self, *args, **kwargs): - super(OSDSupport, self).__init__(*args, **kwargs) - - # set up some members to enable the serve() method and shutdown() - self.run = True - self.event = Event() - - # ensure config options members are initialized; see config_notify() - self.config_notify() - - def config_notify(self): - """ - This method is called whenever one of our config options is changed. - """ - # This is some boilerplate that stores MODULE_OPTIONS in a class - # member, so that, for instance, the 'emphatic' option is always - # available as 'self.emphatic'. - for opt in self.MODULE_OPTIONS: - setattr(self, - opt['name'], - self.get_module_option(opt['name'])) - self.log.debug(' mgr option %s = %s', - opt['name'], getattr(self, opt['name'])) - # Do the same for the native options. - for _opt in self.NATIVE_OPTIONS: - setattr(self, - _opt, - self.get_ceph_option(_opt)) - self.log.debug('native option %s = %s', _opt, getattr(self, _opt)) - - def handle_command(self, inbuf, cmd): - ret = 0 - err = '' - _ = inbuf - cmd_prefix = cmd.get('prefix', '') - osd_ids: List[int] = cmd.get('osd_ids', list()) - not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids) - if cmd_prefix == 'osd drain': - if not_found_osds: - return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster" - # add osd_ids to set - for osd_id in osd_ids: - if osd_id not in self.emptying_osds: - self.osd_ids.add(osd_id) - self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.') - out = 'Started draining OSDs. Query progress with ' - - elif cmd_prefix == 'osd drain status': - # re-initialize it with an empty set on invocation (long running processes) - self.check_osds = set() - # assemble a set of emptying osds and to_be_emptied osds - self.check_osds.update(self.emptying_osds) - self.check_osds.update(self.osd_ids) - self.check_osds.update(self.empty) - - report = list() - for osd_id in self.check_osds: - pgs = self.get_pg_count(osd_id) - report.append(dict(osd_id=osd_id, pgs=pgs)) - out = f"{json.dumps(report)}" - - elif cmd_prefix == 'osd drain stop': - if not osd_ids: - self.log.debug("No osd_ids provided, stop all pending drain operations)") - self.osd_ids = set() - self.emptying_osds = set() - - # this is just a poor-man's solution as it will not really stop draining - # the osds. It will just stop the queries and also prevent any scheduled OSDs - # from getting drained at a later point in time. 
- out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)" - - else: - if not_found_osds: - return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster" - - self.osd_ids = self.osd_ids.difference(osd_ids) - self.emptying_osds = self.emptying_osds.difference(osd_ids) - out = f"Stopped draining operations for OSD(s): {osd_ids}" - - else: - return HandleCommandResult( - retval=-errno.EINVAL, - stdout='', - stderr=f"Command not found <{cmd.get('prefix', '')}>") - return HandleCommandResult( - retval=ret, # exit code - stdout=out, # stdout - stderr=err) - - def serve(self): - """ - This method is called by the mgr when the module starts and can be - used for any background activity. - """ - self.log.info("Starting mgr/osd_support") - while self.run: - - self.log.debug(f"Scheduled for draining: <{self.osd_ids}>") - self.log.debug(f"Currently being drained: <{self.emptying_osds}>") - # the state should be saved to the mon store in the actual call and - # then retrieved in serve() probably - - # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop - for x in self.find_osd_stop_threshold(self.osd_ids): - self.emptying_osds.add(x) - - # remove the emptying osds from the osd_ids since they don't need to be checked again. - self.osd_ids = self.osd_ids.difference(self.emptying_osds) - - # 2) reweight the ok-to-stop osds, ONCE - self.reweight_osds(self.emptying_osds) - - # 3) check for osds to be empty - empty_osds = self.empty_osds(self.emptying_osds) - - # remove osds that are marked as empty - self.emptying_osds = self.emptying_osds.difference(empty_osds) - - # move empty osds in the done queue until they disappear from ceph's view - # other modules need to know when OSDs are empty - for osd in empty_osds: - self.log.debug(f"Adding {osd} to list of empty OSDs") - self.empty.add(osd) - - # remove from queue if no longer part of ceph cluster - self.cleanup() - - # fixed sleep interval of 10 seconds - sleep_interval = 10 - self.log.debug('Sleeping for %d seconds', sleep_interval) - self.event.wait(sleep_interval) - self.event.clear() - - def cleanup(self): - """ - Remove OSDs that are no longer in the ceph cluster from the - 'done' list. - :return: - """ - for osd in self.osds_not_in_cluster(list(self.empty)): - self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing") - self.empty.remove(osd) - - def shutdown(self): - """ - This method is called by the mgr when the module needs to shut - down (i.e., when the serve() function needs to exit). 
- """ - self.log.info('Stopping') - self.run = False - self.event.set() - - def get_osds_in_cluster(self) -> List[str]: - osd_map = self.get_osdmap() - return [x.get('osd') for x in osd_map.dump().get('osds', [])] - - def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]: - self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster") - cluster_osds = self.get_osds_in_cluster() - not_in_cluster = set() - for osd_id in osd_ids: - if int(osd_id) not in cluster_osds: - self.log.error(f"Could not find {osd_id} in cluster") - not_in_cluster.add(osd_id) - return not_in_cluster - - def empty_osds(self, osd_ids: Set[int]) -> List[int]: - if not osd_ids: - return list() - osd_df_data = self.osd_df() - empty_osds = list() - for osd_id in osd_ids: - if self.is_empty(osd_id, osd_df=osd_df_data): - empty_osds.append(osd_id) - return empty_osds - - def osd_df(self) -> dict: - # TODO: this should be cached I think - base_cmd = 'osd df' - ret, out, err = self.mon_command({ - 'prefix': base_cmd, - 'format': 'json' - }) - return json.loads(out) - - def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool: - pgs = self.get_pg_count(osd_id, osd_df=osd_df) - if pgs != 0: - self.log.info(f"osd: {osd_id} still has {pgs} PGs.") - return False - self.log.info(f"osd: {osd_id} has no PGs anymore") - return True - - def reweight_osds(self, osd_ids: Set[int]) -> bool: - results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids] - return all(results) - - def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int: - if not osd_df: - osd_df = self.osd_df() - osd_nodes = osd_df.get('nodes', []) - for osd_node in osd_nodes: - if osd_node.get('id') == int(osd_id): - return osd_node.get('pgs', -1) - return -1 - - def get_osd_weight(self, osd_id: int) -> float: - osd_df = self.osd_df() - osd_nodes = osd_df.get('nodes', []) - for osd_node in osd_nodes: - if osd_node.get('id') == int(osd_id): - if 'crush_weight' in osd_node: - return float(osd_node.get('crush_weight')) - return -1.0 - - def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool: - if self.get_osd_weight(osd_id) == weight: - self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}") - return True - base_cmd = 'osd crush reweight' - ret, out, err = self.mon_command({ - 'prefix': base_cmd, - 'name': f'osd.{osd_id}', - 'weight': weight - }) - cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}" - self.log.debug(f"running cmd: {cmd}") - if ret != 0: - self.log.error(f"command: {cmd} failed with: {err}") - return False - self.log.info(f"command: {cmd} succeeded with: {out}") - return True - - def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]: - """ - Cut osd_id list in half until it's ok-to-stop - - :param osd_ids: list of osd_ids - :return: list of ods_ids that can be stopped at once - """ - if not osd_ids: - return set() - _osds: List[int] = list(osd_ids.copy()) - while not self.ok_to_stop(_osds): - if len(_osds) <= 1: - # can't even stop one OSD, aborting - self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..") - return set() - self.event.wait(1) - # splitting osd_ids in half until ok_to_stop yields success - # maybe popping ids off one by one is better here..depends on the cluster size I guess.. 
- # There's a lot of room for micro adjustments here - _osds = _osds[len(_osds)//2:] - return set(_osds) - - def ok_to_stop(self, osd_ids: List[int]) -> bool: - base_cmd = "osd ok-to-stop" - self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}") - ret, out, err = self.mon_command({ - 'prefix': base_cmd, - # apparently ok-to-stop allows strings only - 'ids': [str(x) for x in osd_ids] - }) - if ret != 0: - self.log.error(f"{osd_ids} are not ok-to-stop. {err}") - return False - self.log.info(f"OSDs <{osd_ids}> are ok-to-stop") - return True diff --git a/src/pybind/mgr/osd_support/tests/__init__.py b/src/pybind/mgr/osd_support/tests/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/pybind/mgr/osd_support/tests/fixtures.py b/src/pybind/mgr/osd_support/tests/fixtures.py deleted file mode 100644 index e948a3af14c..00000000000 --- a/src/pybind/mgr/osd_support/tests/fixtures.py +++ /dev/null @@ -1,14 +0,0 @@ -from osd_support import OSDSupport -import pytest - -from tests import mock - - -@pytest.yield_fixture() -def osd_support_module(): - with mock.patch("osd_support.module.OSDSupport.get_osdmap"), \ - mock.patch("osd_support.module.OSDSupport.osd_df"), \ - mock.patch("osd_support.module.OSDSupport.mon_command", return_value=(0, '', '')): - m = OSDSupport.__new__(OSDSupport) - m.__init__('osd_support', 0, 0) - yield m diff --git a/src/pybind/mgr/osd_support/tests/test_osd_support.py b/src/pybind/mgr/osd_support/tests/test_osd_support.py deleted file mode 100644 index 208a3e896d5..00000000000 --- a/src/pybind/mgr/osd_support/tests/test_osd_support.py +++ /dev/null @@ -1,144 +0,0 @@ -import pytest -from .fixtures import osd_support_module as osdsf -from tests import mock - - -class TestOSDSupport: - - def test_init(self, osdsf): - assert osdsf.osd_ids == set() - assert osdsf.emptying_osds == set() - assert osdsf.check_osds == set() - assert osdsf.empty == set() - - def test_osds_not_in_cluster(self, osdsf): - assert osdsf.osds_not_in_cluster([1, 2]) == {1, 2} - - @mock.patch("osd_support.module.OSDSupport.get_osds_in_cluster") - def test_osds_in_cluster(self, osds_in_cluster, osdsf): - osds_in_cluster.return_value = [1] - assert osdsf.osds_not_in_cluster([1, 2]) == {2} - - @pytest.mark.parametrize( - "is_empty, osd_ids, expected", - [ - (False, {1, 2}, []), - (True, {1, 2}, [1, 2]), - (None, {1, 2}, []), - ] - ) - def test_empty_osd(self, osdsf, is_empty, osd_ids, expected): - with mock.patch("osd_support.module.OSDSupport.is_empty", return_value=is_empty): - assert osdsf.empty_osds(osd_ids) == expected - - @pytest.mark.parametrize( - "pg_count, expected", - [ - (0, True), - (1, False), - (-1, False), - (9999999999999, False), - ] - ) - def test_is_emtpy(self, pg_count, expected, osdsf): - with mock.patch("osd_support.module.OSDSupport.get_pg_count", return_value=pg_count): - assert osdsf.is_empty(1) == expected - - @pytest.mark.parametrize( - "osd_ids, reweight_out, expected", - [ - ({1}, [False], False), - ({1}, [True], True), - ({1, 2}, [True, True], True), - ({1, 2}, [True, False], False), - ] - ) - def test_reweight_osds(self, osdsf, osd_ids, reweight_out, expected): - with mock.patch("osd_support.module.OSDSupport.reweight_osd", side_effect=reweight_out): - assert osdsf.reweight_osds(osd_ids) == expected - - @pytest.mark.parametrize( - "osd_id, osd_df, expected", - [ - # missing 'nodes' key - (1, dict(nodes=[]), -1), - # missing 'pgs' key - (1, dict(nodes=[dict(id=1)]), -1), - # id != osd_id - (1, dict(nodes=[dict(id=999, pgs=1)]), -1), - # valid - (1, 
dict(nodes=[dict(id=1, pgs=1)]), 1), - ] - ) - def test_get_pg_count(self, osdsf, osd_id, osd_df, expected): - with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df): - assert osdsf.get_pg_count(osd_id) == expected - - @pytest.mark.parametrize( - "osd_id, osd_df, expected", - [ - # missing 'nodes' key - (1, dict(nodes=[]), -1.0), - # missing 'crush_weight' key - (1, dict(nodes=[dict(id=1)]), -1.0), - # id != osd_id - (1, dict(nodes=[dict(id=999, crush_weight=1)]), -1.0), - # valid - (1, dict(nodes=[dict(id=1, crush_weight=1)]), float(1)), - ] - ) - def test_get_osd_weight(self, osdsf, osd_id, osd_df, expected): - with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df): - assert osdsf.get_osd_weight(osd_id) == expected - - @pytest.mark.parametrize( - "osd_id, initial_osd_weight, mon_cmd_return, weight, expected", - [ - # is already weighted correctly - (1, 1.0, (0, '', ''), 1.0, True), - # needs reweight, no errors in mon_cmd - (1, 2.0, (0, '', ''), 1.0, True), - # needs reweight, errors in mon_cmd - (1, 2.0, (1, '', ''), 1.0, False), - ] - ) - def test_reweight_osd(self, osdsf, osd_id, initial_osd_weight, mon_cmd_return, weight, expected): - with mock.patch("osd_support.module.OSDSupport.get_osd_weight", return_value=initial_osd_weight),\ - mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return): - assert osdsf.reweight_osd(osd_id, weight=weight) == expected - - @pytest.mark.parametrize( - "osd_ids, ok_to_stop, expected", - [ - # no osd_ids provided - ({}, [], set()), - # all osds are ok_to_stop - ({1, 2}, [True], {1, 2}), - # osds are ok_to_stop after the second iteration - ({1, 2}, [False, True], {2}), - # osds are never ok_to_stop, (taking the sample size `(len(osd_ids))` into account), - # expected to get a empty set() - ({1, 2}, [False, False], set()), - ] - ) - def test_find_stop_threshold(self, osdsf, osd_ids, ok_to_stop, expected): - with mock.patch("osd_support.module.OSDSupport.ok_to_stop", side_effect=ok_to_stop): - assert osdsf.find_osd_stop_threshold(osd_ids) == expected - - @pytest.mark.parametrize( - "osd_ids, mon_cmd_return, expected", - [ - # ret is 0 - ([1], (0, '', ''), True), - # no input yields True - ([], (0, '', ''), True), - # ret is != 0 - ([1], (-1, '', ''), False), - # no input, but ret != 0 - ([], (-1, '', ''), False), - ] - ) - def test_ok_to_stop(self, osdsf, osd_ids, mon_cmd_return, expected): - with mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return): - assert osdsf.ok_to_stop(osd_ids) == expected - diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 6c2ca9b48a2..7ff959eefe4 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -36,6 +36,5 @@ commands = mypy --config-file=../../mypy.ini \ orchestrator/__init__.py \ progress/module.py \ rook/module.py \ - osd_support/module.py \ test_orchestrator/module.py \ volumes/__init__.py
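
The patch above retires the standalone ``osd_support`` drain workflow and moves OSD removal into the cephadm orchestrator: ``ceph orch osd rm <id>`` enqueues an ``OSD`` object on an ``OSDQueue``, the serve loop drains and purges queued OSDs via ``RemoveUtil.process_removal_queue()``, and ``ceph orch osd rm stop <id>`` takes an OSD back out of the queue. The following standalone sketch models that life cycle only; the class and method names mirror the patch, but the bodies are simplified stand-ins written for illustration (the real classes live in ``src/pybind/mgr/cephadm/services/osd.py`` and issue mon commands), and none of this is part of the patch itself::

    # Simplified, standalone model of the removal-queue life cycle (illustration
    # only; the real OSD/OSDQueue classes carry more state and call out to the
    # monitor for ok-to-stop, safe-to-destroy, purge, etc.).
    from datetime import datetime
    from typing import List, Optional


    class OSD:
        def __init__(self, osd_id: int, replace: bool = False, force: bool = False):
            self.osd_id = osd_id
            self.replace = replace
            self.force = force
            self.started = False      # was the removal operation started
            self.draining = False     # is the OSD currently being drained
            self.stopped = False      # was the removal cancelled
            self.drain_started_at: Optional[datetime] = None

        def start(self) -> None:
            self.started, self.stopped = True, False

        def start_draining(self) -> bool:
            if self.stopped:
                return False          # a cancelled OSD is never drained
            self.draining = True
            self.drain_started_at = datetime.utcnow()
            return True

        def stop(self) -> None:
            self.started, self.stopped, self.draining = False, True, False

        def __hash__(self) -> int:
            return hash(self.osd_id)

        def __eq__(self, other: object) -> bool:
            return isinstance(other, OSD) and self.osd_id == other.osd_id


    class OSDQueue(set):
        def enqueue(self, osd: OSD) -> None:
            self.add(osd)             # 'ceph orch osd rm <id>' roughly maps to this
            osd.start()

        def rm(self, osd: OSD) -> None:
            osd.stop()                # 'ceph orch osd rm stop <id>' roughly maps to this
            self.remove(osd)          # raises KeyError if the OSD was never queued

        def idling_osds(self) -> List[OSD]:
            return [osd for osd in self if not osd.draining]


    queue = OSDQueue()
    osd0 = OSD(0)
    queue.enqueue(osd0)
    # one serve()-loop iteration: start draining whatever is idle (and, in the
    # real code, reported ok-to-stop by the monitor)
    for osd in queue.idling_osds():
        osd.start_draining()
    queue.rm(osd0)                    # operator cancels the removal
    assert not queue and osd0.stopped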