mgr/osd_support: new module for osd utility calls

It lets you start/stop and monitor OSD draining operations.
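
Usage sketch (commands as implemented in this module; the osd ids are
illustrative):

  $ ceph osd drain 1 2
  $ ceph osd drain status
  $ ceph osd drain stop 1 2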

Signed-off-by: Joshua Schmid <jschmid@suse.de>
Joshua Schmid 2020-01-14 18:54:10 +01:00
parent ae5d2c6ab2
commit b5c5ef185d
3 changed files with 328 additions and 0 deletions


@@ -0,0 +1 @@
from .module import OSDSupport


@@ -0,0 +1,326 @@
from typing import List, Set, Optional
from mgr_module import MgrModule, HandleCommandResult
from threading import Event
import json
import errno
"""
A module that holds a variety of useful utility to deal with OSDs
Draining OSDs:
This module can be used to drain osds from the commandline:
$ ceph osd drain $osd_ids ([$osd_ids])
You can monitor the progress with
$ ceph osd drain status
Gives you the status of scheduled and running osd drain operations
[{'osd_id': 0, 'pgs': 1234}, ..]
$ ceph osd drain stop ([$osd_ids])
Stops all !SCHEDULED! osd drain operations (not the operations that have been started already)
if no $osd_ids are give. If $osd_ids are present it only operates on them.
To stop and reset the weight of already started operations we need to save the initial weight
(see 'Ideas for improvement')
Ideas for improvement:
- add health checks set_health_checks
- use objects to represent OSDs
- allows timestamps, trending information etc
- save osd drain state (at least the osd_ids in the mon store)
- resume after a mgr crash
- save the initial weight of a osd i.e. (set to initial weight on abort)
"""


class OSDSupport(MgrModule):
    # these are CLI commands we implement
    COMMANDS = [
        {
            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
            "desc": "drain osd ids",
            "perm": "r"
        },
        {
            "cmd": "osd drain status",
            "desc": "show status",
            "perm": "r"
        },
        {
            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
            "desc": "stop draining of osds. Stops all scheduled drain operations if osd_ids are omitted",
            "perm": "r"
        },
    ]

    # These are module options we understand. These can be set with
    #
    #   ceph config set global mgr/osd_support/<name> <value>
    #
    # e.g.,
    #
    #   ceph config set global mgr/osd_support/place Earth
    #
    MODULE_OPTIONS: List[dict] = []

    # These are "native" Ceph options that this module cares about.
    NATIVE_OPTIONS: List[str] = []
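
    # osd_ids holds osds that are scheduled for draining, emptying_osds
    # holds osds that are currently being drained, and check_osds is a
    # scratch set used by 'osd drain status' to report on both.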
    osd_ids: Set[int] = set()
    emptying_osds: Set[int] = set()
    check_osds: Set[int] = set()

    def __init__(self, *args, **kwargs):
        super(OSDSupport, self).__init__(*args, **kwargs)
        # set up some members to enable the serve() method and shutdown()
        self.run = True
        self.event = Event()
        # ensure config options members are initialized; see config_notify()
        self.config_notify()

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.
        """
        # This is some boilerplate that stores MODULE_OPTIONS in a class
        # member, so that, for instance, the 'emphatic' option is always
        # available as 'self.emphatic'.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']) or opt['default'])
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))
        # Do the same for the native options.
        for _opt in self.NATIVE_OPTIONS:
            setattr(self,
                    _opt,
                    self.get_ceph_option(_opt))
            self.log.debug('native option %s = %s', _opt, getattr(self, _opt))

    def handle_command(self, inbuf, cmd):
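        """
        Dispatch the 'osd drain', 'osd drain status' and 'osd drain stop'
        commands defined in COMMANDS above.
        """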
        ret = 0
        err = ''
        _ = inbuf
        cmd_prefix = cmd.get('prefix', '')
        osd_ids: List[int] = cmd.get('osd_ids', list())
        not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)

        if cmd_prefix == 'osd drain':
            if not_found_osds:
                return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
            # add osd_ids to set
            for osd_id in osd_ids:
                if osd_id not in self.emptying_osds:
                    self.osd_ids.add(osd_id)
            self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'

        elif cmd_prefix == 'osd drain status':
            # re-initialize it with an empty set on invocation (long running processes)
            self.check_osds = set()
            # assemble a set of emptying osds and to_be_emptied osds
            self.check_osds.update(self.emptying_osds)
            self.check_osds.update(self.osd_ids)
            report = list()
            for osd_id in self.check_osds:
                pgs = self.get_pg_count(osd_id)
                report.append(dict(osd_id=osd_id, pgs=pgs))
            out = f"{report}"

        elif cmd_prefix == 'osd drain stop':
            if not osd_ids:
                self.log.debug("No osd_ids provided; stopping all pending drain operations")
                self.osd_ids = set()
                self.emptying_osds = set()
                # this is just a poor-man's solution as it will not really stop draining
                # the osds. It will just stop the queries and also prevent any scheduled OSDs
                # from getting drained at a later point in time.
                out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
            else:
                if not_found_osds:
                    return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
                self.osd_ids = self.osd_ids.difference(osd_ids)
                self.emptying_osds = self.emptying_osds.difference(osd_ids)
                out = f"Stopped draining operations for OSD(s): {osd_ids}"

        else:
            return HandleCommandResult(
                retval=-errno.EINVAL,
                stdout='',
                stderr=f"Command not found <{cmd.get('prefix', '')}>")

        return HandleCommandResult(
            retval=ret,   # exit code
            stdout=out,   # stdout
            stderr=err)

    def serve(self):
        """
        This method is called by the mgr when the module starts and can be
        used for any background activity.
        """
        self.log.info("Starting mgr/osd_support")
        while self.run:
            # Do some useful background work here.
            self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
            self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
            # the state should be saved to the mon store in the actual call and
            # then retrieved in serve() probably

            # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
            for x in self.find_osd_stop_threshold(self.osd_ids):
                self.emptying_osds.add(x)

            # remove the emptying osds from the osd_ids since they don't need to be checked again.
            self.osd_ids = self.osd_ids.difference(self.emptying_osds)

            # 2) reweight the ok-to-stop osds, ONCE
            self.reweight_osds(self.emptying_osds)

            # 3) check for osds to be empty
            empty_osds = self.empty_osds(self.emptying_osds)

            # remove osds that are marked as empty
            self.emptying_osds = self.emptying_osds.difference(empty_osds)

            # fixed sleep interval of 10 seconds
            sleep_interval = 10
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def shutdown(self):
        """
        This method is called by the mgr when the module needs to shut
        down (i.e., when the serve() function needs to exit).
        """
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
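        """
        Return the subset of the given osd_ids that does not exist in the
        current osdmap.
        """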
self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
osd_map = self.get_osdmap()
cluster_osds = [x.get('osd') for x in osd_map.dump().get('osds', [])]
not_in_cluster = set()
for osd_id in osd_ids:
if int(osd_id) not in cluster_osds:
self.log.error(f"Could not find {osd_id} in cluster")
not_in_cluster.add(osd_id)
return not_in_cluster

    def empty_osds(self, osd_ids: Set[int]) -> List[int]:
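        """
        Return the subset of the given osd_ids that no longer hold any PGs.
        """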
        if not osd_ids:
            return list()
        osd_df_data = self.osd_df()
        empty_osds = list()
        for osd_id in osd_ids:
            if self.is_empty(osd_id, osd_df=osd_df_data):
                empty_osds.append(osd_id)
        return empty_osds

    def osd_df(self) -> dict:
        # TODO: this should be cached I think
        base_cmd = 'osd df'
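        # mon_command() returns a (retval, stdout, stderr) tuple; with
        # format=json, stdout carries the 'osd df' report as JSON.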
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        return json.loads(out)

    def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
        pgs = self.get_pg_count(osd_id, osd_df=osd_df)
        if pgs != 0:
            self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
            return False
        self.log.info(f"osd: {osd_id} has no PGs anymore")
        return True

    def reweight_osds(self, osd_ids: Set[int]) -> bool:
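        # Reweight every given osd to the default target weight of 0.0 and
        # report whether all of the reweight calls succeeded.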
        results = [self.reweight_osd(osd_id) for osd_id in osd_ids]
        return all(results)

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
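        # Look up the 'pgs' column for this osd in the 'osd df' report;
        # fetch a fresh report if none was passed in.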
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id', None) == int(osd_id):
                return osd_node.get('pgs')
        errmsg = f"Could not find <pgs> field for osd_id: {osd_id} in osd_df data"
        self.log.error(errmsg)
        raise RuntimeError(errmsg)

    def get_osd_weight(self, osd_id: int) -> float:
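        # Return the osd's current crush weight, or -1.0 if the osd does
        # not appear in the 'osd df' report.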
        osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id', None) == int(osd_id):
                return float(osd_node.get('crush_weight'))
        return -1.0

    def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
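        # The default target weight of 0.0 makes CRUSH migrate all PGs off
        # the osd; skip the mon command if the weight is already as requested.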
        if self.get_osd_weight(osd_id) == weight:
            self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
            return True
        base_cmd = 'osd crush reweight'
        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
        self.log.debug(f"running cmd: {cmd}")
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'name': f'osd.{osd_id}',
            'weight': weight
        })
        if ret != 0:
            self.log.error(f"command: {cmd} failed with: {err}")
            return False
        self.log.info(f"command: {cmd} succeeded with: {out}")
        return True

    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
        """
        Cut the osd_id list in half until it's ok-to-stop

        :param osd_ids: set of osd_ids
        :return: set of osd_ids that can be stopped at once
        """
        if not osd_ids:
            return set()
        _osds: List[int] = list(osd_ids.copy())
        while not self.ok_to_stop(_osds):
            if len(_osds) <= 1:
                # can't even stop one OSD, aborting
                self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return set()
            self.event.wait(1)
            # keep halving osd_ids until ok_to_stop yields success; popping ids
            # off one by one might be better here, depending on the cluster
            # size. There's a lot of room for micro adjustments here.
            _osds = _osds[len(_osds)//2:]
        return set(_osds)

    def ok_to_stop(self, osd_ids: List[int]) -> bool:
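        # 'osd ok-to-stop' asks the monitors whether stopping these osds
        # would leave any PGs degraded or unavailable.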
base_cmd = "osd ok-to-stop"
self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
ret, out, err = self.mon_command({
'prefix': base_cmd,
# apparently ok-to-stop allows strings only
'ids': [str(x) for x in osd_ids]
})
if ret != 0:
self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
return False
self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
return True


@@ -19,4 +19,5 @@ commands = mypy --config-file=../../mypy.ini \
           orchestrator/__init__.py \
           progress/module.py \
           rook/module.py \
           osd_support/module.py \
           test_orchestrator/module.py