Merge pull request #34346 from jschmid1/osd_id_claims

mgr/cephadm: add support for osd_id_claims

Reviewed-by: Jan Fajerski <jfajerski@suse.com>
Reviewed-by: Sebastian Wagner <sebastian.wagner@suse.com>
Sebastian Wagner 2020-04-16 16:37:34 +02:00 committed by GitHub
commit c5f0a2e718
5 changed files with 134 additions and 29 deletions
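
The change lets cephadm reuse the ids of previously destroyed OSDs when deploying replacements: the module queries 'ceph osd tree' for destroyed OSDs, builds a host -> osd-id map (find_destroyed_osds), attaches it to the DriveGroupSpec as osd_id_claims, and forwards the ids for each host to ceph-volume (--osd-id / --osd-ids). Below is a minimal end-to-end sketch of the python-common side; it follows the usage pattern of the tests in this PR, and the import paths, device paths and claimed ids are illustrative assumptions, not orchestrator code.

# Illustrative sketch (not part of the patch): how claimed osd ids reach
# the ceph-volume command line, using the python-common classes exercised
# by the tests in this PR.
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment.service_spec import PlacementSpec
from ceph.deployment.inventory import Devices, Device
from ceph.deployment import drive_selection, translate

# host -> list of osd ids previously destroyed on that host, as returned by
# CephadmOrchestrator.find_destroyed_osds()
osd_id_claims = {'host1': ['0', '1']}

paths = ['/dev/sda', '/dev/sdb']  # hypothetical data devices
spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
                      data_devices=DeviceSelection(paths=paths))
sel = drive_selection.DriveSelection(spec, Devices([Device(p) for p in paths]))

# The per-host claims are passed through to ceph-volume as --osd-ids so the
# replacement OSDs keep their old ids instead of allocating new ones.
cmd = translate.to_ceph_volume(spec, sel, osd_id_claims.get('host1', [])).run()
# -> 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd'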


@@ -2084,32 +2084,62 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return blink(locs)
def get_osd_uuid_map(self, only_up=False):
# type: (bool) -> Dict[str,str]
# type: (bool) -> Dict[str, str]
osd_map = self.get('osd_map')
r = {}
for o in osd_map['osds']:
# only include OSDs that have ever started in this map. this way
# an interrupted osd create can be repeated and succeed the second
# time around.
if not only_up or o['up_from'] > 0:
r[str(o['osd'])] = o['uuid']
osd_id = o.get('osd')
if osd_id is None:
raise OrchestratorError("Could not retrieve osd_id from osd_map")
if not only_up or (o['up_from'] > 0):
r[str(osd_id)] = o.get('uuid', '')
return r
@trivial_completion
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
return [self._apply(spec) for spec in specs]
def find_destroyed_osds(self) -> Dict[str, List[str]]:
osd_host_map: Dict[str, List[str]] = dict()
ret, out, err = self.mon_command({
'prefix': 'osd tree',
'states': ['destroyed'],
'format': 'json'
})
if ret != 0:
raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
try:
tree = json.loads(out)
except json.decoder.JSONDecodeError:
self.log.error(f"Could not decode json -> {out}")
return osd_host_map
nodes = tree.get('nodes', {})
for node in nodes:
if node.get('type') == 'host':
osd_host_map.update(
{node.get('name'): [str(_id) for _id in node.get('children', list())]}
)
return osd_host_map
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
self.log.debug(f"Processing DriveGroup {drive_group}")
ret = []
drive_group.osd_id_claims = self.find_destroyed_osds()
self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
for host, drive_selection in self.prepare_drivegroup(drive_group):
self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection)
cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
drive_group.osd_id_claims.get(host, []))
if not cmd:
self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
continue
ret_msg = self._create_osd(host, cmd)
ret_msg = self._create_osd(host, cmd,
replace_osd_ids=drive_group.osd_id_claims.get(host, []))
ret.append(ret_msg)
return ", ".join(ret)
@@ -2119,6 +2149,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
# 2) Map the inventory to the InventoryHost object
host_ds_map = []
# set osd_id_claims
def _find_inv_for_host(hostname: str, inventory_dict: dict):
# This is stupid and needs to be loaded with the host
for _host, _inventory in inventory_dict.items():
@@ -2138,9 +2170,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
drive_selection: DriveSelection,
osd_id_claims: Optional[List[str]] = None,
preview: bool = False) -> Optional[str]:
self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, preview=preview).run()
cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
return cmd
@@ -2156,9 +2189,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
drive_groups = []
ret_all = []
for drive_group in drive_groups:
drive_group.osd_id_claims = self.find_destroyed_osds()
self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
# prepare driveselection
for host, ds in self.prepare_drivegroup(drive_group):
cmd = self.driveselection_to_ceph_volume(drive_group, ds, preview=True)
cmd = self.driveselection_to_ceph_volume(drive_group, ds,
drive_group.osd_id_claims.get(host, []), preview=True)
if not cmd:
self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
continue
@@ -2197,7 +2233,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
error_ok=True)
return out, err, code
def _create_osd(self, host, cmd):
def _create_osd(self, host, cmd, replace_osd_ids=None):
out, err, code = self._run_ceph_volume_command(host, cmd)
if code == 1 and ', it is already prepared' in '\n'.join(err):
@@ -2229,16 +2265,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
if osd['tags']['ceph.cluster_fsid'] != fsid:
self.log.debug('mismatched fsid, skipping %s' % osd)
continue
if osd_id in before_osd_uuid_map:
# this osd existed before we ran prepare
if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
# if it exists but is part of the replacement operation, don't skip
continue
if osd_id not in osd_uuid_map:
self.log.debug('osd id %d does not exist in cluster' % osd_id)
self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
continue
if osd_uuid_map[osd_id] != osd['tags']['ceph.osd_fsid']:
if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
self.log.debug('mismatched osd uuid (cluster has %s, osd '
'has %s)' % (
osd_uuid_map[osd_id],
osd_uuid_map.get(osd_id),
osd['tags']['ceph.osd_fsid']))
continue
@@ -2333,7 +2369,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
if daemon_type == 'osd':
if not osd_uuid_map:
osd_uuid_map = self.get_osd_uuid_map()
osd_uuid = osd_uuid_map.get(daemon_id, None)
osd_uuid = osd_uuid_map.get(daemon_id)
if not osd_uuid:
raise OrchestratorError('osd.%d not in osdmap' % daemon_id)
extra_args.extend(['--osd-fsid', osd_uuid])
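
For reference, find_destroyed_osds() above boils down to a small transformation of the 'osd tree' JSON (states=['destroyed'], format=json) into a host -> osd-id map. The following standalone sketch reproduces that parsing with the same tree shape as the new test below; the sample data is abbreviated and hypothetical.

import json
from typing import Dict, List

def destroyed_osds_by_host(osd_tree_json: str) -> Dict[str, List[str]]:
    # Mirror of the mapping built by find_destroyed_osds(): each 'host' node
    # lists the ids of the (destroyed) OSDs placed on it in 'children'.
    tree = json.loads(osd_tree_json)
    host_map: Dict[str, List[str]] = {}
    for node in tree.get('nodes', []):
        if node.get('type') == 'host':
            host_map[node.get('name')] = [str(i) for i in node.get('children', [])]
    return host_map

sample = json.dumps({'nodes': [
    {'id': -3, 'name': 'host1', 'type': 'host', 'children': [0]},
    {'id': 0, 'name': 'osd.0', 'type': 'osd', 'status': 'destroyed'},
]})
assert destroyed_osds_by_host(sample) == {'host1': ['0']}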


@@ -178,6 +178,57 @@ class TestCephadm(object):
r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
assert r
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
def test_find_destroyed_osds(self, _mon_cmd, cephadm_module):
dict_out = {
"nodes": [
{
"id": -1,
"name": "default",
"type": "root",
"type_id": 11,
"children": [
-3
]
},
{
"id": -3,
"name": "host1",
"type": "host",
"type_id": 1,
"pool_weights": {},
"children": [
0
]
},
{
"id": 0,
"device_class": "hdd",
"name": "osd.0",
"type": "osd",
"type_id": 0,
"crush_weight": 0.0243988037109375,
"depth": 2,
"pool_weights": {},
"exists": 1,
"status": "destroyed",
"reweight": 1,
"primary_affinity": 1
}
],
"stray": []
}
json_out = json.dumps(dict_out)
_mon_cmd.return_value = (0, json_out, '')
out = cephadm_module.find_destroyed_osds()
assert out == {'host1': ['0']}
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
_mon_cmd.return_value = (1, "", "fail_msg")
with pytest.raises(OrchestratorError):
out = cephadm_module.find_destroyed_osds()
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save(self, _save_spec, cephadm_module):
@@ -237,7 +288,7 @@ class TestCephadm(object):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
out = cephadm_module.driveselection_to_ceph_volume(dg, ds, preview)
out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
assert out in exp_command
@mock.patch("cephadm.module.SpecStore.find")


@@ -144,7 +144,7 @@ class DriveGroupSpec(ServiceSpec):
encrypted=False, # type: bool
db_slots=None, # type: Optional[int]
wal_slots=None, # type: Optional[int]
osd_id_claims=None, # type: Optional[Dict[str, DeviceSelection]]
osd_id_claims=None, # type: Optional[Dict[str, List[str]]]
block_db_size=None, # type: Optional[int]
block_wal_size=None, # type: Optional[int]
journal_size=None, # type: Optional[int]
@@ -196,10 +196,9 @@ class DriveGroupSpec(ServiceSpec):
#: How many OSDs per WAL device
self.wal_slots = wal_slots
#: Optional: mapping of OSD id to DeviceSelection, used when the
#: created OSDs are meant to replace previous OSDs on
#: the same node. See :ref:`orchestrator-osd-replace`
self.osd_id_claims = osd_id_claims
#: Optional: mapping of host -> List of osd_ids that should be replaced
#: See :ref:`orchestrator-osd-replace`
self.osd_id_claims = osd_id_claims or dict()
@classmethod
def _from_json_impl(cls, json_drive_group):


@@ -1,7 +1,7 @@
import logging
try:
from typing import Optional
from typing import Optional, List
except ImportError:
pass
@@ -16,12 +16,14 @@ class to_ceph_volume(object):
def __init__(self,
spec, # type: DriveGroupSpec
selection, # type: DriveSelection
preview=False
osd_id_claims=None, # type: Optional[List[str]]
preview=False # type: bool
):
self.spec = spec
self.selection = selection
self.preview = preview
self.osd_id_claims = osd_id_claims
def run(self):
# type: () -> Optional[str]
@@ -34,6 +36,7 @@ class to_ceph_volume(object):
if not data_devices:
return None
cmd = ""
if self.spec.objectstore == 'filestore':
cmd = "lvm batch --no-auto"
@@ -56,6 +59,8 @@ class to_ceph_volume(object):
not db_devices and \
not wal_devices:
cmd = "lvm prepare --bluestore --data %s --no-systemd" % (' '.join(data_devices))
if self.osd_id_claims:
cmd += " --osd-id {}".format(str(self.osd_id_claims[0]))
if self.preview:
# Like every horrible hack, this has side effects on other features.
# In this case, 'lvm prepare' has neither a '--report' nor a '--format json' option
@@ -86,6 +91,9 @@ class to_ceph_volume(object):
if self.spec.osds_per_device:
cmd += " --osds-per-device {}".format(self.spec.osds_per_device)
if self.osd_id_claims:
cmd += " --osd-ids {}".format(" ".join(self.osd_id_claims))
cmd += " --yes"
cmd += " --no-systemd"


@@ -72,7 +72,7 @@ def test_ceph_volume_command_0():
)
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
@@ -83,7 +83,7 @@ def test_ceph_volume_command_1():
)
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --yes --no-systemd')
@@ -99,7 +99,7 @@ def test_ceph_volume_command_2():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
'--yes --no-systemd')
@@ -117,7 +117,7 @@ def test_ceph_volume_command_3():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd '
'--wal-devices /dev/sde /dev/sdf --dmcrypt '
@@ -139,7 +139,7 @@ def test_ceph_volume_command_4():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
'--block-wal-size 500M --block-db-size 500M --dmcrypt '
@@ -153,7 +153,7 @@ def test_ceph_volume_command_5():
)
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
@@ -166,7 +166,18 @@ def test_ceph_volume_command_6():
)
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel).run()
cmd = translate.to_ceph_volume(spec, sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
'--journal-size 500M --journal-devices /dev/sda /dev/sdb '
'--filestore --yes --no-systemd')
def test_ceph_volume_command_7():
spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(all=True),
osd_id_claims={'host1': ['0', '1']}
)
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(spec, sel, ['0', '1']).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd'