mgr/cephadm: move process_removal_queue into OSDRemovalQueue
`process_removal_queue` belongs to OSDRemovalQueue instead of RemoveUtil.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
parent 086afa9a69
commit af52ba47a1
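In outline, the commit inverts an ownership relationship: the queue now owns its OSD set, its persistence, and the processing loop, and only delegates the cluster-wide ok-to-stop check back to RemoveUtil. A minimal standalone sketch of that shape (stub classes for illustration, not the real cephadm types):

    # Sketch only: stand-ins for RemoveUtil/OSDRemovalQueue/the mgr module,
    # showing which object owns what after this commit.

    class RemoveUtil:
        """Stateless OSD helpers; keeps the cluster-wide checks."""
        def __init__(self, mgr):
            self.mgr = mgr

        def find_osd_stop_threshold(self, osds):
            # the real method asks the cluster which OSDs are ok-to-stop
            return osds


    class OSDRemovalQueue:
        """Owns the OSD set, its persistence, and the removal loop."""
        def __init__(self, mgr):          # now takes the mgr, as in the diff
            self.mgr = mgr
            self.osds = set()

        def process_removal_queue(self):  # moved here from RemoveUtil
            _ = self.mgr.rm_util.find_osd_stop_threshold(self.osds)
            # ... drain, destroy/purge, persist ...


    class Mgr:
        def __init__(self):
            self.rm_util = RemoveUtil(self)
            self.to_remove_osds = OSDRemovalQueue(self)


    Mgr().to_remove_osds.process_removal_queue()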
@@ -391,8 +391,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.cache.load()

         self.rm_util = RemoveUtil(self)
-        self.to_remove_osds = OSDRemovalQueue()
-        self.rm_util.load_from_store()
+        self.to_remove_osds = OSDRemovalQueue(self)
+        self.to_remove_osds.load_from_store()

         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -527,7 +527,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.log.debug(f"Found empty osd. Starting removal process")
             # if the osd that is now empty is also part of the removal queue
             # start the process
-            self.rm_util.process_removal_queue()
+            self._kick_serve_loop()

     def pause(self) -> None:
         if not self.paused:
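The empty-OSD callback no longer processes the queue inline; it merely wakes the serve loop, which picks the work up on its next pass. A hedged sketch of that wake-up pattern, assuming an Event-based loop (this stub is illustrative, not the actual cephadm wiring):

    import threading

    class Loop:
        def __init__(self):
            self.event = threading.Event()
            self.iterations = 0

        def _kick_serve_loop(self):
            self.event.set()            # wake serve early; do no work inline

        def serve_once(self):
            # a real loop runs forever; one pass keeps the sketch testable
            self.event.wait(timeout=1)  # sleep until kicked or timed out
            self.event.clear()
            self.iterations += 1        # here the queue would be processed

    loop = Loop()
    loop._kick_serve_loop()
    loop.serve_once()                   # returns immediately: event was set
    assert loop.iterations == 1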
@@ -1178,7 +1178,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                               addr=spec.addr,
                               error_ok=True, no_fsid=True)
         if code:
-            # err will contain stdout and stderr, so we filter on the message text to
+            # err will contain stdout and stderr, so we filter on the message text to
             # only show the errors
             errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
             raise OrchestratorError('New host %s (%s) failed check(s): %s' % (
@@ -75,7 +75,7 @@ class CephadmServe:
             self._update_paused_health()

             if not self.mgr.paused:
-                self.mgr.rm_util.process_removal_queue()
+                self.mgr.to_remove_osds.process_removal_queue()

                 self.mgr.migration.migrate()
                 if self.mgr.migration.is_migration_ongoing():
@@ -306,76 +306,6 @@ class RemoveUtil(object):
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr: "CephadmOrchestrator" = mgr

-    def process_removal_queue(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-
-        # make sure that we don't run on OSDs that are not in the cluster anymore.
-        self.cleanup()
-
-        logger.debug(
-            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
-            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
-        # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
-        if ok_to_stop_osds:
-            # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
-        # Check all osds for their state and take action (remove, purge etc)
-        to_remove_osds = self.mgr.to_remove_osds.all_osds()
-        new_queue: Set[OSD] = set()
-        for osd in to_remove_osds:  # type: OSD
-            if not osd.force:
-                # skip criteria
-                if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                if not osd.safe_to_destroy():
-                    logger.info(
-                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                # abort criteria
-                if not osd.down():
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(
-                        f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-                if osd.replace:
-                    if not osd.destroy():
-                        raise orchestrator.OrchestratorError(
-                            f"Could not destroy OSD <{osd.osd_id}>")
-                else:
-                    if not osd.purge():
-                        raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-
-        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
-        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
-        # osds that were added while this method was executed'
-        self.mgr.to_remove_osds.intersection_update(new_queue)
-        self.save_to_store()
-
-    def cleanup(self) -> None:
-        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
-        for osd in not_in_cluster_osds:
-            self.mgr.to_remove_osds.remove(osd)
-
     def get_osds_in_cluster(self) -> List[str]:
         osd_map = self.mgr.get_osdmap()
         return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
@@ -483,19 +413,6 @@ class RemoveUtil(object):
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True

-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
-        logger.debug(f"Saving {osd_queue} to store")
-        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
-    def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, ctx=self)
-                if osd_obj is not None:
-                    self.mgr.to_remove_osds.add(osd_obj)
-

 class NotFoundError(Exception):
     pass
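`save_to_store`/`load_from_store` move with the data they persist (see the relocated copies further down). The round trip itself is plain JSON in a key-value store; a toy version with a dict standing in for the mgr's set_store/get_store_prefix API:

    import json

    store = {}  # stands in for the mgr's persistent key/value store

    def save_to_store(osds):
        store['osd_remove_queue'] = json.dumps(sorted(osds))

    def load_from_store():
        loaded = set()
        for k, v in store.items():   # like get_store_prefix('osd_remove_queue')
            if k.startswith('osd_remove_queue'):
                loaded.update(json.loads(v))
        return loaded

    save_to_store({'osd.2', 'osd.7'})
    assert load_from_store() == {'osd.2', 'osd.7'}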
@@ -654,13 +571,13 @@ class OSD:
         return out

     @classmethod
-    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
         if not inp:
             return None
         for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
             if inp.get(date_field):
                 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
-        inp.update({'remove_util': ctx})
+        inp.update({'remove_util': rm_util})
         if 'nodename' in inp:
             hostname = inp.pop('nodename')
             inp['hostname'] = hostname
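Making `rm_util` a required parameter, instead of an `Optional` `ctx` defaulting to `None`, means a caller can no longer deserialize an OSD without wiring in its helper. A reduced illustration of the fail-fast difference (stub types, not the real `OSD`):

    from typing import Any, Dict, Optional

    class RmUtil:  # stand-in for RemoveUtil
        pass

    class Osd:
        def __init__(self, osd_id: int, remove_util: RmUtil) -> None:
            self.osd_id = osd_id
            self.rm_util = remove_util

        @classmethod
        def from_json(cls, inp: Optional[Dict[str, Any]],
                      rm_util: RmUtil) -> Optional["Osd"]:
            if not inp:
                return None
            return cls(osd_id=inp['osd_id'], remove_util=rm_util)

    osd = Osd.from_json({'osd_id': 35}, rm_util=RmUtil())  # rm_util mandatory
    assert osd is not None and osd.rm_util is not None
    # Osd.from_json({'osd_id': 35}) is now a TypeError at the call site,
    # where the old Optional ctx=None silently produced an OSD with no helper.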
@@ -680,9 +597,91 @@ class OSD:

 class OSDRemovalQueue(object):

-    def __init__(self) -> None:
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
         self.osds: Set[OSD] = set()

+    def process_removal_queue(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {self.all_osds()}")
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        # Check all osds for their state and take action (remove, purge etc)
+        new_queue: Set[OSD] = set()
+        for osd in self.all_osds():  # type: OSD
+            if not osd.force:
+                # skip criteria
+                if not osd.is_empty:
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                if not osd.safe_to_destroy():
+                    logger.info(
+                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                # abort criteria
+                if not osd.down():
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(
+                        f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+                if osd.replace:
+                    if not osd.destroy():
+                        raise orchestrator.OrchestratorError(
+                            f"Could not destroy OSD <{osd.osd_id}>")
+                else:
+                    if not osd.purge():
+                        raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            if not osd.exists:
+                continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
+            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        self.intersection_update(new_queue)
+        self.save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        for osd in self.not_in_cluster():
+            self.remove(osd)
+
+    def save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.all_osds()]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+            for osd in json.loads(v):
+                logger.debug(f"Loading osd ->{osd} from store")
+                osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+                if osd_obj is not None:
+                    self.osds.add(osd_obj)
+
     def as_osd_ids(self) -> List[int]:
         return [osd.osd_id for osd in self.osds]

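The `intersection_update(new_queue)` at the end of the loop is how the queue reconciles one pass's results with its current contents: only entries that both survived the pass and are still in the live set remain. The set operation in isolation, with plain strings standing in for `OSD` objects:

    # Toy model: the queue is a set; new_queue holds everything still
    # pending after one processing pass.
    queue = {'osd.1', 'osd.2', 'osd.3'}
    new_queue = {'osd.1', 'osd.3'}        # osd.2 completed removal this pass
    queue.intersection_update(new_queue)  # keep only entries still pending
    assert queue == {'osd.1', 'osd.3'}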
@@ -515,8 +515,8 @@ class TestCephadm(object):
                 process_started_at=datetime_now(),
                 remove_util=cephadm_module.rm_util
             ))
-            cephadm_module.rm_util.process_removal_queue()
-            assert cephadm_module.to_remove_osds == OSDRemovalQueue()
+            cephadm_module.to_remove_osds.process_removal_queue()
+            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)

             c = cephadm_module.remove_osds_status()
             out = wait(cephadm_module, c)
@@ -90,9 +90,9 @@ class TestOSDRemoval:
             }
         ])
         cephadm_module.set_store('osd_remove_queue', data)
-        cephadm_module.rm_util.load_from_store()
+        cephadm_module.to_remove_osds.load_from_store()

-        expected = OSDRemovalQueue()
+        expected = OSDRemovalQueue(cephadm_module)
         expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
         assert cephadm_module.to_remove_osds == expected
@@ -218,7 +218,7 @@ class TestOSD:
 class TestOSDRemovalQueue:

     def test_queue_size(self, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         assert q.queue_size() == 0
         q.add(osd_obj)
         assert q.queue_size() == 1
@@ -226,14 +226,14 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.start")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_enqueue(self, exist, start, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.enqueue(osd_obj)
         osd_obj.start.assert_called_once()

     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm_raise(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         with pytest.raises(KeyError):
             q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
@@ -241,7 +241,7 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.add(osd_obj)
         q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
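Since the constructor now requires a mgr, every test that builds a bare queue passes `mock.Mock()`; nothing on the mgr is exercised in these cases, so any stand-in object would do. The pattern in isolation (a reduced queue class, not the real one):

    from unittest import mock

    class OSDRemovalQueue:           # stand-in mirroring the new signature
        def __init__(self, mgr) -> None:
            self.mgr = mgr
            self.osds = set()

        def queue_size(self) -> int:
            return len(self.osds)

    q = OSDRemovalQueue(mock.Mock())  # mgr is never touched by queue_size()
    assert q.queue_size() == 0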