Merge pull request #38815 from sebastian-philipp/cephadm-osd-queue-race

mgr/cephadm: lock multithreaded access to OSDRemovalQueue

Reviewed-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Reviewed-by: Michael Fritch <mfritch@suse.com>
Sebastian Wagner 2021-01-15 12:26:33 +01:00, committed by GitHub
commit be473e78ec
6 changed files with 156 additions and 128 deletions
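
The race this PR fixes: OSDs are enqueued for removal from CLI handler threads while the mgr's serve loop drains the queue, so the old set-based OSDQueue could be mutated mid-iteration. The new OSDRemovalQueue owns its backing set and guards every access with a threading.Lock. A minimal sketch of the locking discipline (illustrative names, not the actual cephadm code):

    from threading import Lock
    from typing import List, Set

    class LockedSetQueue:
        """Toy version of OSDRemovalQueue's locking discipline."""

        def __init__(self) -> None:
            self.items: Set[int] = set()
            # Guards self.items against concurrent CLI/serve-loop access.
            # Never hold it across slow network calls (e.g. mon commands).
            self.lock = Lock()

        def enqueue(self, item: int) -> None:
            with self.lock:
                self.items.add(item)

        def all_items(self) -> List[int]:
            # Return a copy; callers iterate without the lock held.
            with self.lock:
                return list(self.items)

        def reconcile(self, still_pending: Set[int]) -> None:
            # Intersect rather than assign, so items removed while
            # processing ran stay removed.
            with self.lock:
                self.items.intersection_update(still_pending)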

src/pybind/mgr/cephadm/module.py

@@ -48,7 +48,7 @@ from .services.container import CustomContainerService
 from .services.iscsi import IscsiService
 from .services.ha_rgw import HA_RGWService
 from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
+from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
 from .schedule import HostAssignment
@@ -390,9 +390,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.cache = HostCache(self)
         self.cache.load()
 
-        self.rm_util = RemoveUtil(self)
-        self.to_remove_osds = OSDQueue()
-        self.rm_util.load_from_store()
+        self.to_remove_osds = OSDRemovalQueue(self)
+        self.to_remove_osds.load_from_store()
 
         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -527,7 +526,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.log.debug(f"Found empty osd. Starting removal process")
             # if the osd that is now empty is also part of the removal queue
             # start the process
-            self.rm_util.process_removal_queue()
+            self._kick_serve_loop()
 
     def pause(self) -> None:
         if not self.paused:
@@ -1178,7 +1177,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                                    addr=spec.addr,
                                    error_ok=True, no_fsid=True)
         if code:
-            # err will contain stdout and stderr, so we filter on the message text to 
+            # err will contain stdout and stderr, so we filter on the message text to
             # only show the errors
             errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
             raise OrchestratorError('New host %s (%s) failed check(s): %s' % (
@@ -2223,7 +2222,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                                     hostname=daemon.hostname,
                                     fullname=daemon.name(),
                                     process_started_at=datetime_now(),
-                                    remove_util=self.rm_util))
+                                    remove_util=self.to_remove_osds.rm_util))
         except NotFoundError:
             return f"Unable to find OSDs: {osd_ids}"
 
@@ -2240,7 +2239,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         for osd_id in osd_ids:
             try:
                 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
-                                           remove_util=self.rm_util))
+                                           remove_util=self.to_remove_osds.rm_util))
             except (NotFoundError, KeyError):
                 return f'Unable to find OSD in the queue: {osd_id}'
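
Note the @@ -527 hunk above: the notification hook no longer processes the removal queue inline. Notification callbacks run on their own mgr threads, so doing removal work there raced with the serve loop; kicking the serve loop instead funnels all queue processing through a single thread. A hedged sketch of that shape (hypothetical Worker class, not cephadm's actual event plumbing):

    import threading

    class Worker:
        """Event handlers wake one worker thread instead of doing the work themselves."""

        def __init__(self) -> None:
            self.event = threading.Event()
            self.stop = False

        def kick(self) -> None:
            # Cheap and thread-safe; callable from any notify callback.
            self.event.set()

        def serve(self) -> None:
            # A single thread owns the actual removal work.
            while not self.stop:
                self.event.wait(timeout=60)
                self.event.clear()
                self.process_removal_queue()

        def process_removal_queue(self) -> None:
            pass  # real work happens here, on one thread only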

src/pybind/mgr/cephadm/serve.py

@@ -75,7 +75,7 @@ class CephadmServe:
                 self._update_paused_health()
 
             if not self.mgr.paused:
-                self.mgr.rm_util.process_removal_queue()
+                self.mgr.to_remove_osds.process_removal_queue()
 
                 self.mgr.migration.migrate()
                 if self.mgr.migration.is_migration_ongoing():

src/pybind/mgr/cephadm/services/osd.py

@@ -1,5 +1,6 @@
 import json
 import logging
+from threading import Lock
 from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
 
 from ceph.deployment import translate
@@ -306,76 +307,6 @@ class RemoveUtil(object):
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def process_removal_queue(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-
-        # make sure that we don't run on OSDs that are not in the cluster anymore.
-        self.cleanup()
-
-        logger.debug(
-            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
-            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
-        # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
-        if ok_to_stop_osds:
-            # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
-        # Check all osds for their state and take action (remove, purge etc)
-        to_remove_osds = self.mgr.to_remove_osds.all_osds()
-        new_queue = set()
-        for osd in to_remove_osds:
-            if not osd.force:
-                # skip criteria
-                if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                if not osd.safe_to_destroy():
-                    logger.info(
-                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-            # abort criteria
-            if not osd.down():
-                # also remove it from the remove_osd list and set a health_check warning?
-                raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-            if osd.replace:
-                if not osd.destroy():
-                    raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
-            else:
-                if not osd.purge():
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-
-            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-
-        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
-        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
-        # osds that were added while this method was executed'
-        self.mgr.to_remove_osds.intersection_update(new_queue)
-        self.save_to_store()
-
-    def cleanup(self) -> None:
-        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
-        for osd in not_in_cluster_osds:
-            self.mgr.to_remove_osds.remove(osd)
-
     def get_osds_in_cluster(self) -> List[str]:
         osd_map = self.mgr.get_osdmap()
         return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
@@ -483,18 +414,6 @@ class RemoveUtil(object):
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True
 
-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
-        logger.debug(f"Saving {osd_queue} to store")
-        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
-    def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, ctx=self)
-                self.mgr.to_remove_osds.add(osd_obj)
-
 
 class NotFoundError(Exception):
     pass
@@ -551,7 +470,7 @@ class OSD:
         self.fullname = fullname
 
         # mgr obj to make mgr/mon calls
-        self.rm_util = remove_util
+        self.rm_util: RemoveUtil = remove_util
 
     def start(self) -> None:
         if self.started:
@@ -653,13 +572,13 @@
         return out
 
     @classmethod
-    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
         if not inp:
             return None
         for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
             if inp.get(date_field):
                 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
-        inp.update({'remove_util': ctx})
+        inp.update({'remove_util': rm_util})
         if 'nodename' in inp:
             hostname = inp.pop('nodename')
             inp['hostname'] = hostname
@@ -677,45 +596,153 @@
         return f"<OSD>(osd_id={self.osd_id}, draining={self.draining})"
 
 
-class OSDQueue(Set):
+class OSDRemovalQueue(object):
 
-    def __init__(self) -> None:
-        super().__init__()
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
+        self.osds: Set[OSD] = set()
+        self.rm_util = RemoveUtil(mgr)
+
+        # locks multithreaded access to self.osds. Please avoid locking
+        # network calls, like mon commands.
+        self.lock = Lock()
+
+    def process_removal_queue(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+
+        we can't hold self.lock, as we're calling _remove_daemon in the loop
+        """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        all_osds = self.all_osds()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {all_osds}")
+
+        # Check all osds for their state and take action (remove, purge etc)
+        new_queue: Set[OSD] = set()
+        for osd in all_osds:  # type: OSD
+            if not osd.force:
+                # skip criteria
+                if not osd.is_empty:
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                if not osd.safe_to_destroy():
+                    logger.info(
+                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+            # abort criteria
+            if not osd.down():
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(
+                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not osd.destroy():
+                    raise orchestrator.OrchestratorError(
+                        f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not osd.purge():
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            if not osd.exists:
+                continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
+
+            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        with self.lock:
+            self.osds.intersection_update(new_queue)
+            self._save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        with self.lock:
+            for osd in self._not_in_cluster():
+                self.osds.remove(osd)
+
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)
 
     def as_osd_ids(self) -> List[int]:
-        return [osd.osd_id for osd in self]
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]
 
     def queue_size(self) -> int:
-        return len(self)
+        with self.lock:
+            return len(self.osds)
 
     def draining_osds(self) -> List["OSD"]:
-        return [osd for osd in self if osd.is_draining]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]
 
     def idling_osds(self) -> List["OSD"]:
-        return [osd for osd in self if not osd.is_draining and not osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
 
     def empty_osds(self) -> List["OSD"]:
-        return [osd for osd in self if osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]
 
     def all_osds(self) -> List["OSD"]:
-        return [osd for osd in self]
+        with self.lock:
+            return [osd for osd in self.osds]
 
-    def not_in_cluster(self) -> List["OSD"]:
-        return [osd for osd in self if not osd.exists]
+    def _not_in_cluster(self) -> List["OSD"]:
+        return [osd for osd in self.osds if not osd.exists]
 
     def enqueue(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
-        self.add(osd)
+        with self.lock:
+            self.osds.add(osd)
         osd.start()
 
     def rm(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
         osd.stop()
-        try:
-            logger.debug(f'Removing {osd} from the queue.')
-            self.remove(osd)
-        except KeyError:
-            logger.debug(f"Could not find {osd} in queue.")
-            raise KeyError
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, OSDRemovalQueue):
+            return False
+        with self.lock:
+            return self.osds == other.osds
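
As the docstring in process_removal_queue() notes, the method cannot hold self.lock for its whole run: it issues mon commands and removes daemons, and a CLI thread calling enqueue() or rm() in the meantime must not block on that. The shape is snapshot under the lock, slow work without it, then a short locked reconcile. A condensed sketch of that shape (hypothetical is_done(), standing in for the drain/destroy/purge checks):

    from threading import Lock
    from typing import Set

    lock = Lock()
    queue: Set[int] = {1, 2, 3}

    def is_done(item: int) -> bool:
        # Stand-in for the slow per-OSD checks and mon calls.
        return item % 2 == 0

    def process_once() -> None:
        with lock:
            snapshot = set(queue)  # copy under the lock
        # Slow work runs with the lock released.
        still_pending = {i for i in snapshot if not is_done(i)}
        with lock:
            # Re-take the lock only to reconcile: intersecting (rather
            # than assigning) keeps concurrent removals removed.
            queue.intersection_update(still_pending)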

src/pybind/mgr/cephadm/tests/test_cephadm.py

@@ -6,7 +6,7 @@ import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
 from cephadm.serve import CephadmServe
-from cephadm.services.osd import OSD, OSDQueue
+from cephadm.services.osd import OSD, OSDRemovalQueue
 
 try:
     from typing import Any, List
@@ -513,10 +513,10 @@ class TestCephadm(object):
                     hostname='test',
                     fullname='osd.0',
                     process_started_at=datetime_now(),
-                    remove_util=cephadm_module.rm_util
+                    remove_util=cephadm_module.to_remove_osds.rm_util
                 ))
 
-            cephadm_module.rm_util.process_removal_queue()
-            assert cephadm_module.to_remove_osds == OSDQueue()
+            cephadm_module.to_remove_osds.process_removal_queue()
+            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
 
             c = cephadm_module.remove_osds_status()
             out = wait(cephadm_module, c)

src/pybind/mgr/cephadm/tests/test_osd_removal.py

@@ -1,6 +1,6 @@
 import json
 
-from cephadm.services.osd import RemoveUtil, OSDQueue, OSD
+from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSD
 import pytest
 from .fixtures import rm_util, osd_obj, cephadm_module
 from tests import mock
@@ -73,7 +73,7 @@ class TestOSDRemoval:
         rm_util._run_mon_cmd.assert_called_with(
             {'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True})
 
-    def test_load(self, cephadm_module):
+    def test_load(self, cephadm_module, rm_util):
         data = json.dumps([
             {
                 "osd_id": 35,
@@ -90,9 +90,11 @@ class TestOSDRemoval:
             }
         ])
         cephadm_module.set_store('osd_remove_queue', data)
-        cephadm_module.rm_util.load_from_store()
+        cephadm_module.to_remove_osds.load_from_store()
 
-        assert repr(cephadm_module.to_remove_osds) == 'OSDQueue({<OSD>(osd_id=35, draining=True)})'
+        expected = OSDRemovalQueue(cephadm_module)
+        expected.osds.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
+        assert cephadm_module.to_remove_osds == expected
 
 
 class TestOSD:
@@ -213,25 +215,25 @@ class TestOSD:
         assert osd_obj.drain_status_human() == 'done, waiting for purge'
 
 
-class TestOSDQueue:
+class TestOSDRemovalQueue:
 
     def test_queue_size(self, osd_obj):
-        q = OSDQueue()
+        q = OSDRemovalQueue(mock.Mock())
         assert q.queue_size() == 0
-        q.add(osd_obj)
+        q.osds.add(osd_obj)
         assert q.queue_size() == 1
 
     @mock.patch("cephadm.services.osd.OSD.start")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_enqueue(self, exist, start, osd_obj):
-        q = OSDQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.enqueue(osd_obj)
         osd_obj.start.assert_called_once()
 
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm_raise(self, exist, stop, osd_obj):
-        q = OSDQueue()
+        q = OSDRemovalQueue(mock.Mock())
         with pytest.raises(KeyError):
             q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
@@ -239,7 +241,7 @@ class TestOSDQueue:
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm(self, exist, stop, osd_obj):
-        q = OSDQueue()
-        q.add(osd_obj)
+        q = OSDRemovalQueue(mock.Mock())
+        q.osds.add(osd_obj)
         q.rm(osd_obj)
         osd_obj.stop.assert_called_once()

src/pybind/mgr/cephadm/tests/fixtures.py

@@ -6,7 +6,7 @@ from cephadm.services.cephadmservice import MonService, MgrService, MdsService,
     RbdMirrorService, CrashService, CephadmService, AuthEntity, CephadmExporter
 from cephadm.services.iscsi import IscsiService
 from cephadm.services.nfs import NFSService
-from cephadm.services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
+from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
 from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
 