mirror of
https://github.com/ceph/ceph
synced 2025-01-04 02:02:36 +00:00
Merge pull request #34216 from jschmid1/preview_drivegroups
mgr/cephadm: preview osd creation for osdspecs Reviewed-by: Kiefer Chang <kiefer.chang@suse.com> Reviewed-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
commit
8f605c7c86
@ -30,7 +30,7 @@ import uuid
|
||||
|
||||
from ceph.deployment import inventory, translate
|
||||
from ceph.deployment.drive_group import DriveGroupSpec
|
||||
from ceph.deployment.drive_selection import selector
|
||||
from ceph.deployment.drive_selection.selector import DriveSelection
|
||||
from ceph.deployment.service_spec import \
|
||||
HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
|
||||
|
||||
@ -144,8 +144,7 @@ class SpecStore():
|
||||
del self.spec_created[service_name]
|
||||
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
|
||||
|
||||
def find(self, service_name=None):
|
||||
# type: (Optional[str]) -> List[ServiceSpec]
|
||||
def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
|
||||
specs = []
|
||||
for sn, spec in self.specs.items():
|
||||
if not service_name or \
|
||||
@ -2102,10 +2101,23 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
|
||||
@trivial_completion
|
||||
def create_osds(self, drive_group: DriveGroupSpec):
|
||||
self.log.debug("Processing DriveGroup {}".format(drive_group))
|
||||
self.log.debug(f"Processing DriveGroup {drive_group}")
|
||||
ret = []
|
||||
for host, drive_selection in self.prepare_drivegroup(drive_group):
|
||||
self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
|
||||
cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection)
|
||||
if not cmd:
|
||||
self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
|
||||
continue
|
||||
ret_msg = self._create_osd(host, cmd)
|
||||
ret.append(ret_msg)
|
||||
return ", ".join(ret)
|
||||
|
||||
def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
|
||||
# 1) use fn_filter to determine matching_hosts
|
||||
matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
|
||||
# 2) Map the inventory to the InventoryHost object
|
||||
host_ds_map = []
|
||||
|
||||
def _find_inv_for_host(hostname: str, inventory_dict: dict):
|
||||
# This is stupid and needs to be loaded with the host
|
||||
@ -2114,27 +2126,49 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
return _inventory
|
||||
raise OrchestratorError("No inventory found for host: {}".format(hostname))
|
||||
|
||||
ret = []
|
||||
# 3) iterate over matching_host and call DriveSelection and to_ceph_volume
|
||||
# 3) iterate over matching_host and call DriveSelection
|
||||
self.log.debug(f"Checking matching hosts -> {matching_hosts}")
|
||||
for host in matching_hosts:
|
||||
inventory_for_host = _find_inv_for_host(host, self.cache.devices)
|
||||
self.log.debug(f"Found inventory for host {inventory_for_host}")
|
||||
drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
|
||||
drive_selection = DriveSelection(drive_group, inventory_for_host)
|
||||
self.log.debug(f"Found drive selection {drive_selection}")
|
||||
cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
|
||||
self.log.debug(f"translated to cmd {cmd}")
|
||||
host_ds_map.append((host, drive_selection))
|
||||
return host_ds_map
|
||||
|
||||
def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
|
||||
drive_selection: DriveSelection,
|
||||
preview: bool = False) -> Optional[str]:
|
||||
self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
|
||||
cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, preview=preview).run()
|
||||
self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
|
||||
return cmd
|
||||
|
||||
def preview_drivegroups(self, drive_group_name: Optional[str] = None,
|
||||
dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
|
||||
# find drivegroups
|
||||
if drive_group_name:
|
||||
drive_groups = cast(List[DriveGroupSpec],
|
||||
self.spec_store.find(service_name=drive_group_name))
|
||||
elif dg_specs:
|
||||
drive_groups = dg_specs
|
||||
else:
|
||||
drive_groups = []
|
||||
ret_all = []
|
||||
for drive_group in drive_groups:
|
||||
# prepare driveselection
|
||||
for host, ds in self.prepare_drivegroup(drive_group):
|
||||
cmd = self.driveselection_to_ceph_volume(drive_group, ds, preview=True)
|
||||
if not cmd:
|
||||
self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
|
||||
continue
|
||||
self.log.info('Applying %s on host %s...' % (
|
||||
drive_group.service_name(), host))
|
||||
ret_msg = self._create_osd(host, cmd)
|
||||
ret.append(ret_msg)
|
||||
return ", ".join(ret)
|
||||
|
||||
def _create_osd(self, host, cmd):
|
||||
out, err, code = self._run_ceph_volume_command(host, cmd)
|
||||
if out:
|
||||
concat_out = json.loads(" ".join(out))
|
||||
ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
|
||||
return ret_all
|
||||
|
||||
def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
|
||||
self._require_hosts(host)
|
||||
|
||||
# get bootstrap key
|
||||
@ -2153,8 +2187,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
'keyring': keyring,
|
||||
})
|
||||
|
||||
before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
|
||||
|
||||
split_cmd = cmd.split(' ')
|
||||
_cmd = ['--config-json', '-', '--']
|
||||
_cmd.extend(split_cmd)
|
||||
@ -2163,6 +2195,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
_cmd,
|
||||
stdin=j,
|
||||
error_ok=True)
|
||||
return out, err, code
|
||||
|
||||
def _create_osd(self, host, cmd):
|
||||
out, err, code = self._run_ceph_volume_command(host, cmd)
|
||||
|
||||
if code == 1 and ', it is already prepared' in '\n'.join(err):
|
||||
# HACK: when we create against an existing LV, ceph-volume
|
||||
# returns an error and the above message. To make this
|
||||
@ -2182,6 +2219,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
'lvm', 'list',
|
||||
'--format', 'json',
|
||||
])
|
||||
before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
|
||||
osds_elems = json.loads('\n'.join(out))
|
||||
fsid = self._cluster_fsid
|
||||
osd_uuid_map = self.get_osd_uuid_map()
|
||||
|
@ -16,6 +16,8 @@ from execnet.gateway_bootstrap import HostNotFound
|
||||
|
||||
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
|
||||
NFSServiceSpec, IscsiServiceSpec
|
||||
from ceph.deployment.drive_selection.selector import DriveSelection
|
||||
from ceph.deployment.inventory import Devices, Device
|
||||
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
|
||||
HostSpec, OrchestratorError
|
||||
from tests import mock
|
||||
@ -157,7 +159,6 @@ class TestCephadm(object):
|
||||
c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
|
||||
assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
|
||||
|
||||
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
|
||||
def test_mon_add(self, cephadm_module):
|
||||
with self._with_host(cephadm_module, 'test'):
|
||||
@ -207,6 +208,54 @@ class TestCephadm(object):
|
||||
out = wait(cephadm_module, c)
|
||||
assert out == "Created no osd(s) on host test; already created?"
|
||||
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
|
||||
def test_prepare_drivegroup(self, cephadm_module):
|
||||
with self._with_host(cephadm_module, 'test'):
|
||||
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
|
||||
out = cephadm_module.prepare_drivegroup(dg)
|
||||
assert len(out) == 1
|
||||
f1 = out[0]
|
||||
assert f1[0] == 'test'
|
||||
assert isinstance(f1[1], DriveSelection)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"devices, preview, exp_command",
|
||||
[
|
||||
# no preview and only one disk, prepare is used due the hack that is in place.
|
||||
(['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
|
||||
# no preview and multiple disks, uses batch
|
||||
(['/dev/sda', '/dev/sdb'], False, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
|
||||
# preview and only one disk needs to use batch again to generate the preview
|
||||
(['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
|
||||
# preview and multiple disks work the same
|
||||
(['/dev/sda', '/dev/sdb'], True, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
|
||||
]
|
||||
)
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
|
||||
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
|
||||
with self._with_host(cephadm_module, 'test'):
|
||||
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
|
||||
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
|
||||
preview = preview
|
||||
out = cephadm_module.driveselection_to_ceph_volume(dg, ds, preview)
|
||||
assert out in exp_command
|
||||
|
||||
@mock.patch("cephadm.module.SpecStore.find")
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
|
||||
def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
|
||||
with self._with_host(cephadm_module, 'test'):
|
||||
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
|
||||
_find_store.return_value = [dg]
|
||||
_prepare_dg.return_value = [('host1', 'ds_dummy')]
|
||||
_run_c_v_command.return_value = ("{}", '', 0)
|
||||
cephadm_module.preview_drivegroups(drive_group_name='foo')
|
||||
_find_store.assert_called_once_with(service_name='foo')
|
||||
_prepare_dg.assert_called_once_with(dg)
|
||||
_run_c_v_command.assert_called_once()
|
||||
|
||||
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
|
||||
json.dumps([
|
||||
dict(
|
||||
|
@ -941,6 +941,14 @@ class Orchestrator(object):
|
||||
""" Update OSD cluster """
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
|
||||
raise NotImplementedError()
|
||||
|
||||
def preview_drivegroups(self, drive_group_name: Optional[str] = 'osd',
|
||||
dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
|
||||
""" Get a preview for OSD deployments """
|
||||
raise NotImplementedError()
|
||||
|
||||
def remove_osds(self, osd_ids: List[str],
|
||||
replace: bool = False,
|
||||
force: bool = False) -> Completion:
|
||||
|
@ -27,6 +27,7 @@ from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_comma
|
||||
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
|
||||
ServiceDescription, DaemonDescription, IscsiServiceSpec
|
||||
|
||||
|
||||
def nice_delta(now, t, suffix=''):
|
||||
if t:
|
||||
return to_pretty_timedelta(now - t) + suffix
|
||||
@ -457,18 +458,98 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
|
||||
|
||||
return HandleCommandResult(stdout=table.get_string())
|
||||
|
||||
def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
|
||||
# setting unmanaged for $service_name
|
||||
completion = self.describe_service(service_name=service_name)
|
||||
self._orchestrator_wait([completion])
|
||||
raise_if_exception(completion)
|
||||
services: List[ServiceDescription] = completion.result
|
||||
specs = list()
|
||||
for service in services:
|
||||
spec = service.spec
|
||||
spec.unmanaged = unmanaged_flag
|
||||
specs.append(spec)
|
||||
completion = self.apply(specs)
|
||||
self._orchestrator_wait([completion])
|
||||
raise_if_exception(completion)
|
||||
if specs:
|
||||
return HandleCommandResult(stdout=f"Changed <unmanaged> flag to <{unmanaged_flag}> for "
|
||||
f"{[spec.service_name() for spec in specs]}")
|
||||
else:
|
||||
return HandleCommandResult(stdout=f"No specs found with the <service_name> -> {service_name}")
|
||||
|
||||
@_cli_write_command(
|
||||
'orch apply osd',
|
||||
'name=all_available_devices,type=CephBool,req=false',
|
||||
'name=all_available_devices,type=CephBool,req=false '
|
||||
'name=preview,type=CephBool,req=false '
|
||||
'name=service_name,type=CephString,req=false '
|
||||
'name=unmanaged,type=CephBool,req=false '
|
||||
"name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
|
||||
'Create OSD daemon(s) using a drive group spec')
|
||||
def _apply_osd(self, all_available_devices=False, inbuf=None):
|
||||
# type: (bool, Optional[str]) -> HandleCommandResult
|
||||
def _apply_osd(self,
|
||||
all_available_devices: bool = False,
|
||||
preview: bool = False,
|
||||
service_name: Optional[str] = None,
|
||||
unmanaged: Optional[bool] = None,
|
||||
format: Optional[str] = 'plain',
|
||||
inbuf: Optional[str] = None) -> HandleCommandResult:
|
||||
"""Apply DriveGroupSpecs to create OSDs"""
|
||||
usage = """
|
||||
Usage:
|
||||
ceph orch apply osd -i <json_file/yaml_file>
|
||||
ceph orch apply osd --use-all-devices
|
||||
ceph orch apply osd --service-name <service_name> --preview
|
||||
ceph orch apply osd --service-name <service_name> --unmanaged=True|False
|
||||
"""
|
||||
|
||||
def print_preview(prev, format):
|
||||
if format != 'plain':
|
||||
return to_format(prev, format)
|
||||
else:
|
||||
table = PrettyTable(
|
||||
['NAME', 'HOST', 'DATA', 'DB', 'WAL'],
|
||||
border=False)
|
||||
table.align = 'l'
|
||||
table.left_padding_width = 0
|
||||
table.right_padding_width = 1
|
||||
for data in prev:
|
||||
dg_name = data.get('drivegroup')
|
||||
hostname = data.get('host')
|
||||
for osd in data.get('data', {}).get('osds', []):
|
||||
db_path = '-'
|
||||
wal_path = '-'
|
||||
block_db = osd.get('block.db', {}).get('path')
|
||||
block_wal = osd.get('block.wal', {}).get('path')
|
||||
block_data = osd.get('data', {}).get('path', '')
|
||||
if not block_data:
|
||||
continue
|
||||
if block_db:
|
||||
db_path = data.get('data', {}).get('vg', {}).get('devices', [])
|
||||
if block_wal:
|
||||
wal_path = data.get('data', {}).get('wal_vg', {}).get('devices', [])
|
||||
table.add_row((dg_name, hostname, block_data, db_path, wal_path))
|
||||
out = table.get_string()
|
||||
if not out:
|
||||
out = "No pending deployments."
|
||||
return out
|
||||
|
||||
if (inbuf or all_available_devices) and service_name:
|
||||
# mutually exclusive
|
||||
return HandleCommandResult(-errno.EINVAL, stderr=usage)
|
||||
|
||||
if preview and not (service_name or all_available_devices or inbuf):
|
||||
# get all stored drivegroups and print
|
||||
prev = self.preview_drivegroups()
|
||||
return HandleCommandResult(stdout=print_preview(prev, format))
|
||||
|
||||
if service_name and preview:
|
||||
# get specified drivegroup and print
|
||||
prev = self.preview_drivegroups(service_name)
|
||||
return HandleCommandResult(stdout=print_preview(prev, format))
|
||||
|
||||
if service_name and unmanaged is not None:
|
||||
return self.set_unmanaged_flag(service_name, unmanaged)
|
||||
|
||||
if not inbuf and not all_available_devices:
|
||||
return HandleCommandResult(-errno.EINVAL, stderr=usage)
|
||||
if inbuf:
|
||||
@ -476,7 +557,7 @@ Usage:
|
||||
raise OrchestratorError('--all-available-devices cannot be combined with an osd spec')
|
||||
try:
|
||||
drivegroups = yaml.load_all(inbuf)
|
||||
dg_specs = [ServiceSpec.from_json(dg) for dg in drivegroups]
|
||||
dg_specs = [DriveGroupSpec.from_json(dg) for dg in drivegroups]
|
||||
except ValueError as e:
|
||||
msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
|
||||
return HandleCommandResult(-errno.EINVAL, stderr=msg)
|
||||
@ -489,10 +570,12 @@ Usage:
|
||||
)
|
||||
]
|
||||
|
||||
if not preview:
|
||||
completion = self.apply_drivegroups(dg_specs)
|
||||
self._orchestrator_wait([completion])
|
||||
raise_if_exception(completion)
|
||||
return HandleCommandResult(stdout=completion.result_str())
|
||||
ret = self.preview_drivegroups(dg_specs=dg_specs)
|
||||
return HandleCommandResult(stdout=print_preview(ret, format))
|
||||
|
||||
@_cli_write_command(
|
||||
'orch daemon add osd',
|
||||
|
@ -247,6 +247,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
return list(filter(_filter_func, daemons))
|
||||
|
||||
def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
|
||||
return [{}]
|
||||
|
||||
def create_osds(self, drive_group):
|
||||
# type: (DriveGroupSpec) -> TestCompletion
|
||||
""" Creates OSDs from a drive group specification.
|
||||
|
@ -149,10 +149,12 @@ class DriveGroupSpec(ServiceSpec):
|
||||
block_wal_size=None, # type: Optional[int]
|
||||
journal_size=None, # type: Optional[int]
|
||||
service_type=None, # type: Optional[str]
|
||||
unmanaged=None, # type: Optional[bool]
|
||||
unmanaged=False, # type: bool
|
||||
):
|
||||
assert service_type is None or service_type == 'osd'
|
||||
super(DriveGroupSpec, self).__init__('osd', service_id=service_id, placement=placement)
|
||||
super(DriveGroupSpec, self).__init__('osd', service_id=service_id,
|
||||
placement=placement,
|
||||
unmanaged=unmanaged)
|
||||
|
||||
#: A :class:`ceph.deployment.drive_group.DeviceSelection`
|
||||
self.data_devices = data_devices
|
||||
|
@ -15,11 +15,13 @@ class to_ceph_volume(object):
|
||||
|
||||
def __init__(self,
|
||||
spec, # type: DriveGroupSpec
|
||||
selection # type: DriveSelection
|
||||
selection, # type: DriveSelection
|
||||
preview=False
|
||||
):
|
||||
|
||||
self.spec = spec
|
||||
self.selection = selection
|
||||
self.preview = preview
|
||||
|
||||
def run(self):
|
||||
# type: () -> Optional[str]
|
||||
@ -54,6 +56,12 @@ class to_ceph_volume(object):
|
||||
not db_devices and \
|
||||
not wal_devices:
|
||||
cmd = "lvm prepare --bluestore --data %s --no-systemd" % (' '.join(data_devices))
|
||||
if self.preview:
|
||||
# Like every horrible hack, this has sideffects on other features.
|
||||
# In this case, 'lvm prepare' has neither a '--report' nor a '--format json' option
|
||||
# which essentially doesn't allow for a proper previews here.
|
||||
# Fall back to lvm batch in order to get a preview.
|
||||
return f"lvm batch --no-auto {' '.join(data_devices)} --report --format json"
|
||||
return cmd
|
||||
|
||||
if self.spec.objectstore == 'bluestore':
|
||||
@ -81,4 +89,8 @@ class to_ceph_volume(object):
|
||||
cmd += " --yes"
|
||||
cmd += " --no-systemd"
|
||||
|
||||
if self.preview:
|
||||
cmd += " --report"
|
||||
cmd += " --format json"
|
||||
|
||||
return cmd
|
||||
|
Loading…
Reference in New Issue
Block a user