mgr/osd_support: remove module and all traces

Signed-off-by: Joshua Schmid <jschmid@suse.de>
Joshua Schmid 2020-07-24 15:23:52 +02:00
parent 1bdf288e81
commit a55c1dd0fa
10 changed files with 0 additions and 537 deletions


@ -1657,7 +1657,6 @@ fi
%{_datadir}/ceph/mgr/localpool
%{_datadir}/ceph/mgr/orchestrator
%{_datadir}/ceph/mgr/osd_perf_query
%{_datadir}/ceph/mgr/osd_support
%{_datadir}/ceph/mgr/pg_autoscaler
%{_datadir}/ceph/mgr/progress
%{_datadir}/ceph/mgr/prometheus


@ -40,7 +40,6 @@ sensible.
Telegraf module <telegraf>
Telemetry module <telemetry>
Iostat module <iostat>
OSD Support module <osd_support>
Crash module <crash>
Insights module <insights>
Orchestrator module <orchestrator>


@ -1,61 +0,0 @@
OSD Support Module
==================

The OSD Support module holds OSD-specific functionality that is needed by
other components, such as the orchestrators.

In its current scope:

* osd draining
Enabling
--------

When an orchestrator is used, this module should be enabled as a dependency
(*currently this only applies to the cephadm orchestrator*).

The *osd_support* module can also be enabled manually with::

    ceph mgr module enable osd_support
Commands
--------

Draining
########

This mode is for draining OSDs gracefully. `Draining` in this context means
gracefully emptying out OSDs by setting their CRUSH weight to zero. An OSD is
considered drained when no PGs are left on it.

::

    ceph osd drain $osd_id

Takes an $osd_id and schedules it for draining. Since that process can take
quite some time, the operation is executed in the background. To query the
status of the operation, use:

::

    ceph osd drain status

This gives you the status of all running operations in this format::

    [{'osd_id': 0, 'pgs': 1234}, ..]
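
As a rough illustration (not part of the module), a script could poll this
status until every scheduled OSD reports zero PGs. The sketch below assumes the
`ceph` CLI is available on the host and that the status output is the JSON list
shown above::

    import json
    import subprocess
    import time

    def wait_until_drained(poll_seconds: int = 10) -> None:
        # poll `ceph osd drain status` until every reported OSD has 0 PGs left
        while True:
            out = subprocess.check_output(['ceph', 'osd', 'drain', 'status'])
            report = json.loads(out)  # e.g. [{"osd_id": 0, "pgs": 1234}, ...]
            if all(entry.get('pgs') == 0 for entry in report):
                return
            time.sleep(poll_seconds)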
To stop an OSD from being drained::

    ceph osd drain stop [$osd_id]

Stops all **scheduled** OSD drain operations (but not the operations that have
already started) if no $osd_ids are given. If $osd_ids are present, it only
operates on those. Stopping and resetting the weight of operations that have
already started would require saving the initial weight
(see 'Ideas for improvement').
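
Until such a feature exists, the initial weight can be recorded by hand before
draining. A minimal sketch, again assuming the `ceph` CLI and using the same
`osd df` JSON fields the module itself reads::

    import json
    import subprocess

    def current_crush_weight(osd_id: int) -> float:
        # `ceph osd df --format json` lists each OSD under 'nodes' with its 'crush_weight'
        df = json.loads(subprocess.check_output(['ceph', 'osd', 'df', '--format', 'json']))
        for node in df.get('nodes', []):
            if node.get('id') == osd_id:
                return float(node.get('crush_weight', -1.0))
        return -1.0

    # Record the value before `ceph osd drain $osd_id`; to undo an aborted drain,
    # restore it with `ceph osd crush reweight osd.$osd_id <saved weight>`.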
Ideas for improvement
----------------------

- add health checks (set_health_checks)
- use objects to represent OSDs

  - allows timestamps, trending information, etc.

- save the osd drain state (at least the osd_ids) in the mon store
  (see the sketch after this list)

  - resume after a mgr crash

- save the initial weight of an OSD (i.e. reset to the initial weight on abort)
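
A rough sketch of the mon store idea from the list above: every mgr module can
persist small key/value pairs via set_store()/get_store(). Those two calls are
existing MgrModule API; the helper names below are hypothetical::

    import json

    # Hypothetical helpers inside OSDSupport; set_store()/get_store() are the
    # standard MgrModule key/value interface backed by the mon store.
    def _save_drain_state(self) -> None:
        self.set_store('osd_ids', json.dumps(list(self.osd_ids)))

    def _load_drain_state(self) -> None:
        raw = self.get_store('osd_ids')  # None if the key was never written
        self.osd_ids = set(json.loads(raw)) if raw else set()

Calling _save_drain_state() whenever the queue changes and _load_drain_state()
at the top of serve() would let a restarted mgr resume pending drains.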


@ -89,7 +89,6 @@ const static std::map<uint32_t, std::set<std::string>> always_on_modules = {
"devicehealth",
"orchestrator",
"rbd_support",
"osd_support",
"volumes",
"pg_autoscaler",
"telemetry",
@ -104,7 +103,6 @@ const static std::map<uint32_t, std::set<std::string>> always_on_modules = {
"devicehealth",
"orchestrator",
"rbd_support",
"osd_support",
"volumes",
"pg_autoscaler",
"telemetry",


@ -1,7 +0,0 @@
import os

if 'UNITTEST' in os.environ:
    import tests
    tests.mock_ceph_modules()  # type: ignore

from .module import OSDSupport


@ -1,306 +0,0 @@
from typing import List, Set, Optional

from mgr_module import MgrModule, HandleCommandResult
from threading import Event
import json
import errno


class OSDSupport(MgrModule):
    # these are CLI commands we implement
    COMMANDS = [
        {
            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
            "desc": "drain osd ids",
            "perm": "r"
        },
        {
            "cmd": "osd drain status",
            "desc": "show status",
            "perm": "r"
        },
        {
            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
            "desc": "stop draining for osds. Stopping all if osd_ids are omitted",
            "perm": "r"
        },
    ]

    MODULE_OPTIONS: List[dict] = []

    # These are "native" Ceph options that this module cares about.
    NATIVE_OPTIONS: List[str] = []

    osd_ids: Set[int] = set()
    emptying_osds: Set[int] = set()
    check_osds: Set[int] = set()
    empty: Set[int] = set()

    def __init__(self, *args, **kwargs):
        super(OSDSupport, self).__init__(*args, **kwargs)

        # set up some members to enable the serve() method and shutdown()
        self.run = True
        self.event = Event()

        # ensure config options members are initialized; see config_notify()
        self.config_notify()

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.
        """
        # This is some boilerplate that stores MODULE_OPTIONS in a class
        # member, so that, for instance, the 'emphatic' option is always
        # available as 'self.emphatic'.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))
        # Do the same for the native options.
        for _opt in self.NATIVE_OPTIONS:
            setattr(self,
                    _opt,
                    self.get_ceph_option(_opt))
            self.log.debug('native option %s = %s', _opt, getattr(self, _opt))
    def handle_command(self, inbuf, cmd):
        ret = 0
        err = ''
        _ = inbuf
        cmd_prefix = cmd.get('prefix', '')
        osd_ids: List[int] = cmd.get('osd_ids', list())
        not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
        if cmd_prefix == 'osd drain':
            if not_found_osds:
                return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
            # add osd_ids to set
            for osd_id in osd_ids:
                if osd_id not in self.emptying_osds:
                    self.osd_ids.add(osd_id)
            self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'

        elif cmd_prefix == 'osd drain status':
            # re-initialize it with an empty set on invocation (long running processes)
            self.check_osds = set()
            # assemble a set of emptying osds and to_be_emptied osds
            self.check_osds.update(self.emptying_osds)
            self.check_osds.update(self.osd_ids)
            self.check_osds.update(self.empty)

            report = list()
            for osd_id in self.check_osds:
                pgs = self.get_pg_count(osd_id)
                report.append(dict(osd_id=osd_id, pgs=pgs))
            out = f"{json.dumps(report)}"

        elif cmd_prefix == 'osd drain stop':
            if not osd_ids:
                self.log.debug("No osd_ids provided, stopping all pending drain operations")
                self.osd_ids = set()
                self.emptying_osds = set()
                # this is just a poor-man's solution as it will not really stop draining
                # the osds. It will just stop the queries and also prevent any scheduled OSDs
                # from getting drained at a later point in time.
                out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"

            else:
                if not_found_osds:
                    return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"

                self.osd_ids = self.osd_ids.difference(osd_ids)
                self.emptying_osds = self.emptying_osds.difference(osd_ids)
                out = f"Stopped draining operations for OSD(s): {osd_ids}"

        else:
            return HandleCommandResult(
                retval=-errno.EINVAL,
                stdout='',
                stderr=f"Command not found <{cmd.get('prefix', '')}>")
        return HandleCommandResult(
            retval=ret,   # exit code
            stdout=out,   # stdout
            stderr=err)
    def serve(self):
        """
        This method is called by the mgr when the module starts and can be
        used for any background activity.
        """
        self.log.info("Starting mgr/osd_support")
        while self.run:
            self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
            self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
            # the state should be saved to the mon store in the actual call and
            # then retrieved in serve() probably

            # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
            for x in self.find_osd_stop_threshold(self.osd_ids):
                self.emptying_osds.add(x)

            # remove the emptying osds from the osd_ids since they don't need to be checked again.
            self.osd_ids = self.osd_ids.difference(self.emptying_osds)

            # 2) reweight the ok-to-stop osds, ONCE
            self.reweight_osds(self.emptying_osds)

            # 3) check for osds to be empty
            empty_osds = self.empty_osds(self.emptying_osds)

            # remove osds that are marked as empty
            self.emptying_osds = self.emptying_osds.difference(empty_osds)

            # move empty osds in the done queue until they disappear from ceph's view
            # other modules need to know when OSDs are empty
            for osd in empty_osds:
                self.log.debug(f"Adding {osd} to list of empty OSDs")
                self.empty.add(osd)

            # remove from queue if no longer part of ceph cluster
            self.cleanup()

            # fixed sleep interval of 10 seconds
            sleep_interval = 10
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def cleanup(self):
        """
        Remove OSDs that are no longer in the ceph cluster from the
        'done' list.

        :return:
        """
        for osd in self.osds_not_in_cluster(list(self.empty)):
            self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing")
            self.empty.remove(osd)

    def shutdown(self):
        """
        This method is called by the mgr when the module needs to shut
        down (i.e., when the serve() function needs to exit).
        """
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.get_osdmap()
        return [x.get('osd') for x in osd_map.dump().get('osds', [])]

    def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
        self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
        cluster_osds = self.get_osds_in_cluster()
        not_in_cluster = set()
        for osd_id in osd_ids:
            if int(osd_id) not in cluster_osds:
                self.log.error(f"Could not find {osd_id} in cluster")
                not_in_cluster.add(osd_id)
        return not_in_cluster

    def empty_osds(self, osd_ids: Set[int]) -> List[int]:
        if not osd_ids:
            return list()
        osd_df_data = self.osd_df()
        empty_osds = list()
        for osd_id in osd_ids:
            if self.is_empty(osd_id, osd_df=osd_df_data):
                empty_osds.append(osd_id)
        return empty_osds

    def osd_df(self) -> dict:
        # TODO: this should be cached I think
        base_cmd = 'osd df'
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        return json.loads(out)
    def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
        pgs = self.get_pg_count(osd_id, osd_df=osd_df)
        if pgs != 0:
            self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
            return False
        self.log.info(f"osd: {osd_id} has no PGs anymore")
        return True

    def reweight_osds(self, osd_ids: Set[int]) -> bool:
        results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids]
        return all(results)

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def get_osd_weight(self, osd_id: int) -> float:
        osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                if 'crush_weight' in osd_node:
                    return float(osd_node.get('crush_weight'))
        return -1.0

    def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
        if self.get_osd_weight(osd_id) == weight:
            self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
            return True
        base_cmd = 'osd crush reweight'
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'name': f'osd.{osd_id}',
            'weight': weight
        })
        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
        self.log.debug(f"running cmd: {cmd}")
        if ret != 0:
            self.log.error(f"command: {cmd} failed with: {err}")
            return False
        self.log.info(f"command: {cmd} succeeded with: {out}")
        return True
    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
        """
        Cut the osd_id list in half until it's ok-to-stop.

        :param osd_ids: set of osd_ids
        :return: set of osd_ids that can be stopped at once
        """
        if not osd_ids:
            return set()
        _osds: List[int] = list(osd_ids.copy())
        while not self.ok_to_stop(_osds):
            if len(_osds) <= 1:
                # can't even stop one OSD, aborting
                self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return set()
            self.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            _osds = _osds[len(_osds) // 2:]
        return set(_osds)

    def ok_to_stop(self, osd_ids: List[int]) -> bool:
        base_cmd = "osd ok-to-stop"
        self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            # apparently ok-to-stop allows strings only
            'ids': [str(x) for x in osd_ids]
        })
        if ret != 0:
            self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
            return False
        self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
        return True
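
Side note on find_osd_stop_threshold above: the candidate list is repeatedly cut
down to its second half until ok-to-stop succeeds or only one OSD is left. A
tiny stand-alone sketch of just that shrinking behaviour (fake_ok_to_stop is a
stub standing in for the mon command):

    def shrink_until_ok(osd_ids, fake_ok_to_stop):
        # keep the second half of the candidates until the check passes
        _osds = list(osd_ids)
        while not fake_ok_to_stop(_osds):
            if len(_osds) <= 1:
                return set()  # can't even stop a single OSD
            _osds = _osds[len(_osds) // 2:]
        return set(_osds)

    # e.g. with 4 OSDs and a cluster that only tolerates stopping 2 at a time:
    # shrink_until_ok([0, 1, 2, 3], lambda ids: len(ids) <= 2) == {2, 3}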


@ -1,14 +0,0 @@
from osd_support import OSDSupport
import pytest

from tests import mock


@pytest.yield_fixture()
def osd_support_module():
    with mock.patch("osd_support.module.OSDSupport.get_osdmap"), \
            mock.patch("osd_support.module.OSDSupport.osd_df"), \
            mock.patch("osd_support.module.OSDSupport.mon_command", return_value=(0, '', '')):
        m = OSDSupport.__new__(OSDSupport)
        m.__init__('osd_support', 0, 0)
        yield m


@ -1,144 +0,0 @@
import pytest

from .fixtures import osd_support_module as osdsf
from tests import mock


class TestOSDSupport:

    def test_init(self, osdsf):
        assert osdsf.osd_ids == set()
        assert osdsf.emptying_osds == set()
        assert osdsf.check_osds == set()
        assert osdsf.empty == set()

    def test_osds_not_in_cluster(self, osdsf):
        assert osdsf.osds_not_in_cluster([1, 2]) == {1, 2}

    @mock.patch("osd_support.module.OSDSupport.get_osds_in_cluster")
    def test_osds_in_cluster(self, osds_in_cluster, osdsf):
        osds_in_cluster.return_value = [1]
        assert osdsf.osds_not_in_cluster([1, 2]) == {2}

    @pytest.mark.parametrize(
        "is_empty, osd_ids, expected",
        [
            (False, {1, 2}, []),
            (True, {1, 2}, [1, 2]),
            (None, {1, 2}, []),
        ]
    )
    def test_empty_osd(self, osdsf, is_empty, osd_ids, expected):
        with mock.patch("osd_support.module.OSDSupport.is_empty", return_value=is_empty):
            assert osdsf.empty_osds(osd_ids) == expected
    @pytest.mark.parametrize(
        "pg_count, expected",
        [
            (0, True),
            (1, False),
            (-1, False),
            (9999999999999, False),
        ]
    )
    def test_is_empty(self, pg_count, expected, osdsf):
        with mock.patch("osd_support.module.OSDSupport.get_pg_count", return_value=pg_count):
            assert osdsf.is_empty(1) == expected
    @pytest.mark.parametrize(
        "osd_ids, reweight_out, expected",
        [
            ({1}, [False], False),
            ({1}, [True], True),
            ({1, 2}, [True, True], True),
            ({1, 2}, [True, False], False),
        ]
    )
    def test_reweight_osds(self, osdsf, osd_ids, reweight_out, expected):
        with mock.patch("osd_support.module.OSDSupport.reweight_osd", side_effect=reweight_out):
            assert osdsf.reweight_osds(osd_ids) == expected

    @pytest.mark.parametrize(
        "osd_id, osd_df, expected",
        [
            # empty 'nodes' list
            (1, dict(nodes=[]), -1),
            # missing 'pgs' key
            (1, dict(nodes=[dict(id=1)]), -1),
            # id != osd_id
            (1, dict(nodes=[dict(id=999, pgs=1)]), -1),
            # valid
            (1, dict(nodes=[dict(id=1, pgs=1)]), 1),
        ]
    )
    def test_get_pg_count(self, osdsf, osd_id, osd_df, expected):
        with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df):
            assert osdsf.get_pg_count(osd_id) == expected
    @pytest.mark.parametrize(
        "osd_id, osd_df, expected",
        [
            # empty 'nodes' list
            (1, dict(nodes=[]), -1.0),
            # missing 'crush_weight' key
            (1, dict(nodes=[dict(id=1)]), -1.0),
            # id != osd_id
            (1, dict(nodes=[dict(id=999, crush_weight=1)]), -1.0),
            # valid
            (1, dict(nodes=[dict(id=1, crush_weight=1)]), float(1)),
        ]
    )
    def test_get_osd_weight(self, osdsf, osd_id, osd_df, expected):
        with mock.patch("osd_support.module.OSDSupport.osd_df", return_value=osd_df):
            assert osdsf.get_osd_weight(osd_id) == expected

    @pytest.mark.parametrize(
        "osd_id, initial_osd_weight, mon_cmd_return, weight, expected",
        [
            # is already weighted correctly
            (1, 1.0, (0, '', ''), 1.0, True),
            # needs reweight, no errors in mon_cmd
            (1, 2.0, (0, '', ''), 1.0, True),
            # needs reweight, errors in mon_cmd
            (1, 2.0, (1, '', ''), 1.0, False),
        ]
    )
    def test_reweight_osd(self, osdsf, osd_id, initial_osd_weight, mon_cmd_return, weight, expected):
        with mock.patch("osd_support.module.OSDSupport.get_osd_weight", return_value=initial_osd_weight), \
                mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return):
            assert osdsf.reweight_osd(osd_id, weight=weight) == expected
    @pytest.mark.parametrize(
        "osd_ids, ok_to_stop, expected",
        [
            # no osd_ids provided
            ({}, [], set()),
            # all osds are ok_to_stop
            ({1, 2}, [True], {1, 2}),
            # osds are ok_to_stop after the second iteration
            ({1, 2}, [False, True], {2}),
            # osds are never ok_to_stop (taking the sample size (len(osd_ids)) into account),
            # expect an empty set()
            ({1, 2}, [False, False], set()),
        ]
    )
    def test_find_stop_threshold(self, osdsf, osd_ids, ok_to_stop, expected):
        with mock.patch("osd_support.module.OSDSupport.ok_to_stop", side_effect=ok_to_stop):
            assert osdsf.find_osd_stop_threshold(osd_ids) == expected

    @pytest.mark.parametrize(
        "osd_ids, mon_cmd_return, expected",
        [
            # ret is 0
            ([1], (0, '', ''), True),
            # no input yields True
            ([], (0, '', ''), True),
            # ret is != 0
            ([1], (-1, '', ''), False),
            # no input, but ret != 0
            ([], (-1, '', ''), False),
        ]
    )
    def test_ok_to_stop(self, osdsf, osd_ids, mon_cmd_return, expected):
        with mock.patch("osd_support.module.OSDSupport.mon_command", return_value=mon_cmd_return):
            assert osdsf.ok_to_stop(osd_ids) == expected


@ -36,6 +36,5 @@ commands = mypy --config-file=../../mypy.ini \
orchestrator/__init__.py \
progress/module.py \
rook/module.py \
osd_support/module.py \
test_orchestrator/module.py \
volumes/__init__.py