Merge pull request #36151 from jschmid1/osd_support_reweight_impl

mgr/cephadm: rework osd removals/replacements

Reviewed-by: Sebastian Wagner <sebastian.wagner@suse.com>
Sebastian Wagner 2020-07-31 13:28:18 +02:00 committed by GitHub
commit 5a15c730c2
19 changed files with 706 additions and 693 deletions

View File

@ -1657,7 +1657,6 @@ fi
%{_datadir}/ceph/mgr/localpool
%{_datadir}/ceph/mgr/orchestrator
%{_datadir}/ceph/mgr/osd_perf_query
%{_datadir}/ceph/mgr/osd_support
%{_datadir}/ceph/mgr/pg_autoscaler
%{_datadir}/ceph/mgr/progress
%{_datadir}/ceph/mgr/prometheus

View File

@ -8,7 +8,6 @@ usr/share/ceph/mgr/iostat
usr/share/ceph/mgr/localpool
usr/share/ceph/mgr/orchestrator
usr/share/ceph/mgr/osd_perf_query
usr/share/ceph/mgr/osd_support
usr/share/ceph/mgr/pg_autoscaler
usr/share/ceph/mgr/progress
usr/share/ceph/mgr/prometheus

View File

@ -40,7 +40,6 @@ sensible.
Telegraf module <telegraf>
Telemetry module <telemetry>
Iostat module <iostat>
OSD Support module <osd_support>
Crash module <crash>
Insights module <insights>
Orchestrator module <orchestrator>

View File

@ -199,16 +199,16 @@ In the case that you have already created the OSDs using the ``all-available-devices``
ceph orch osd spec --service-name osd.all-available-devices --unmanaged
Remove an OSD
-------------------
-------------
::
ceph orch osd rm <svc_id>... [--replace] [--force]
ceph orch osd rm <svc_id(s)> [--replace] [--force]
Removes one or more OSDs from the cluster.
Evacuates PGs from an OSD and removes it from the cluster.
Example::
# ceph orch osd rm 4
# ceph orch osd rm 0
Scheduled OSD(s) for removal
@ -217,10 +217,10 @@ OSDs that are not safe-to-destroy will be rejected.
You can query the state of the operation with::
# ceph orch osd rm status
NAME HOST PGS STARTED_AT
osd.7 node1 55 2020-04-22 19:28:38.785761
osd.5 node3 3 2020-04-22 19:28:34.201685
osd.3 node2 0 2020-04-22 19:28:34.201695
OSD_ID HOST STATE PG_COUNT REPLACE FORCE STARTED_AT
2 cephadm-dev done, waiting for purge 0 True False 2020-07-17 13:01:43.147684
3 cephadm-dev draining 17 False True 2020-07-17 13:01:45.162158
4 cephadm-dev started 42 False True 2020-07-17 13:01:45.162158
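The status report can also be emitted in a machine-readable form: this change wires a ``--format`` option (``plain``, ``json``, ``json-pretty`` or ``yaml``) into the status command. Assuming the usual Ceph CLI flag spelling, for example::

    # ceph orch osd rm status --format json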
When no PGs are left on the osd, it will be decommissioned and removed from the cluster.
@ -229,11 +229,28 @@ When no PGs are left on the osd, it will be decommissioned and removed from the
After removing an OSD, if you wipe the LVM physical volume in the device used by the removed OSD, a new OSD will be created.
Read information about the ``unmanaged`` parameter in :ref:`orchestrator-cli-create-osds`.
Stopping OSD Removal
--------------------
You can stop the operation with
::
ceph orch osd rm stop <svc_id(s)>
Example::
# ceph orch osd rm stop 4
Stopped OSD(s) removal
This will reset the OSD to its initial state and remove it from the removal queue.
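For context, "resetting" here maps onto the new ``OSD.stop()`` path this change adds in ``cephadm/services/osd.py``. A simplified sketch of its effect, not verbatim module code::

    # OSDQueue.rm(osd) calls osd.stop(), which roughly does:
    osd.started = False   # the removal is no longer considered scheduled
    osd.stopped = True    # subsequent start_draining() calls are refused
    osd.stop_draining()   # marks the OSD 'in' again and clears the draining flag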
Replace an OSD
-------------------
::
orch osd rm <svc_id>... --replace [--force]
orch osd rm <svc_id(s)> --replace [--force]
Example::

View File

@ -1,61 +0,0 @@
OSD Support Module
==================
The OSD Support module holds OSD-specific functionality that
is needed by other components, such as the orchestrators.
In current scope:
* osd draining
Enabling
--------
When an orchestrator is used this should be enabled as a dependency.
(*currently only valid for the cephadm orchestrator*)
The *osd_support* module is manually enabled with::
ceph mgr module enable osd_support
Commands
--------
Draining
########
This mode is for draining OSDs gracefully. `Draining` in this context means gracefully emptying out OSDs by setting their
weight to zero. An OSD is considered to be drained when no PGs are left.
::
ceph osd drain $osd_id
Takes a $osd_id and schedules it for draining. Since that process can take
quite some time, the operation will be executed in the background. To query the status
of the operation you can use:
::
ceph osd drain status
This gives you the status of all running operations in this format::
[{'osd_id': 0, 'pgs': 1234}, ..]
If you wish to stop an OSD from being drained::
ceph osd drain stop [$osd_id]
Stops all **scheduled** osd drain operations (not the operations that have been started already)
if no $osd_ids are given. If $osd_ids are present it only operates on them.
To stop and reset the weight of already started operations we need to save the initial weight
(see 'Ideas for improvement')
Ideas for improvement
----------------------
- add health checks set_health_checks
- use objects to represent OSDs
- allows timestamps, trending information etc
- save osd drain state (at least the osd_ids in the mon store)
- resume after a mgr crash
- save the initial weight of an OSD, i.e. set it back to the initial weight on abort

View File

@ -89,7 +89,6 @@ const static std::map<uint32_t, std::set<std::string>> always_on_modules = {
"devicehealth",
"orchestrator",
"rbd_support",
"osd_support",
"volumes",
"pg_autoscaler",
"telemetry",
@ -104,7 +103,6 @@ const static std::map<uint32_t, std::set<std::string>> always_on_modules = {
"devicehealth",
"orchestrator",
"rbd_support",
"osd_support",
"volumes",
"pg_autoscaler",
"telemetry",

View File

@ -38,7 +38,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ
RbdMirrorService, CrashService, CephadmService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDRemoval, OSDService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from .schedule import HostAssignment, HostPlacementSpec
@ -318,7 +318,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self.cache = HostCache(self)
self.cache.load()
self.rm_util = RemoveUtil(self)
self.to_remove_osds = OSDQueue()
self.rm_util.load_from_store()
self.spec_store = SpecStore(self)
self.spec_store.load()
@ -495,7 +498,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self._update_paused_health()
if not self.paused:
self.rm_util._remove_osds_bg()
self.rm_util.process_removal_queue()
self.migration.migrate()
if self.migration.is_migration_ongoing():
@ -564,6 +567,20 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
monmap['modified'], CEPH_DATEFMT)
if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
self.last_monmap = None # just in case clocks are skewed
if notify_type == "pg_summary":
self._trigger_osd_removal()
def _trigger_osd_removal(self):
data = self.get("osd_stats")
for osd in data.get('osd_stats', []):
if osd.get('num_pgs') == 0:
# if _ANY_ osd that is currently in the queue appears to be empty,
# start the removal process
if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
self.log.debug(f"Found empty osd. Starting removal process")
# if the osd that is now empty is also part of the removal queue
# start the process
self.rm_util.process_removal_queue()
def pause(self):
if not self.paused:
@ -2461,31 +2478,53 @@ you may want to run:
"""
Takes a list of OSDs and schedules them for removal.
The function that takes care of the actual removal is
_remove_osds_bg().
process_removal_queue().
"""
daemons = self.cache.get_daemons_by_service('osd')
found: Set[OSDRemoval] = set()
daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_service('osd')
to_remove_daemons = list()
for daemon in daemons:
if daemon.daemon_id not in osd_ids:
continue
found.add(OSDRemoval(daemon.daemon_id, replace, force,
daemon.hostname, daemon.name(),
datetime.datetime.utcnow(), -1))
if daemon.daemon_id in osd_ids:
to_remove_daemons.append(daemon)
if not to_remove_daemons:
return f"Unable to find OSDs: {osd_ids}"
not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
if not_found:
raise OrchestratorError('Unable to find OSD: %s' % not_found)
self.rm_util.queue_osds_for_removal(found)
for daemon in to_remove_daemons:
try:
self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
replace=replace,
force=force,
hostname=daemon.hostname,
fullname=daemon.name(),
process_started_at=datetime.datetime.utcnow(),
remove_util=self.rm_util))
except NotFoundError:
return f"Unable to find OSDs: {osd_ids}"
# trigger the serve loop to initiate the removal
self._kick_serve_loop()
return "Scheduled OSD(s) for removal"
@trivial_completion
def remove_osds_status(self) -> Set[OSDRemoval]:
def stop_remove_osds(self, osd_ids: List[str]):
"""
Stops a `removal` process for a List of OSDs.
This will revert their weight and remove them from the osds_to_remove queue
"""
for osd_id in osd_ids:
try:
self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
remove_util=self.rm_util))
except (NotFoundError, KeyError):
return f'Unable to find OSD in the queue: {osd_id}'
# trigger the serve loop to halt the removal
self._kick_serve_loop()
return "Stopped OSD(s) removal"
@trivial_completion
def remove_osds_status(self):
"""
The CLI call to retrieve an osd removal report
"""
return self.rm_util.report
return self.to_remove_osds.all_osds()

View File

@ -1,4 +1,3 @@
import datetime
import json
import logging
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
@ -7,6 +6,7 @@ from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from datetime import datetime
import orchestrator
from cephadm.utils import forall_hosts
from orchestrator import OrchestratorError
@ -15,6 +15,7 @@ from mgr_module import MonCommandFailed
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec
logger = logging.getLogger(__name__)
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class OSDService(CephadmService):
@ -280,139 +281,148 @@ class OSDService(CephadmService):
return osd_host_map
class OSDRemoval(object):
def __init__(self,
osd_id: str,
replace: bool,
force: bool,
nodename: str,
fullname: str,
start_at: datetime.datetime,
pg_count: int):
self.osd_id = osd_id
self.replace = replace
self.force = force
self.nodename = nodename
self.fullname = fullname
self.started_at = start_at
self.pg_count = pg_count
# needed due to changing 'started_at' attr
def __eq__(self, other):
return self.osd_id == other.osd_id
def __hash__(self):
return hash(self.osd_id)
def __repr__(self):
return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
', fullname={}, started_at={}, pg_count={})').format(
self.osd_id, self.replace, self.force, self.nodename,
self.fullname, self.started_at, self.pg_count)
@property
def pg_count_str(self) -> str:
return 'n/a' if self.pg_count < 0 else str(self.pg_count)
class RemoveUtil(object):
def __init__(self, mgr):
self.mgr = mgr
self.to_remove_osds: Set[OSDRemoval] = set()
self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
@property
def report(self) -> Set[OSDRemoval]:
return self.to_remove_osds.copy()
def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
self.to_remove_osds.update(osds)
def _remove_osds_bg(self) -> None:
def process_removal_queue(self) -> None:
"""
Performs actions in the _serve() loop to remove an OSD
when criteria is met.
"""
# make sure that we don't run on OSDs that are not in the cluster anymore.
self.cleanup()
logger.debug(
f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
self._update_osd_removal_status()
remove_osds: set = self.to_remove_osds.copy()
for osd in remove_osds:
f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
f"for removal: {self.mgr.to_remove_osds.all_osds()}")
# find osds that are ok-to-stop and not yet draining
ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
if ok_to_stop_osds:
# start draining those
_ = [osd.start_draining() for osd in ok_to_stop_osds]
# Check all osds for their state and take action (remove, purge etc)
to_remove_osds = self.mgr.to_remove_osds.all_osds()
new_queue = set()
for osd in to_remove_osds:
if not osd.force:
self.drain_osd(osd.osd_id)
# skip criteria
if not self.is_empty(osd.osd_id):
if not osd.is_empty:
logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
new_queue.add(osd)
continue
if not self.ok_to_destroy([osd.osd_id]):
if not osd.safe_to_destroy():
logger.info(
f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
new_queue.add(osd)
continue
# abort criteria
if not self.down_osd([osd.osd_id]):
if not osd.down():
# also remove it from the remove_osd list and set a health_check warning?
raise orchestrator.OrchestratorError(
f"Could not set OSD <{osd.osd_id}> to 'down'")
if osd.replace:
if not self.destroy_osd(osd.osd_id):
# also remove it from the remove_osd list and set a health_check warning?
if not osd.destroy():
raise orchestrator.OrchestratorError(
f"Could not destroy OSD <{osd.osd_id}>")
else:
if not self.purge_osd(osd.osd_id):
# also remove it from the remove_osd list and set a health_check warning?
if not osd.purge():
raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
if not osd.exists:
continue
self.mgr._remove_daemon(osd.fullname, osd.nodename)
logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
logger.debug(f"Removing {osd.osd_id} from the queue.")
self.to_remove_osds.remove(osd)
def _update_osd_removal_status(self):
"""
Generate a OSD report that can be printed to the CLI
"""
logger.debug("Update OSD removal status")
for osd in self.to_remove_osds:
osd.pg_count = self.get_pg_count(str(osd.osd_id))
logger.debug(f"OSD removal status: {self.to_remove_osds}")
# self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
# The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
# osds that were added while this method was executed'
self.mgr.to_remove_osds.intersection_update(new_queue)
self.save_to_store()
def drain_osd(self, osd_id: str) -> bool:
def cleanup(self):
# OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
[self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds]
def get_osds_in_cluster(self) -> List[str]:
osd_map = self.mgr.get_osdmap()
return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
def osd_df(self) -> dict:
base_cmd = 'osd df'
ret, out, err = self.mgr.mon_command({
'prefix': base_cmd,
'format': 'json'
})
return json.loads(out)
def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
if not osd_df:
osd_df = self.osd_df()
osd_nodes = osd_df.get('nodes', [])
for osd_node in osd_nodes:
if osd_node.get('id') == int(osd_id):
return osd_node.get('pgs', -1)
return -1
def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
"""
Uses `osd_support` module to schedule a drain operation of an OSD
Cut osd_id list in half until it's ok-to-stop
:param osds: list of OSD objects
:return: list of OSD objects that can be stopped at once
"""
if not osds:
return []
while not self.ok_to_stop(osds):
if len(osds) <= 1:
# can't even stop one OSD, aborting
self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
return []
# This potentially prolongs the global wait time.
self.mgr.event.wait(1)
# splitting osd_ids in half until ok_to_stop yields success
# maybe popping ids off one by one is better here..depends on the cluster size I guess..
# There's a lot of room for micro adjustments here
osds = osds[len(osds) // 2:]
return osds
# todo start draining
# return all([osd.start_draining() for osd in osds])
def ok_to_stop(self, osds: List["OSD"]) -> bool:
cmd_args = {
'prefix': 'osd drain',
'osd_ids': [int(osd_id)]
'prefix': "osd ok-to-stop",
'ids': [str(osd.osd_id) for osd in osds]
}
return self._run_mon_cmd(cmd_args)
def get_pg_count(self, osd_id: str) -> int:
""" Queries for PG count of an OSD """
self.mgr.log.debug("Querying for drain status")
def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
base_cmd = f"osd {flag}"
self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
ret, out, err = self.mgr.mon_command({
'prefix': 'osd drain status',
'prefix': base_cmd,
'ids': [str(osd.osd_id) for osd in osds]
})
if ret != 0:
self.mgr.log.error(f"Calling osd drain status failed with {err}")
raise OrchestratorError("Could not query `osd drain status`")
out = json.loads(out)
for o in out:
if str(o.get('osd_id', '')) == str(osd_id):
return int(o.get('pgs', -1))
return -1
self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
return False
self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
return True
def is_empty(self, osd_id: str) -> bool:
""" Checks if an OSD is empty """
return self.get_pg_count(osd_id) == 0
def ok_to_destroy(self, osd_ids: List[int]) -> bool:
def safe_to_destroy(self, osd_ids: List[int]) -> bool:
""" Queries the safe-to-destroy flag for OSDs """
cmd_args = {'prefix': 'osd safe-to-destroy',
'ids': osd_ids}
'ids': [str(x) for x in osd_ids]}
return self._run_mon_cmd(cmd_args)
def destroy_osd(self, osd_id: int) -> bool:
@ -422,14 +432,6 @@ class RemoveUtil(object):
'yes_i_really_mean_it': True}
return self._run_mon_cmd(cmd_args)
def down_osd(self, osd_ids: List[int]) -> bool:
""" Sets `out` flag to OSDs """
cmd_args = {
'prefix': 'osd down',
'ids': osd_ids,
}
return self._run_mon_cmd(cmd_args)
def purge_osd(self, osd_id: int) -> bool:
""" Purges an OSD from the cluster (forcefully) """
cmd_args = {
@ -439,14 +441,6 @@ class RemoveUtil(object):
}
return self._run_mon_cmd(cmd_args)
def out_osd(self, osd_ids: List[int]) -> bool:
""" Sets `down` flag to OSDs """
cmd_args = {
'prefix': 'osd out',
'ids': osd_ids,
}
return self._run_mon_cmd(cmd_args)
def _run_mon_cmd(self, cmd_args: dict) -> bool:
"""
Generic command to run mon_command and evaluate/log the results
@ -458,3 +452,237 @@ class RemoveUtil(object):
return False
self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
return True
def save_to_store(self):
osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
logger.debug(f"Saving {osd_queue} to store")
self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
def load_from_store(self):
for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
for osd in json.loads(v):
logger.debug(f"Loading osd ->{osd} from store")
osd_obj = OSD.from_json(json.loads(osd), ctx=self)
self.mgr.to_remove_osds.add(osd_obj)
class NotFoundError(Exception):
pass
class OSD:
def __init__(self,
osd_id: int,
remove_util: RemoveUtil,
drain_started_at: Optional[datetime] = None,
process_started_at: Optional[datetime] = None,
drain_stopped_at: Optional[datetime] = None,
drain_done_at: Optional[datetime] = None,
draining: bool = False,
started: bool = False,
stopped: bool = False,
replace: bool = False,
force: bool = False,
hostname: Optional[str] = None,
fullname: Optional[str] = None,
):
# the ID of the OSD
self.osd_id = osd_id
# when did process (not the actual draining) start
self.process_started_at = process_started_at
# when did the drain start
self.drain_started_at = drain_started_at
# when did the drain stop
self.drain_stopped_at = drain_stopped_at
# when did the drain finish
self.drain_done_at = drain_done_at
# did the draining start
self.draining = draining
# was the operation started
self.started = started
# was the operation stopped
self.stopped = stopped
# If this is a replace or remove operation
self.replace = replace
# If we wait for the osd to be drained
self.force = force
# The name of the node
self.nodename = hostname
# The full name of the osd
self.fullname = fullname
# mgr obj to make mgr/mon calls
self.rm_util = remove_util
def start(self) -> None:
if self.started:
logger.debug(f"Already started draining {self}")
return None
self.started = True
self.stopped = False
def start_draining(self) -> bool:
if self.stopped:
logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
return False
self.rm_util.set_osd_flag([self], 'out')
self.drain_started_at = datetime.utcnow()
self.draining = True
logger.debug(f"Started draining {self}.")
return True
def stop_draining(self) -> bool:
self.rm_util.set_osd_flag([self], 'in')
self.drain_stopped_at = datetime.utcnow()
self.draining = False
logger.debug(f"Stopped draining {self}.")
return True
def stop(self) -> None:
if self.stopped:
logger.debug(f"Already stopped draining {self}")
return None
self.started = False
self.stopped = True
self.stop_draining()
@property
def is_draining(self) -> bool:
"""
Consider an OSD draining when it is
actively draining but not yet empty
"""
return self.draining and not self.is_empty
@property
def is_ok_to_stop(self) -> bool:
return self.rm_util.ok_to_stop([self])
@property
def is_empty(self) -> bool:
if self.get_pg_count() == 0:
if not self.drain_done_at:
self.drain_done_at = datetime.utcnow()
self.draining = False
return True
return False
def safe_to_destroy(self) -> bool:
return self.rm_util.safe_to_destroy([self.osd_id])
def down(self) -> bool:
return self.rm_util.set_osd_flag([self], 'down')
def destroy(self) -> bool:
return self.rm_util.destroy_osd(self.osd_id)
def purge(self) -> bool:
return self.rm_util.purge_osd(self.osd_id)
def get_pg_count(self) -> int:
return self.rm_util.get_pg_count(self.osd_id)
@property
def exists(self) -> bool:
return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
def drain_status_human(self):
default_status = 'not started'
status = 'started' if self.started and not self.draining else default_status
status = 'draining' if self.draining else status
status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
return status
def pg_count_str(self):
return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
def to_json(self) -> str:
out = dict()
out['osd_id'] = self.osd_id
out['started'] = self.started
out['draining'] = self.draining
out['stopped'] = self.stopped
out['replace'] = self.replace
out['force'] = self.force
out['nodename'] = self.nodename # type: ignore
for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
if getattr(self, k):
out[k] = getattr(self, k).strftime(DATEFMT)
else:
out[k] = getattr(self, k)
return json.dumps(out)
@classmethod
def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
if not inp:
return None
for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
if inp.get(date_field):
inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
inp.update({'remove_util': ctx})
return cls(**inp)
def __hash__(self):
return hash(self.osd_id)
def __eq__(self, other: object) -> bool:
if not isinstance(other, OSD):
return NotImplemented
return self.osd_id == other.osd_id
def __repr__(self) -> str:
return f"<OSD>(osd_id={self.osd_id}, is_draining={self.is_draining})"
class OSDQueue(Set):
def __init__(self):
super().__init__()
def as_osd_ids(self) -> List[int]:
return [osd.osd_id for osd in self]
def queue_size(self) -> int:
return len(self)
def draining_osds(self) -> List["OSD"]:
return [osd for osd in self if osd.is_draining]
def idling_osds(self) -> List["OSD"]:
return [osd for osd in self if not osd.is_draining and not osd.is_empty]
def empty_osds(self) -> List["OSD"]:
return [osd for osd in self if osd.is_empty]
def all_osds(self) -> List["OSD"]:
return [osd for osd in self]
def not_in_cluster(self) -> List["OSD"]:
return [osd for osd in self if not osd.exists]
def enqueue(self, osd: "OSD") -> None:
if not osd.exists:
raise NotFoundError()
self.add(osd)
osd.start()
def rm(self, osd: "OSD") -> None:
if not osd.exists:
raise NotFoundError()
osd.stop()
try:
logger.debug(f'Removing {osd} from the queue.')
self.remove(osd)
except KeyError:
logger.debug(f"Could not find {osd} in queue.")
raise KeyError
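To make the lifecycle of the new ``OSD``/``OSDQueue`` classes concrete, here is a minimal usage sketch (not part of this commit). It assumes the mgr python tree is importable, as in the unit tests below, and stands in a ``MagicMock`` for ``RemoveUtil`` so no mon commands are issued::

    from unittest import mock
    from cephadm.services.osd import OSD, OSDQueue

    rm_util = mock.MagicMock()
    rm_util.get_osds_in_cluster.return_value = ['0']  # pretend osd.0 is in the osdmap
    osd = OSD(osd_id=0, remove_util=rm_util)

    queue = OSDQueue()
    queue.enqueue(osd)            # checks OSD.exists, then calls osd.start()
    print(queue.as_osd_ids())     # -> [0]

    osd.start_draining()          # would set the 'out' flag via RemoveUtil
    queue.rm(osd)                 # osd.stop(): 'in' flag restored, OSD dropped from the queue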

View File

@ -12,11 +12,11 @@ except ImportError:
import pytest
from cephadm import CephadmOrchestrator
from cephadm.services.osd import RemoveUtil, OSD
from orchestrator import raise_if_exception, Completion, HostSpec
from tests import mock
def get_ceph_option(_, key):
return __file__
@ -43,9 +43,11 @@ def with_cephadm_module(module_options=None, store=None):
:param store: Set the store before module.__init__ is called
"""
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
if module_options is not None:
@ -72,6 +74,21 @@ def cephadm_module():
yield m
@pytest.yield_fixture()
def rm_util():
with with_cephadm_module({}) as m:
r = RemoveUtil.__new__(RemoveUtil)
r.__init__(m)
yield r
@pytest.yield_fixture()
def osd_obj():
with mock.patch("cephadm.services.osd.RemoveUtil"):
o = OSD(0, mock.MagicMock())
yield o
def wait(m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])

View File

@ -6,7 +6,7 @@ from unittest.mock import ANY
import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.services.osd import OSDRemoval
from cephadm.services.osd import OSD, OSDQueue
try:
from typing import Any, List
@ -362,6 +362,7 @@ class TestCephadm(object):
)
])
))
@mock.patch("cephadm.services.osd.OSD.exists", True)
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
with with_host(cephadm_module, 'test'):
@ -372,14 +373,20 @@ class TestCephadm(object):
out = wait(cephadm_module, c)
assert out == ["Removed osd.0 from host 'test'"]
osd_removal_op = OSDRemoval(0, False, False, 'test', 'osd.0', datetime.datetime.utcnow(), -1)
cephadm_module.rm_util.queue_osds_for_removal({osd_removal_op})
cephadm_module.rm_util._remove_osds_bg()
assert cephadm_module.rm_util.to_remove_osds == set()
cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
replace=False,
force=False,
hostname='test',
fullname='osd.0',
process_started_at=datetime.datetime.utcnow(),
remove_util=cephadm_module.rm_util
))
cephadm_module.rm_util.process_removal_queue()
assert cephadm_module.to_remove_osds == OSDQueue()
c = cephadm_module.remove_osds_status()
out = wait(cephadm_module, c)
assert out == set()
assert out == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)

View File

@ -0,0 +1,219 @@
from cephadm.services.osd import RemoveUtil, OSDQueue, OSD
import pytest
from .fixtures import rm_util, osd_obj
from tests import mock
from datetime import datetime
class MockOSD:
def __init__(self, osd_id):
self.osd_id = osd_id
class TestOSDRemoval:
@pytest.mark.parametrize(
"osd_id, osd_df, expected",
[
# missing 'nodes' key
(1, dict(nodes=[]), -1),
# missing 'pgs' key
(1, dict(nodes=[dict(id=1)]), -1),
# id != osd_id
(1, dict(nodes=[dict(id=999, pgs=1)]), -1),
# valid
(1, dict(nodes=[dict(id=1, pgs=1)]), 1),
]
)
def test_get_pg_count(self, rm_util, osd_id, osd_df, expected):
with mock.patch("cephadm.services.osd.RemoveUtil.osd_df", return_value=osd_df):
assert rm_util.get_pg_count(osd_id) == expected
@pytest.mark.parametrize(
"osds, ok_to_stop, expected",
[
# no osd_ids provided
([], [False], []),
# all osds are ok_to_stop
([1, 2], [True], [1, 2]),
# osds are ok_to_stop after the second iteration
([1, 2], [False, True], [2]),
# osds are never ok_to_stop, (taking the sample size `(len(osd_ids))` into account),
# expected to get False
([1, 2], [False, False], []),
]
)
def test_find_stop_threshold(self, rm_util, osds, ok_to_stop, expected):
with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
assert rm_util.find_osd_stop_threshold(osds) == expected
def test_process_removal_queue(self, rm_util):
# TODO: !
# rm_util.process_removal_queue()
pass
def test_ok_to_stop(self, rm_util):
rm_util.ok_to_stop([MockOSD(1)])
rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']})
def test_safe_to_destroy(self, rm_util):
rm_util.safe_to_destroy([1])
rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy', 'ids': ['1']})
def test_destroy_osd(self, rm_util):
rm_util.destroy_osd(1)
rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd destroy-actual', 'id': 1, 'yes_i_really_mean_it': True})
def test_purge_osd(self, rm_util):
rm_util.purge_osd(1)
rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True})
class TestOSD:
def test_start(self, osd_obj):
assert osd_obj.started is False
osd_obj.start()
assert osd_obj.started is True
assert osd_obj.stopped is False
def test_start_draining(self, osd_obj):
assert osd_obj.draining is False
assert osd_obj.drain_started_at is None
ret = osd_obj.start_draining()
osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'out')
assert isinstance(osd_obj.drain_started_at, datetime)
assert osd_obj.draining is True
assert ret is True
def test_start_draining_stopped(self, osd_obj):
osd_obj.stopped = True
ret = osd_obj.start_draining()
assert osd_obj.drain_started_at is None
assert ret is False
assert osd_obj.draining is False
def test_stop_draining(self, osd_obj):
ret = osd_obj.stop_draining()
osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'in')
assert isinstance(osd_obj.drain_stopped_at, datetime)
assert osd_obj.draining is False
assert ret is True
@mock.patch('cephadm.services.osd.OSD.stop_draining')
def test_stop(self, stop_draining_mock, osd_obj):
ret = osd_obj.stop()
assert osd_obj.started is False
assert osd_obj.stopped is True
stop_draining_mock.assert_called_once()
@pytest.mark.parametrize(
"draining, empty, expected",
[
# must be !draining! and !not empty! to yield True
(True, not True, True),
# not draining and not empty
(False, not True, False),
# not draining and empty
(False, True, False),
# draining and empty
(True, True, False),
]
)
def test_is_draining(self, osd_obj, draining, empty, expected):
with mock.patch("cephadm.services.osd.OSD.is_empty", new_callable=mock.PropertyMock(return_value=empty)):
osd_obj.draining = draining
assert osd_obj.is_draining is expected
@mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop")
def test_is_ok_to_stop(self, _, osd_obj):
ret = osd_obj.is_ok_to_stop
osd_obj.rm_util.ok_to_stop.assert_called_once()
@pytest.mark.parametrize(
"pg_count, expected",
[
(0, True),
(1, False),
(9999, False),
(-1, False),
]
)
def test_is_empty(self, osd_obj, pg_count, expected):
with mock.patch("cephadm.services.osd.OSD.get_pg_count", return_value=pg_count):
assert osd_obj.is_empty is expected
@mock.patch("cephadm.services.osd.RemoveUtil.safe_to_destroy")
def test_safe_to_destroy(self, _, osd_obj):
ret = osd_obj.safe_to_destroy()
osd_obj.rm_util.safe_to_destroy.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.set_osd_flag")
def test_down(self, _, osd_obj):
ret = osd_obj.down()
osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'down')
@mock.patch("cephadm.services.osd.RemoveUtil.destroy_osd")
def test_destroy_osd(self, _, osd_obj):
ret = osd_obj.destroy()
osd_obj.rm_util.destroy_osd.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.purge_osd")
def test_purge(self, _, osd_obj):
ret = osd_obj.purge()
osd_obj.rm_util.purge_osd.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count")
def test_pg_count(self, _, osd_obj):
ret = osd_obj.get_pg_count()
osd_obj.rm_util.get_pg_count.assert_called_once()
def test_drain_status_human_not_started(self, osd_obj):
assert osd_obj.drain_status_human() == 'not started'
def test_drain_status_human_started(self, osd_obj):
osd_obj.started = True
assert osd_obj.drain_status_human() == 'started'
def test_drain_status_human_draining(self, osd_obj):
osd_obj.started = True
osd_obj.draining = True
assert osd_obj.drain_status_human() == 'draining'
def test_drain_status_human_done(self, osd_obj):
osd_obj.started = True
osd_obj.draining = False
osd_obj.drain_done_at = datetime.utcnow()
assert osd_obj.drain_status_human() == 'done, waiting for purge'
class TestOSDQueue:
def test_queue_size(self, osd_obj):
q = OSDQueue()
assert q.queue_size() == 0
q.add(osd_obj)
assert q.queue_size() == 1
@mock.patch("cephadm.services.osd.OSD.start")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_enqueue(self, exist, start, osd_obj):
q = OSDQueue()
q.enqueue(osd_obj)
osd_obj.start.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm_raise(self, exist, stop, osd_obj):
q = OSDQueue()
with pytest.raises(KeyError):
q.rm(osd_obj)
osd_obj.stop.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm(self, exist, stop, osd_obj):
q = OSDQueue()
q.add(osd_obj)
q.rm(osd_obj)
osd_obj.stop.assert_called_once()

View File

@ -1017,6 +1017,12 @@ class Orchestrator(object):
"""
raise NotImplementedError()
def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
"""
TODO
"""
raise NotImplementedError()
def remove_osds_status(self):
# type: () -> Completion
"""

View File

@ -856,35 +856,53 @@ Usage:
"name=replace,type=CephBool,req=false "
"name=force,type=CephBool,req=false",
'Remove OSD services')
def _osd_rm(self, svc_id: List[str],
replace: bool = False,
force: bool = False) -> HandleCommandResult:
completion = self.remove_osds(svc_id, replace, force)
def _osd_rm_start(self,
svc_id: List[str],
replace: bool = False,
force: bool = False) -> HandleCommandResult:
completion = self.remove_osds(svc_id, replace=replace, force=force)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command(
'orch osd rm stop',
"name=svc_id,type=CephString,n=N",
'Remove OSD services')
def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult:
completion = self.stop_remove_osds(svc_id)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command(
'orch osd rm status',
"name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
desc='status of OSD removal operation')
def _osd_rm_status(self) -> HandleCommandResult:
def _osd_rm_status(self, format='plain') -> HandleCommandResult:
completion = self.remove_osds_status()
self._orchestrator_wait([completion])
raise_if_exception(completion)
report = completion.result
if not report:
return HandleCommandResult(stdout="No OSD remove/replace operations reported")
table = PrettyTable(
['NAME', 'HOST', 'PGS', 'STARTED_AT'],
border=False)
table.align = 'l'
table.left_padding_width = 0
table.right_padding_width = 1
# TODO: re-add sorted and sort by pg_count
for osd in report:
table.add_row((osd.fullname, osd.nodename, osd.pg_count_str, osd.started_at))
return HandleCommandResult(stdout=table.get_string())
if format != 'plain':
out = to_format(report, format, many=True, cls=None)
else:
table = PrettyTable(
['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'],
border=False)
table.align = 'l'
table.left_padding_width = 0
table.right_padding_width = 2
for osd in sorted(report, key=lambda o: o.osd_id):
table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(),
osd.get_pg_count(), osd.replace, osd.force, osd.drain_started_at])
out = table.get_string()
return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch daemon add',

View File

@ -1,7 +0,0 @@
import os
if 'UNITTEST' in os.environ:
import tests
tests.mock_ceph_modules() # type: ignore
from .module import OSDSupport

View File

@ -1,306 +0,0 @@
from typing import List, Set, Optional
from mgr_module import MgrModule, HandleCommandResult
from threading import Event
import json
import errno
class OSDSupport(MgrModule):
# these are CLI commands we implement
COMMANDS = [
{
"cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
"desc": "drain osd ids",
"perm": "r"
},
{
"cmd": "osd drain status",
"desc": "show status",
"perm": "r"
},
{
"cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
"desc": "show status for osds. Stopping all if osd_ids are omitted",
"perm": "r"
},
]
MODULE_OPTIONS: List[dict] = []
# These are "native" Ceph options that this module cares about.
NATIVE_OPTIONS: List[str] = []
osd_ids: Set[int] = set()
emptying_osds: Set[int] = set()
check_osds: Set[int] = set()
empty: Set[int] = set()
def __init__(self, *args, **kwargs):
super(OSDSupport, self).__init__(*args, **kwargs)
# set up some members to enable the serve() method and shutdown()
self.run = True
self.event = Event()
# ensure config options members are initialized; see config_notify()
self.config_notify()
def config_notify(self):
"""
This method is called whenever one of our config options is changed.
"""
# This is some boilerplate that stores MODULE_OPTIONS in a class
# member, so that, for instance, the 'emphatic' option is always
# available as 'self.emphatic'.
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.get_module_option(opt['name']))
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name']))
# Do the same for the native options.
for _opt in self.NATIVE_OPTIONS:
setattr(self,
_opt,
self.get_ceph_option(_opt))
self.log.debug('native option %s = %s', _opt, getattr(self, _opt))
def handle_command(self, inbuf, cmd):
ret = 0
err = ''
_ = inbuf
cmd_prefix = cmd.get('prefix', '')
osd_ids: List[int] = cmd.get('osd_ids', list())
not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
if cmd_prefix == 'osd drain':
if not_found_osds:
return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
# add osd_ids to set
for osd_id in osd_ids:
if osd_id not in self.emptying_osds:
self.osd_ids.add(osd_id)
self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
out = 'Started draining OSDs. Query progress with <ceph osd drain status>'
elif cmd_prefix == 'osd drain status':
# re-initialize it with an empty set on invocation (long running processes)
self.check_osds = set()
# assemble a set of emptying osds and to_be_emptied osds
self.check_osds.update(self.emptying_osds)
self.check_osds.update(self.osd_ids)
self.check_osds.update(self.empty)
report = list()
for osd_id in self.check_osds:
pgs = self.get_pg_count(osd_id)
report.append(dict(osd_id=osd_id, pgs=pgs))
out = f"{json.dumps(report)}"
elif cmd_prefix == 'osd drain stop':
if not osd_ids:
self.log.debug("No osd_ids provided, stop all pending drain operations)")
self.osd_ids = set()
self.emptying_osds = set()
# this is just a poor-man's solution as it will not really stop draining
# the osds. It will just stop the queries and also prevent any scheduled OSDs
# from getting drained at a later point in time.
out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
else:
if not_found_osds:
return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
self.osd_ids = self.osd_ids.difference(osd_ids)
self.emptying_osds = self.emptying_osds.difference(osd_ids)
out = f"Stopped draining operations for OSD(s): {osd_ids}"
else:
return HandleCommandResult(
retval=-errno.EINVAL,
stdout='',
stderr=f"Command not found <{cmd.get('prefix', '')}>")
return HandleCommandResult(
retval=ret, # exit code
stdout=out, # stdout
stderr=err)
def serve(self):
"""
This method is called by the mgr when the module starts and can be
used for any background activity.
"""
self.log.info("Starting mgr/osd_support")
while self.run:
self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
# the state should be saved to the mon store in the actual call and
# then retrieved in serve() probably
# 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
for x in self.find_osd_stop_threshold(self.osd_ids):
self.emptying_osds.add(x)
# remove the emptying osds from the osd_ids since they don't need to be checked again.
self.osd_ids = self.osd_ids.difference(self.emptying_osds)
# 2) reweight the ok-to-stop osds, ONCE
self.reweight_osds(self.emptying_osds)
# 3) check for osds to be empty
empty_osds = self.empty_osds(self.emptying_osds)
# remove osds that are marked as empty
self.emptying_osds = self.emptying_osds.difference(empty_osds)
# move empty osds in the done queue until they disappear from ceph's view
# other modules need to know when OSDs are empty
for osd in empty_osds:
self.log.debug(f"Adding {osd} to list of empty OSDs")
self.empty.add(osd)
# remove from queue if no longer part of ceph cluster
self.cleanup()
# fixed sleep interval of 10 seconds
sleep_interval = 10
self.log.debug('Sleeping for %d seconds', sleep_interval)
self.event.wait(sleep_interval)
self.event.clear()
def cleanup(self):
"""
Remove OSDs that are no longer in the ceph cluster from the
'done' list.
:return:
"""
for osd in self.osds_not_in_cluster(list(self.empty)):
self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing")
self.empty.remove(osd)
def shutdown(self):
"""
This method is called by the mgr when the module needs to shut
down (i.e., when the serve() function needs to exit).
"""
self.log.info('Stopping')
self.run = False
self.event.set()
def get_osds_in_cluster(self) -> List[str]:
osd_map = self.get_osdmap()
return [x.get('osd') for x in osd_map.dump().get('osds', [])]
def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
cluster_osds = self.get_osds_in_cluster()
not_in_cluster = set()
for osd_id in osd_ids:
if int(osd_id) not in cluster_osds:
self.log.error(f"Could not find {osd_id} in cluster")
not_in_cluster.add(osd_id)
return not_in_cluster
def empty_osds(self, osd_ids: Set[int]) -> List[int]:
if not osd_ids:
return list()
osd_df_data = self.osd_df()
empty_osds = list()
for osd_id in osd_ids:
if self.is_empty(osd_id, osd_df=osd_df_data):
empty_osds.append(osd_id)
return empty_osds
def osd_df(self) -> dict:
# TODO: this should be cached I think
base_cmd = 'osd df'
ret, out, err = self.mon_command({
'prefix': base_cmd,
'format': 'json'
})
return json.loads(out)
def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
pgs = self.get_pg_count(osd_id, osd_df=osd_df)
if pgs != 0:
self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
return False
self.log.info(f"osd: {osd_id} has no PGs anymore")
return True
def reweight_osds(self, osd_ids: Set[int]) -> bool:
results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids]
return all(results)
def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
if not osd_df:
osd_df = self.osd_df()
osd_nodes = osd_df.get('nodes', [])
for osd_node in osd_nodes:
if osd_node.get('id') == int(osd_id):
return osd_node.get('pgs', -1)
return -1
def get_osd_weight(self, osd_id: int) -> float:
osd_df = self.osd_df()
osd_nodes = osd_df.get('nodes', [])
for osd_node in osd_nodes:
if osd_node.get('id') == int(osd_id):
if 'crush_weight' in osd_node:
return float(osd_node.get('crush_weight'))
return -1.0
def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
if self.get_osd_weight(osd_id) == weight:
self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
return True
base_cmd = 'osd crush reweight'
ret, out, err = self.mon_command({
'prefix': base_cmd,
'name': f'osd.{osd_id}',
'weight': weight
})
cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
self.log.debug(f"running cmd: {cmd}")
if ret != 0:
self.log.error(f"command: {cmd} failed with: {err}")
return False
self.log.info(f"command: {cmd} succeeded with: {out}")
return True
def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
"""
Cut osd_id list in half until it's ok-to-stop
:param osd_ids: list of osd_ids
:return: list of ods_ids that can be stopped at once
"""
if not osd_ids:
return set()
_osds: List[int] = list(osd_ids.copy())
while not self.ok_to_stop(_osds):
if len(_osds) <= 1:
# can't even stop one OSD, aborting
self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
return set()
self.event.wait(1)
# splitting osd_ids in half until ok_to_stop yields success
# maybe popping ids off one by one is better here..depends on the cluster size I guess..
# There's a lot of room for micro adjustments here
_osds = _osds[len(_osds)//2:]
return set(_osds)
def ok_to_stop(self, osd_ids: List[int]) -> bool:
base_cmd = "osd ok-to-stop"
self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
ret, out, err = self.mon_command({
'prefix': base_cmd,
# apparently ok-to-stop allows strings only
'ids': [str(x) for x in osd_ids]
})
if ret != 0:
self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
return False
self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
return True

View File

@ -1,14 +0,0 @@
from osd_support import OSDSupport
import pytest
from tests import mock
@pytest.yield_fixture()
def osd_support_module():
with mock.patch("osd_support.module.OSDSupport.get_osdmap"), \
mock.patch("osd_support.module.OSDSupport.osd_df"), \
mock.patch("osd_support.module.OSDSupport.mon_command", return_value=(0, '', '')):
m = OSDSupport.__new__(OSDSupport)
m.__init__('osd_support', 0, 0)
yield m

View File

@ -1,144 +0,0 @@
import pytest
from .fixtures import osd_support_module as osdsf
from tests import mock
class TestOSDSupport:
def test_init(self, osdsf):
assert osdsf.osd_ids == set()
assert osdsf.emptying_osds == set()
assert osdsf.check_osds == set()
assert osdsf.empty == set()
def test_osds_not_in_cluster(self, osdsf):
assert osdsf.osds_not_in_cluster([1, 2]) == {1, 2}
@mock.patch("osd_support.module.OSDSupport.get_osds_in_cluster")
def test_osds_in_cluster(self, osds_in_cluster, osdsf):
osds_in_cluster.return_value = [1]
assert osdsf.osds_not_in_cluster([1, 2]) == {2}
@pytest.mark.parametrize(
"is_empty, osd_ids, expected",
[
(False, {1, 2}, []),
(True, {1, 2}, [1, 2]),
(None, {1, 2}, []),
]
)
def test_empty_osd(self, osdsf, is_empty, osd_ids, expected):
with mock.patch("osd_support.module.OSDSupport.is_empty", return_value=is_empty):
assert osdsf.empty_osds(osd_ids) == expected
@pytest.mark.parametrize(
"pg_count, expected",
[
(0, True),
(1, False),
(-1, False),
(9999999999999, False),
]
)
def test_is_emtpy(self, pg_count, expected, osdsf):
with mock.patch("osd_support.module.OSDSupport.get_pg_count", return_value=pg_count):
assert osdsf.is_empty(1) == expected
@pytest.mark.parametrize(
"osd_ids, reweight_out, expected",
[
({1}, [False], False),
({1}, [True], True),
({1, 2}, [True, True], True),
({1, 2}, [True, False], False),
]
)
def test_reweight_osds(self, osdsf, osd_ids, reweight_out, expected):
with mock.patch("osd_support.module.OSDSupport.reweight_osd", side_effect=reweight_out):
assert osdsf.reweight_osds(osd_ids) == expected
@pytest.mark.parametrize(
"osd_id, osd_df, expected",
[
# missing 'nodes' key
(1, dict(nodes=[]), -1),
# missing 'pgs' key
(1, dict(nodes=[dict(id=1)]), -1),
# id != osd_id
(1, dict(nodes=[dict(id=999, pgs=1)]), -1),
# valid
(1, dict(nodes=[dict(id=1, pgs=1)]), 1),
]
)
def test_get_pg_count(self, osdsf, osd_id, osd_df, expected):
with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df):
assert osdsf.get_pg_count(osd_id) == expected
@pytest.mark.parametrize(
"osd_id, osd_df, expected",
[
# missing 'nodes' key
(1, dict(nodes=[]), -1.0),
# missing 'crush_weight' key
(1, dict(nodes=[dict(id=1)]), -1.0),
# id != osd_id
(1, dict(nodes=[dict(id=999, crush_weight=1)]), -1.0),
# valid
(1, dict(nodes=[dict(id=1, crush_weight=1)]), float(1)),
]
)
def test_get_osd_weight(self, osdsf, osd_id, osd_df, expected):
with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df):
assert osdsf.get_osd_weight(osd_id) == expected
@pytest.mark.parametrize(
"osd_id, initial_osd_weight, mon_cmd_return, weight, expected",
[
# is already weighted correctly
(1, 1.0, (0, '', ''), 1.0, True),
# needs reweight, no errors in mon_cmd
(1, 2.0, (0, '', ''), 1.0, True),
# needs reweight, errors in mon_cmd
(1, 2.0, (1, '', ''), 1.0, False),
]
)
def test_reweight_osd(self, osdsf, osd_id, initial_osd_weight, mon_cmd_return, weight, expected):
with mock.patch("osd_support.module.OSDSupport.get_osd_weight", return_value=initial_osd_weight),\
mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return):
assert osdsf.reweight_osd(osd_id, weight=weight) == expected
@pytest.mark.parametrize(
"osd_ids, ok_to_stop, expected",
[
# no osd_ids provided
({}, [], set()),
# all osds are ok_to_stop
({1, 2}, [True], {1, 2}),
# osds are ok_to_stop after the second iteration
({1, 2}, [False, True], {2}),
# osds are never ok_to_stop, (taking the sample size `(len(osd_ids))` into account),
# expected to get a empty set()
({1, 2}, [False, False], set()),
]
)
def test_find_stop_threshold(self, osdsf, osd_ids, ok_to_stop, expected):
with mock.patch("osd_support.module.OSDSupport.ok_to_stop", side_effect=ok_to_stop):
assert osdsf.find_osd_stop_threshold(osd_ids) == expected
@pytest.mark.parametrize(
"osd_ids, mon_cmd_return, expected",
[
# ret is 0
([1], (0, '', ''), True),
# no input yields True
([], (0, '', ''), True),
# ret is != 0
([1], (-1, '', ''), False),
# no input, but ret != 0
([], (-1, '', ''), False),
]
)
def test_ok_to_stop(self, osdsf, osd_ids, mon_cmd_return, expected):
with mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return):
assert osdsf.ok_to_stop(osd_ids) == expected

View File

@ -36,6 +36,5 @@ commands = mypy --config-file=../../mypy.ini \
orchestrator/__init__.py \
progress/module.py \
rook/module.py \
osd_support/module.py \
test_orchestrator/module.py \
volumes/__init__.py