ceph-volume batch: improve backwards compatibility

This restores the legacy batch behavior, adds some initial tests, and
adjusts the existing tests to the changes.

Signed-off-by: Jan Fajerski <jfajerski@suse.com>
Jan Fajerski 2020-04-29 07:47:18 +02:00
parent d32e0e4320
commit a23a02df02
4 changed files with 184 additions and 210 deletions


@@ -4,7 +4,6 @@ import logging
import json
from textwrap import dedent
from ceph_volume import terminal, decorators
-from ceph_volume.api.lvm import Volume
from ceph_volume.util import disk, prompt_bool, arg_validators, templates
from ceph_volume.util import prepare
from . import common
@@ -30,6 +29,48 @@ def device_formatter(devices):
return ''.join(lines)
def ensure_disjoint_device_lists(data, db=[], wal=[], journal=[]):
# check that all device lists are disjoint with each other
if not(set(data).isdisjoint(set(db)) and
set(data).isdisjoint(set(wal)) and
set(data).isdisjoint(set(journal)) and
set(db).isdisjoint(set(wal))):
raise Exception('Device lists are not disjoint')
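For reference, a minimal usage sketch of the new module-level helper (device names are hypothetical; real callers pass Device objects, but the set logic is the same):

    data = ['/dev/sda', '/dev/sdb']
    db = ['/dev/sdb']  # overlaps with data
    try:
        ensure_disjoint_device_lists(data, db=db)
    except Exception as e:
        print(e)  # -> 'Device lists are not disjoint'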
def get_physical_osds(devices, args):
'''
Goes through passed physical devices and assigns OSDs
'''
data_slots = args.osds_per_device
if args.data_slots:
data_slots = max(args.data_slots, args.osds_per_device)
rel_data_size = 100 / data_slots
mlogger.debug('relative data size: {}'.format(rel_data_size))
ret = []
for dev in devices:
if dev.available_lvm:
dev_size = dev.vg_size[0]
abs_size = disk.Size(b=int(dev_size * rel_data_size / 100))
free_size = dev.vg_free[0]
if abs_size < 419430400:
mlogger.error('Data LV on {} would be too small (<400M)'.format(dev.path))
continue
for _ in range(args.osds_per_device):
if abs_size > free_size:
break
free_size -= abs_size.b
osd_id = None
if args.osd_ids:
osd_id = args.osd_ids.pop()
ret.append(Batch.OSD(dev.path,
rel_data_size,
abs_size,
args.osds_per_device,
osd_id))
return ret
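A worked example of the sizing math above, with illustrative numbers rather than anything from the commit: two data slots on a 20 GiB volume group give each data LV half the VG, comfortably above the 400M floor (the real code wraps abs_size in disk.Size; plain ints keep the sketch short):

    data_slots = 2
    rel_data_size = 100 / data_slots                 # 50.0 percent
    dev_size = 21474836480                           # hypothetical 20 GiB VG
    abs_size = int(dev_size * rel_data_size / 100)   # 10737418240 B = 10 GiB
    assert abs_size >= 419430400                     # the 400M minimum above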
class Batch(object):
help = 'Automatically size devices for multi-OSD provisioning with minimal interaction'
@@ -208,6 +249,9 @@ class Batch(object):
print(report)
def _check_slot_args(self):
'''
check that the *_slots args are consistent with the other arguments
'''
if self.args.data_slots and self.args.osds_per_device:
if self.args.data_slots < self.args.osds_per_device:
raise ValueError('data_slots is smaller than osds_per_device')
@@ -219,13 +263,23 @@ class Batch(object):
# raise ValueError('{} is smaller than osds_per_device')
def _sort_rotational_disks(self):
'''
Helper for legacy auto behaviour.
Sorts drives into rotating and non-rotating, the latter being used for
db or journal.
'''
+        rotating = []
+        ssd = []
         for d in self.args.devices:
             if d.rotational:
-                self.args.devices.remove(d)
-                if self.args.filestore:
-                    self.args.journal_devices.append(d)
-                else:
-                    self.args.db_devices.append(d)
+                rotating.append(d)
+            else:
+                ssd.append(d)
+        self.args.devices = rotating
+        if self.args.filestore:
+            self.args.journal_devices = ssd
+        else:
+            self.args.db_devices = ssd
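One thing the rewrite sidesteps by building fresh rotating/ssd lists (a general Python pitfall, not something the commit message states): list.remove() during iteration skips the following element, so consecutive rotational drives would not all be moved.

    # minimal sketch of the pitfall, with hypothetical names
    devs = ['hdd1', 'hdd2', 'ssd1']
    for d in devs:
        if d.startswith('hdd'):
            devs.remove(d)
    print(devs)  # ['hdd2', 'ssd1'], 'hdd2' was skipped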
@decorators.needs_root
def main(self):
@@ -237,15 +291,16 @@ class Batch(object):
if not self.args.bluestore and not self.args.filestore:
self.args.bluestore = True
-        if not self.args.no_auto:
+        if (not self.args.no_auto and not self.args.db_devices and not
+                self.args.wal_devices and not self.args.journal_devices):
self._sort_rotational_disks()
self._check_slot_args()
-        self._ensure_disjoint_device_lists(self.args.devices,
-                                           self.args.db_devices,
-                                           self.args.wal_devices,
-                                           self.args.journal_devices)
+        ensure_disjoint_device_lists(self.args.devices,
+                                     self.args.db_devices,
+                                     self.args.wal_devices,
+                                     self.args.journal_devices)
plan = self.get_plan(self.args)
@@ -282,15 +337,6 @@ class Batch(object):
c.create(argparse.Namespace(**args))
-    def _ensure_disjoint_device_lists(self, data, db=[], wal=[], journal=[]):
-        # check that all device lists are disjoint with each other
-        if not(set(data).isdisjoint(set(db)) and
-               set(data).isdisjoint(set(wal)) and
-               set(data).isdisjoint(set(journal)) and
-               set(db).isdisjoint(set(wal))):
-            raise Exception('Device lists are not disjoint')
def get_plan(self, args):
if args.bluestore:
plan = self.get_deployment_layout(args, args.devices, args.db_devices,
@@ -299,6 +345,25 @@ class Batch(object):
plan = self.get_deployment_layout(args, args.devices, args.journal_devices)
return plan
def get_lvm_osds(self, lvs, args):
'''
Goes through passed LVs and assigns planned osds
'''
ret = []
for lv in lvs:
if lv.used_by_ceph:
continue
osd_id = None
if args.osd_ids:
osd_id = args.osd_ids.pop()
osd = self.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
100.0,
disk.Size(b=int(lv.lv_size)),
1,
osd_id)
ret.append(osd)
return ret
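A sketch of what get_lvm_osds produces for a single pre-created LV (attribute values are hypothetical): each LV not already used by Ceph maps 1:1 to a planned OSD that consumes the whole LV.

    from types import SimpleNamespace
    lv = SimpleNamespace(vg_name='ceph-vg', lv_name='osd-data',
                         lv_size='21474836480', used_by_ceph=False)
    # -> OSD('ceph-vg/osd-data', 100.0, Size(b=21474836480), 1 slot, no id)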
def get_deployment_layout(self, args, devices, fast_devices=[],
very_fast_devices=[]):
plan = []
@@ -307,106 +372,64 @@ class Batch(object):
set(phys_devs))]
mlogger.debug(('passed data_devices: {} physical, {}'
' LVM').format(len(phys_devs), len(lvm_devs)))
-        data_slots = args.osds_per_device
-        if args.data_slots:
-            data_slots = max(args.data_slots, args.osds_per_device)
-        rel_data_size = 100 / data_slots
-        mlogger.debug('relative data size: {}'.format(rel_data_size))
-        for dev in phys_devs:
-            if dev.available_lvm:
-                dev_size = dev.vg_size[0]
-                abs_size = disk.Size(b=int(dev_size * rel_data_size / 100))
-                free_size = dev.vg_free[0]
-                if abs_size < 419430400:
-                    mlogger.error('Data LV on {} would be too small (<400M)'.format(dev.path))
-                    continue
-                for _ in range(args.osds_per_device):
-                    if abs_size > free_size:
-                        break
-                    free_size -= abs_size.b
-                    osd_id = None
-                    if args.osd_ids:
-                        osd_id = args.osd_ids.pop()
-                    osd = self.OSD(dev.path,
-                                   rel_data_size,
-                                   abs_size,
-                                   args.osds_per_device,
-                                   osd_id)
-                    plan.append(osd)
-        for dev in lvm_devs:
-            if dev.used_by_ceph:
-                continue
-            osd_id = None
-            if args.osd_ids:
-                osd_id = args.osd_ids.pop()
-            osd = self.OSD("{}/{}".format(dev.vg_name, dev.lv_name),
-                           100.0,
-                           disk.Size(b=int(dev.lv_size)),
-                           1,
-                           osd_id)
-            plan.append(osd)
+        plan.extend(get_physical_osds(phys_devs, args))
+        plan.extend(self.get_lvm_osds(lvm_devs, args))
num_osds = len(plan)
if num_osds == 0:
mlogger.info('All data devices are unavailable')
exit(0)
+        requested_osds = args.osds_per_device * len(phys_devs) + len(lvm_devs)
+        fast_type = 'block_db' if args.bluestore else 'journal'
+        fast_allocations = self.fast_allocations(fast_devices, requested_osds,
+                                                 fast_type)
+        if fast_devices and not fast_allocations:
+            mlogger.info('{} fast devices were passed, but none are available'.format(len(fast_devices)))
+            exit(0)
+        if fast_devices and not len(fast_allocations) == num_osds:
+            mlogger.error('{} fast allocations != {} num_osds'.format(
+                len(fast_allocations), num_osds))
+            exit(1)
+        very_fast_allocations = self.fast_allocations(very_fast_devices,
+                                                      requested_osds, 'block_wal')
-        if fast_devices:
-            if not fast_allocations:
-                mlogger.info('{} fast devices were passed, but none are available'.format(len(fast_devices)))
-                exit(0)
-            assert len(fast_allocations) == num_osds, '{} fast allocations != {} num_osds'.format(fast_allocations, num_osds)
-        if very_fast_devices:
-            if not very_fast_allocations:
-                mlogger.info('{} very fast devices were passed, but none are available'.format(len(very_fast_devices)))
-                exit(0)
-            assert len(very_fast_allocations) == num_osds, '{} fast allocations != {} num_osds'.format(very_fast_allocations, num_osds)
+        if very_fast_devices and not very_fast_allocations:
+            mlogger.info('{} very fast devices were passed, but none are available'.format(len(very_fast_devices)))
+            exit(0)
+        if very_fast_devices and not len(very_fast_allocations) == num_osds:
+            mlogger.error('{} very fast allocations != {} num_osds'.format(
+                len(very_fast_allocations), num_osds))
+            exit(1)
         for osd in plan:
             if fast_devices:
-                type_ = 'block.db'
-                if args.filestore:
-                    type_ = 'journal'
                 osd.add_fast_device(*fast_allocations.pop(),
-                                    type_=type_)
-            if very_fast_devices:
-                assert args.bluestore and not args.filestore, 'filestore does not support wal devices'
+                                    type_=fast_type)
+            if very_fast_devices and args.bluestore:
                 osd.add_very_fast_device(*very_fast_allocations.pop(),
                                          type_='block.wal')
return plan
-    def fast_allocations(self, devices, num_osds, type_):
-        ret = []
-        if not devices:
-            return ret
-        phys_devs = [d for d in devices if d.is_device]
-        lvm_devs = [d.lvs[0] for d in list(set(devices) -
-                                           set(phys_devs))]
-        ret.extend(
-            [("{}/{}".format(d.vg_name, d.lv_name), 100.0, disk.Size(b=int(d.lv_size)), 1) for d in
-             lvm_devs if not d.used_by_ceph])
-        if (num_osds - len(lvm_devs)) % len(phys_devs):
-            used_slots = int((num_osds - len(lvm_devs)) / len(phys_devs)) + 1
-        else:
-            used_slots = int((num_osds - len(lvm_devs)) / len(phys_devs))
-        requested_slots = getattr(self.args, '{}_slots'.format(type_))
+    def get_physical_fast_allocs(self, devices, type_, used_slots, args):
+        requested_slots = getattr(args, '{}_slots'.format(type_))
         if not requested_slots or requested_slots < used_slots:
             if requested_slots:
                 mlogger.info('{}_slots argument is too small, ignoring'.format(type_))
             requested_slots = used_slots
-        requested_size = getattr(self.args, '{}_size'.format(type_), 0)
+        requested_size = getattr(args, '{}_size'.format(type_), 0)
         if requested_size == 0:
             # no size argument was specified, check ceph.conf
             get_size_fct = getattr(prepare, 'get_{}_size'.format(type_))
             requested_size = get_size_fct(lv_format=False)
-        available = [d for d in phys_devs if d.available_lvm]
-        for dev in available:
+        ret = []
+        for dev in devices:
+            if not dev.available_lvm:
+                continue
             # TODO this only looks at the first vg on device
             dev_size = dev.vg_size[0]
             abs_size = disk.Size(b=int(dev_size / requested_slots))
@@ -434,6 +457,33 @@ class Batch(object):
ret.append((dev.path, relative_size, abs_size, requested_slots))
return ret
def get_lvm_fast_allocs(self, lvs):
return [("{}/{}".format(d.vg_name, d.lv_name), 100.0,
disk.Size(b=int(d.lv_size)), 1) for d in lvs if not
d.used_by_ceph]
def fast_allocations(self, devices, num_osds, type_):
ret = []
if not devices:
return ret
phys_devs = [d for d in devices if d.is_device]
lvm_devs = [d.lvs[0] for d in list(set(devices) -
set(phys_devs))]
ret.extend(self.get_lvm_fast_allocs(lvm_devs))
if (num_osds - len(lvm_devs)) % len(phys_devs):
used_slots = int((num_osds - len(lvm_devs)) / len(phys_devs)) + 1
else:
used_slots = int((num_osds - len(lvm_devs)) / len(phys_devs))
ret.extend(self.get_physical_fast_allocs(phys_devs,
type_,
used_slots,
self.args))
return ret
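The used_slots computation above is a ceiling division: OSDs not backed by pre-made LVs are spread over the physical fast devices, rounding up. A worked example with illustrative numbers:

    num_osds, n_lvm, n_phys = 5, 1, 2
    used_slots = (num_osds - n_lvm) // n_phys   # 4 // 2 = 2, no remainder
    # with n_lvm = 0 the remainder forces an extra slot per device:
    # (5 - 0) % 2 == 1, so used_slots = 5 // 2 + 1 = 3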
class OSD(object):
def __init__(self, data_path, rel_size, abs_size, slots, id_):
self.id_ = id_


@@ -1,7 +1,8 @@
import os
import pytest
-from mock.mock import patch, PropertyMock
+from mock.mock import patch, PropertyMock, create_autospec
from ceph_volume.util import disk
from ceph_volume.util import device
from ceph_volume.util.constants import ceph_disk_guids
from ceph_volume import conf, configuration
@@ -40,6 +41,16 @@ def capture():
return Capture()
@pytest.fixture
def mock_devices_available():
dev = create_autospec(device.Device)
dev.path = '/dev/foo'
dev.available_lvm = True
dev.vg_size = [21474836480]
dev.vg_free = dev.vg_size
return [dev]
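The autospec'd fixture models a single hypothetical 20 GiB drive (21474836480 B) whose volume group is entirely free, so the new get_physical_osds tests below can assert exact counts and sizes deterministically.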
@pytest.fixture
def fake_run(monkeypatch):
fake_run = Capture()


@@ -12,118 +12,31 @@ class TestBatch(object):
def test_disjoint_device_lists(self, factory):
device1 = factory(used_by_ceph=False, available=True, abspath="/dev/sda")
device2 = factory(used_by_ceph=False, available=True, abspath="/dev/sdb")
-        b = batch.Batch([])
-        b.args.devices = [device1, device2]
-        b.args.db_devices = [device2]
-        b._filter_devices()
+        devices = [device1, device2]
+        db_devices = [device2]
         with pytest.raises(Exception) as disjoint_ex:
-            b._ensure_disjoint_device_lists()
+            batch.ensure_disjoint_device_lists(devices, db_devices)
assert 'Device lists are not disjoint' in str(disjoint_ex.value)
+    def test_get_physical_osds_return_len(self, factory,
+                                          mock_devices_available):
+        osds_per_device = 1
+        args = factory(data_slots=1, osds_per_device=osds_per_device, osd_ids=[])
+        osds = batch.get_physical_osds(mock_devices_available, args)
+        assert len(osds) == len(mock_devices_available) * osds_per_device
-class TestFilterDevices(object):
+    def test_get_physical_osds_rel_size(self, factory,
+                                        mock_devices_available):
+        osds_per_device = 1
+        args = factory(data_slots=1, osds_per_device=osds_per_device, osd_ids=[])
+        osds = batch.get_physical_osds(mock_devices_available, args)
+        for osd in osds:
+            assert osd.data[1] == 100 / osds_per_device
-    def test_filter_used_device(self, factory):
-        device1 = factory(used_by_ceph=True, abspath="/dev/sda")
-        args = factory(devices=[device1], filtered_devices={})
-        result, filtered_devices = batch.filter_devices(args)
-        assert not result
-        assert device1.abspath in filtered_devices
-    def test_has_unused_devices(self, factory):
-        device1 = factory(
-            used_by_ceph=False,
-            abspath="/dev/sda",
-            rotational=False,
-            is_lvm_member=False
-        )
-        args = factory(devices=[device1], filtered_devices={})
-        result, filtered_devices = batch.filter_devices(args)
-        assert device1 in result
-        assert not filtered_devices
-    def test_filter_device_used_as_a_journal(self, factory):
-        hdd1 = factory(
-            used_by_ceph=True,
-            abspath="/dev/sda",
-            rotational=True,
-            is_lvm_member=True,
-        )
-        lv = factory(tags={"ceph.type": "journal"})
-        ssd1 = factory(
-            used_by_ceph=False,
-            abspath="/dev/nvme0n1",
-            rotational=False,
-            is_lvm_member=True,
-            lvs=[lv],
-        )
-        args = factory(devices=[hdd1, ssd1], filtered_devices={})
-        result, filtered_devices = batch.filter_devices(args)
-        assert not result
-        assert ssd1.abspath in filtered_devices
-    def test_last_device_is_not_filtered(self, factory):
-        hdd1 = factory(
-            used_by_ceph=True,
-            abspath="/dev/sda",
-            rotational=True,
-            is_lvm_member=True,
-        )
-        ssd1 = factory(
-            used_by_ceph=False,
-            abspath="/dev/nvme0n1",
-            rotational=False,
-            is_lvm_member=False,
-        )
-        args = factory(devices=[hdd1, ssd1], filtered_devices={})
-        result, filtered_devices = batch.filter_devices(args)
-        assert result
-        assert len(filtered_devices) == 1
-    def test_no_auto_fails_on_unavailable_device(self, factory):
-        hdd1 = factory(
-            used_by_ceph=False,
-            abspath="/dev/sda",
-            rotational=True,
-            is_lvm_member=False,
-            available=True,
-        )
-        ssd1 = factory(
-            used_by_ceph=True,
-            abspath="/dev/nvme0n1",
-            rotational=False,
-            is_lvm_member=True,
-            available=False
-        )
-        args = factory(devices=[hdd1], db_devices=[ssd1], filtered_devices={},
-                       yes=True, format="", report=False)
-        b = batch.Batch([])
-        b.args = args
-        with pytest.raises(RuntimeError) as ex:
-            b._filter_devices()
-        assert '1 devices were filtered in non-interactive mode, bailing out' in str(ex.value)
-    def test_no_auto_prints_json_on_unavailable_device_and_report(self, factory, capsys):
-        hdd1 = factory(
-            used_by_ceph=False,
-            abspath="/dev/sda",
-            rotational=True,
-            is_lvm_member=False,
-            available=True,
-        )
-        ssd1 = factory(
-            used_by_ceph=True,
-            abspath="/dev/nvme0n1",
-            rotational=False,
-            is_lvm_member=True,
-            available=False
-        )
-        captured = capsys.readouterr()
-        args = factory(devices=[hdd1], db_devices=[ssd1], filtered_devices={},
-                       yes=True, format="json", report=True)
-        b = batch.Batch([])
-        b.args = args
-        with pytest.raises(SystemExit):
-            b._filter_devices()
-        result = json.loads(captured.out)
-        assert not result["changed"]
+    def test_get_physical_osds_abs_size(self, factory,
+                                        mock_devices_available):
+        osds_per_device = 1
+        args = factory(data_slots=1, osds_per_device=osds_per_device, osd_ids=[])
+        osds = batch.get_physical_osds(mock_devices_available, args)
+        for osd, dev in zip(osds, mock_devices_available):
+            assert osd.data[2] == dev.vg_size[0] / osds_per_device


@@ -253,8 +253,8 @@ class TestDevice(object):
assert disk.is_ceph_disk_member is False
def test_existing_vg_available(self, monkeypatch, device_info):
-        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
-                             vg_extent_size=1073741824)
+        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
+                             vg_extent_size=4194304)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
lsblk = {"TYPE": "disk"}
data = {"/dev/nvme0n1": {"size": "6442450944"}}
@@ -277,10 +277,10 @@ class TestDevice(object):
assert not disk.available_raw
def test_multiple_existing_vgs(self, monkeypatch, device_info):
-        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
-                              vg_extent_size=1073741824)
-        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
-                              vg_extent_size=1073741824)
+        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
+                              vg_extent_size=4194304)
+        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
+                              vg_extent_size=4194304)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
lsblk = {"TYPE": "disk"}
data = {"/dev/nvme0n1": {"size": "6442450944"}}