mgr/cephadm: generate one c-v raw prepare cmd per data device in raw mode

Fixes: https://tracker.ceph.com/issues/54522

Signed-off-by: Adam King <adking@redhat.com>
Adam King 2022-03-10 12:43:28 -05:00
parent 56f20feb62
commit b6556e5dbd
4 changed files with 193 additions and 100 deletions
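The gist of the change: ceph-volume raw prepare can only handle a single data device (and at most one paired db/wal device) per invocation, so instead of emitting one "raw prepare" command that lists every data device, the translator now returns a list of commands, one per data device, with db/wal devices paired 1:1. A minimal standalone sketch of that loop (illustrative only, not the actual translate.to_ceph_volume code; the function name and signature are invented for this example):

from typing import List, Optional


def raw_prepare_cmds(data_devices: List[str],
                     db_devices: Optional[List[str]] = None,
                     wal_devices: Optional[List[str]] = None) -> List[str]:
    # Raw mode only supports a 1:1 mapping of data to db/wal devices,
    # so mismatched counts are rejected up front (mirrors the ValueError
    # added to translate.py below).
    db_devices = db_devices or []
    wal_devices = wal_devices or []
    if db_devices and len(db_devices) != len(data_devices):
        raise ValueError('Number of data devices must match number of db devices')
    if wal_devices and len(wal_devices) != len(data_devices):
        raise ValueError('Number of data devices must match number of wal devices')
    cmds: List[str] = []
    for i, data in enumerate(data_devices):
        # one "ceph-volume raw prepare" command per data device
        cmd = "raw prepare --bluestore --data {}".format(data)
        if db_devices:
            cmd += " --block.db {}".format(db_devices[i])
        if wal_devices:
            cmd += " --block.wal {}".format(wal_devices[i])
        cmds.append(cmd)
    return cmds


# e.g. two data devices paired with two db devices:
# raw_prepare_cmds(['/dev/sda', '/dev/sdb'], ['/dev/sdc', '/dev/sdd'])
# -> ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc',
#     'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd']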


@@ -47,9 +47,9 @@ class OSDService(CephService):
osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
cmd = self.driveselection_to_ceph_volume(drive_selection,
osd_id_claims_for_host)
if not cmd:
cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
osd_id_claims_for_host)
if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
drive_group.service_id))
return None
@@ -60,7 +60,7 @@ class OSDService(CephService):
start_ts = datetime_now()
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = await self.create_single_host(
drive_group, host, cmd,
drive_group, host, cmds,
replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.update_osdspec_last_applied(
@@ -79,20 +79,20 @@ class OSDService(CephService):
async def create_single_host(self,
drive_group: DriveGroupSpec,
host: str, cmd: str, replace_osd_ids: List[str],
host: str, cmds: List[str], replace_osd_ids: List[str],
env_vars: Optional[List[str]] = None) -> str:
out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
if code == 1 and ', it is already prepared' in '\n'.join(err):
# HACK: when we create against an existing LV, ceph-volume
# returns an error and the above message. To make this
# command idempotent, tolerate this "error" and continue.
logger.debug('the device was already prepared; continuing')
code = 0
if code:
raise RuntimeError(
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
for cmd in cmds:
out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
if code == 1 and ', it is already prepared' in '\n'.join(err):
# HACK: when we create against an existing LV, ceph-volume
# returns an error and the above message. To make this
# command idempotent, tolerate this "error" and continue.
logger.debug('the device was already prepared; continuing')
code = 0
if code:
raise RuntimeError(
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
replace_osd_ids)
@@ -228,18 +228,33 @@ class OSDService(CephService):
drive_selection = DriveSelection(drive_group, inventory_for_host,
existing_daemons=len(dd_for_spec_and_host))
logger.debug(f"Found drive selection {drive_selection}")
if drive_group.method and drive_group.method == 'raw':
# ceph-volume can currently only handle a 1:1 mapping
# of data/db/wal devices for raw mode osds. If db/wal devices
# are defined and the number does not match the number of data
# devices, we need to bail out
if drive_selection.data_devices() and drive_selection.db_devices():
if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
f'{len(drive_selection.data_devices())} potential data device(s) and '
f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
if drive_selection.data_devices() and drive_selection.wal_devices():
if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
f'{len(drive_selection.data_devices())} potential data device(s) and '
f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
host_ds_map.append((host, drive_selection))
return host_ds_map
@staticmethod
def driveselection_to_ceph_volume(drive_selection: DriveSelection,
osd_id_claims: Optional[List[str]] = None,
preview: bool = False) -> Optional[str]:
preview: bool = False) -> List[str]:
logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
osd_id_claims, preview=preview).run()
logger.debug(f"Resulting ceph-volume cmd: {cmd}")
return cmd
cmds: List[str] = translate.to_ceph_volume(drive_selection,
osd_id_claims, preview=preview).run()
logger.debug(f"Resulting ceph-volume cmds: {cmds}")
return cmds
def get_previews(self, host: str) -> List[Dict[str, Any]]:
# Find OSDSpecs that match host.
@@ -282,32 +297,33 @@ class OSDService(CephService):
continue
# driveselection for host
cmd = self.driveselection_to_ceph_volume(ds,
osd_id_claims.filtered_by_host(host),
preview=True)
if not cmd:
cmds: List[str] = self.driveselection_to_ceph_volume(ds,
osd_id_claims.filtered_by_host(host),
preview=True)
if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
osdspec.service_name()))
continue
# get preview data from ceph-volume
out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
if out:
try:
concat_out: Dict[str, Any] = json.loads(' '.join(out))
except ValueError:
logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
concat_out = {}
notes = []
if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
found = len(concat_out)
limit = osdspec.data_devices.limit
notes.append(
f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
ret_all.append({'data': concat_out,
'osdspec': osdspec.service_id,
'host': host,
'notes': notes})
for cmd in cmds:
out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
if out:
try:
concat_out: Dict[str, Any] = json.loads(' '.join(out))
except ValueError:
logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
concat_out = {}
notes = []
if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
found = len(concat_out)
limit = osdspec.data_devices.limit
notes.append(
f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
ret_all.append({'data': concat_out,
'osdspec': osdspec.service_id,
'host': host,
'notes': notes})
return ret_all
def resolve_hosts_for_osdspecs(self,
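On the cephadm side, create_single_host now takes the whole list and runs the commands one at a time, keeping the existing idempotency hack for each one: exit code 1 together with ', it is already prepared' on stderr is tolerated so that re-applying the same spec does not fail. A rough sketch of that per-command pattern (simplified; run_ceph_volume below is an invented stand-in for the real _run_ceph_volume_command coroutine):

from typing import Awaitable, Callable, List, Tuple

# (out, err, code) as returned by the ceph-volume runner
CephVolumeRunner = Callable[[str], Awaitable[Tuple[List[str], List[str], int]]]


async def run_all(cmds: List[str], run_ceph_volume: CephVolumeRunner) -> None:
    # Run each generated ceph-volume command in order; tolerate the
    # "already prepared" case so the operation stays idempotent.
    for cmd in cmds:
        out, err, code = await run_ceph_volume(cmd)
        if code == 1 and ', it is already prepared' in '\n'.join(err):
            code = 0  # device was already prepared; not a real failure
        if code:
            raise RuntimeError('cephadm exited with an error code: %d, stderr:%s'
                               % (code, '\n'.join(err)))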


@@ -855,29 +855,54 @@ class TestCephadm(object):
assert isinstance(f1[1], DriveSelection)
@pytest.mark.parametrize(
"devices, preview, exp_command",
"devices, preview, exp_commands",
[
# no preview and only one disk, prepare is used due to the hack that is in place.
(['/dev/sda'], False, "lvm batch --no-auto /dev/sda --yes --no-systemd"),
(['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]),
# no preview and multiple disks, uses batch
(['/dev/sda', '/dev/sdb'], False,
"CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]),
# preview and only one disk needs to use batch again to generate the preview
(['/dev/sda'], True, "lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"),
(['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]),
# preview and multiple disks work the same
(['/dev/sda', '/dev/sdb'], True,
"CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]),
]
)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
assert out in exp_command
assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from {out} in {exp_commands}'
@pytest.mark.parametrize(
"devices, preview, exp_commands",
[
# one data device, no preview
(['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]),
# multiple data devices, no preview
(['/dev/sda', '/dev/sdb'], False,
["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]),
# one data device, preview
(['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]),
# multiple data devices, preview
(['/dev/sda', '/dev/sdb'], True,
["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]),
]
)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec(
host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from {out} in {exp_commands}'
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([


@@ -25,7 +25,7 @@ class to_ceph_volume(object):
self.osd_id_claims = osd_id_claims
def run(self):
# type: () -> Optional[str]
# type: () -> List[str]
""" Generate ceph-volume commands based on the DriveGroup filters """
data_devices = [x.path for x in self.selection.data_devices()]
db_devices = [x.path for x in self.selection.db_devices()]
@@ -33,19 +33,34 @@ class to_ceph_volume(object):
journal_devices = [x.path for x in self.selection.journal_devices()]
if not data_devices:
return None
return []
cmd = ""
cmds: List[str] = []
if self.spec.method == 'raw':
assert self.spec.objectstore == 'bluestore'
cmd = "raw prepare --bluestore"
cmd += " --data {}".format(" ".join(data_devices))
if db_devices:
cmd += " --block.db {}".format(" ".join(db_devices))
if wal_devices:
cmd += " --block.wal {}".format(" ".join(wal_devices))
# ceph-volume raw prepare only supports a 1:1 ratio of data to db/wal devices
if data_devices and db_devices:
if len(data_devices) != len(db_devices):
raise ValueError('Number of data devices must match number of '
'db devices for raw mode osds')
if data_devices and wal_devices:
if len(data_devices) != len(wal_devices):
raise ValueError('Number of data devices must match number of '
'wal devices for raw mode osds')
# for raw prepare each data device needs its own prepare command
dev_counter = 0
while dev_counter < len(data_devices):
cmd = "raw prepare --bluestore"
cmd += " --data {}".format(data_devices[dev_counter])
if db_devices:
cmd += " --block.db {}".format(db_devices[dev_counter])
if wal_devices:
cmd += " --block.wal {}".format(wal_devices[dev_counter])
cmds.append(cmd)
dev_counter += 1
elif self.spec.objectstore == 'filestore':
# for lvm batch we can just do all devices in one command
cmd = "lvm batch --no-auto"
cmd += " {}".format(" ".join(data_devices))
@@ -58,9 +73,10 @@ class to_ceph_volume(object):
' '.join(journal_devices))
cmd += " --filestore"
cmds.append(cmd)
elif self.spec.objectstore == 'bluestore':
# for lvm batch we can just do all devices in one command
cmd = "lvm batch --no-auto {}".format(" ".join(data_devices))
if db_devices:
@@ -74,25 +90,27 @@ class to_ceph_volume(object):
if self.spec.block_db_size:
cmd += " --block-db-size {}".format(self.spec.block_db_size)
cmds.append(cmd)
if self.spec.encrypted:
cmd += " --dmcrypt"
for i in range(len(cmds)):
if self.spec.encrypted:
cmds[i] += " --dmcrypt"
if self.spec.osds_per_device:
cmd += " --osds-per-device {}".format(self.spec.osds_per_device)
if self.spec.osds_per_device:
cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device)
if self.spec.data_allocate_fraction:
cmd += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction)
if self.spec.data_allocate_fraction:
cmds[i] += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction)
if self.osd_id_claims:
cmd += " --osd-ids {}".format(" ".join(self.osd_id_claims))
if self.osd_id_claims:
cmds[i] += " --osd-ids {}".format(" ".join(self.osd_id_claims))
if self.spec.method != 'raw':
cmd += " --yes"
cmd += " --no-systemd"
if self.spec.method != 'raw':
cmds[i] += " --yes"
cmds[i] += " --no-systemd"
if self.preview:
cmd += " --report"
cmd += " --format json"
if self.preview:
cmds[i] += " --report"
cmds[i] += " --format json"
return cmd
return cmds
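Note that the shared trailing flags (--dmcrypt, --osds-per-device, --data-allocate-fraction, --osd-ids, --yes/--no-systemd for non-raw commands, and --report --format json for previews) are now appended inside a loop so that every command in the list gets them, not just a single batch command. A small illustrative sketch of that suffixing step (the function and parameter names are simplified stand-ins for the DriveGroupSpec attributes):

from typing import List, Optional


def append_common_flags(cmds: List[str],
                        method: str = 'raw',
                        encrypted: bool = False,
                        osds_per_device: Optional[int] = None,
                        osd_id_claims: Optional[List[str]] = None,
                        preview: bool = False) -> List[str]:
    # Apply the flags shared by all generated commands to each entry,
    # mirroring the "for i in range(len(cmds))" loop added above.
    out: List[str] = []
    for cmd in cmds:
        if encrypted:
            cmd += " --dmcrypt"
        if osds_per_device:
            cmd += " --osds-per-device {}".format(osds_per_device)
        if osd_id_claims:
            cmd += " --osd-ids {}".format(" ".join(osd_id_claims))
        if method != 'raw':
            # lvm batch commands get the interactive/systemd suppression flags
            cmd += " --yes"
            cmd += " --no-systemd"
        if preview:
            cmd += " --report --format json"
        out.append(cmd)
    return out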


@@ -144,8 +144,8 @@ def test_ceph_volume_command_0():
spec.validate()
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_1():
@@ -157,9 +157,9 @@ def test_ceph_volume_command_1():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --yes --no-systemd')
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_2():
@@ -175,10 +175,10 @@ def test_ceph_volume_command_2():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
'--yes --no-systemd')
'--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_3():
@@ -195,11 +195,11 @@ def test_ceph_volume_command_3():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd '
'--wal-devices /dev/sde /dev/sdf --dmcrypt '
'--yes --no-systemd')
'--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_4():
@@ -219,11 +219,11 @@ def test_ceph_volume_command_4():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
'--block-wal-size 500M --block-db-size 500M --dmcrypt '
'--osds-per-device 3 --yes --no-systemd')
'--osds-per-device 3 --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_5():
@@ -236,8 +236,8 @@ def test_ceph_volume_command_5():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_6():
@@ -252,10 +252,10 @@ def test_ceph_volume_command_6():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
'--journal-size 500M --journal-devices /dev/sda /dev/sdb '
'--filestore --yes --no-systemd')
'--filestore --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_7():
@@ -267,8 +267,8 @@ def test_ceph_volume_command_7():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, ['0', '1']).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd'
cmds = translate.to_ceph_volume(sel, ['0', '1']).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_8():
@@ -285,8 +285,8 @@ def test_ceph_volume_command_8():
_mk_device(rotational=False, size="349.0 GB", model='INTEL SSDPED1K375GA') # wal/db
)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd'
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_9():
@@ -298,5 +298,39 @@ def test_ceph_volume_command_9():
spec.validate()
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
cmd = translate.to_ceph_volume(sel, []).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd'
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_raw_ceph_volume_command_0():
spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
service_id='foobar',
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False),
method='raw',
)
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True) + # data
_mk_device(rotational=True) + # data
_mk_device(rotational=False) + # db
_mk_device(rotational=False) # db
)
exp_cmds = ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc', 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd']
sel = drive_selection.DriveSelection(spec, inventory)
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd in exp_cmds for cmd in cmds), f'Expected {exp_cmds} to match {cmds}'
def test_raw_ceph_volume_command_1():
spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
service_id='foobar',
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False),
method='raw',
)
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True) + # data
_mk_device(rotational=True) + # data
_mk_device(rotational=False) # db
)
sel = drive_selection.DriveSelection(spec, inventory)
with pytest.raises(ValueError):
cmds = translate.to_ceph_volume(sel, []).run()