Merge pull request #58140 from guits/cv-tpm2-support

ceph-volume: add TPM2 token enrollment support for encrypted OSDs
This commit is contained in:
Guillaume Abrioux 2024-08-13 21:27:42 +02:00 committed by GitHub
commit 2b06a578e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 1119 additions and 277 deletions

View File

@ -61,6 +61,12 @@ For enabling :ref:`encryption <ceph-volume-lvm-encryption>`, the ``--dmcrypt`` f
ceph-volume lvm prepare --bluestore --dmcrypt --data vg/lv
Starting with Ceph Squid, you can opt for TPM2 token enrollment for the created LUKS2 devices with the ``--with-tpm`` flag:
.. prompt:: bash #
ceph-volume lvm prepare --bluestore --dmcrypt --with-tpm --data vg/lv
If a ``block.db`` device or a ``block.wal`` device is needed, it can be
specified with ``--block.db`` or ``--block.wal``. These can be physical
devices, partitions, or logical volumes. ``block.db`` and ``block.wal`` are

View File

@ -666,6 +666,21 @@ This example would deploy all OSDs with encryption enabled.
all: true
encrypted: true
Ceph Squid onwards support tpm2 token enrollment to LUKS2 devices.
You can add the `tpm2` to your OSD spec:
.. code-block:: yaml
service_type: osd
service_id: example_osd_spec_with_tpm2
placement:
host_pattern: '*'
spec:
data_devices:
all: true
encrypted: true
tpm2: true
See a full list in the DriveGroupSpecs
.. py:currentmodule:: ceph.deployment.drive_group

View File

@ -1,8 +1,33 @@
import os
import logging
from collections import namedtuple
sys_info = namedtuple('sys_info', ['devices'])
sys_info.devices = dict()
logger = logging.getLogger(__name__)
class AllowLoopDevices:
allow = False
warned = False
@classmethod
def __call__(cls) -> bool:
val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower()
if val not in ("false", 'no', '0'):
cls.allow = True
if not cls.warned:
logger.warning(
"CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your "
"environment, so we will allow the use of unattached loop"
" devices as disks. This feature is intended for "
"development purposes only and will never be supported in"
" production. Issues filed based on this behavior will "
"likely be ignored."
)
cls.warned = True
return cls.allow
class UnloadedConfig(object):
@ -14,6 +39,8 @@ class UnloadedConfig(object):
def __getattr__(self, *a):
raise RuntimeError("No valid ceph configuration file was loaded.")
allow_loop_devices = AllowLoopDevices()
conf = namedtuple('config', ['ceph', 'cluster', 'verbosity', 'path', 'log_path', 'dmcrypt_no_workqueue'])
conf.ceph = UnloadedConfig()
conf.dmcrypt_no_workqueue = None

View File

@ -267,6 +267,12 @@ class Batch(object):
action=arg_validators.DmcryptAction,
help='Enable device encryption via dm-crypt',
)
parser.add_argument(
'--with-tpm',
dest='with_tpm',
help='Whether encrypted OSDs should be enrolled with TPM.',
action='store_true'
)
parser.add_argument(
'--crush-device-class',
dest='crush_device_class',
@ -423,6 +429,7 @@ class Batch(object):
global_args = [
'bluestore',
'dmcrypt',
'with_tpm',
'crush_device_class',
'no_systemd',
]

View File

@ -83,6 +83,11 @@ common_args = {
'action': arg_validators.DmcryptAction,
'help': 'Enable device encryption via dm-crypt',
},
'--with-tpm': {
'dest': 'with_tpm',
'help': 'Whether encrypted OSDs should be enrolled with TPM.',
'action': 'store_true'
},
'--no-systemd': {
'dest': 'no_systemd',
'action': 'store_true',

View File

@ -39,12 +39,19 @@ class Activate(object):
'--device',
help='The device for the OSD to start'
)
parser.add_argument(
'--devices',
help='The device for the OSD to start',
nargs='*',
default=[]
)
parser.add_argument(
'--osd-id',
help='OSD ID to activate'
)
parser.add_argument(
'--osd-uuid',
dest='osd_fsid',
help='OSD UUID to active'
)
parser.add_argument(
@ -82,15 +89,11 @@ class Activate(object):
return
self.args = parser.parse_args(self.argv)
devs = []
if self.args.device:
devs = [self.args.device]
if self.args.block_wal:
devs.append(self.args.block_wal)
if self.args.block_db:
devs.append(self.args.block_db)
if self.args.devices is None:
self.args.devices = [self.args.device]
else:
self.args.devices.append(self.args.device)
self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
self.objectstore.activate(devs=devs,
start_osd_id=self.args.osd_id,
start_osd_uuid=self.args.osd_uuid,
tmpfs=not self.args.no_tmpfs)
self.objectstore.activate()

View File

@ -1,7 +1,7 @@
import argparse
from ceph_volume.util import arg_validators
def create_parser(prog, description):
def create_parser(prog: str, description: str) -> argparse.ArgumentParser:
"""
Both prepare and create share the same parser, those are defined here to
avoid duplication
@ -58,6 +58,12 @@ def create_parser(prog, description):
action=arg_validators.DmcryptAction,
help='Enable device encryption via dm-crypt',
)
parser.add_argument(
'--with-tpm',
dest='with_tpm',
help='Whether encrypted OSDs should be enrolled with TPM.',
action='store_true'
),
parser.add_argument(
'--osd-id',
help='Reuse an existing OSD id',

View File

@ -5,7 +5,7 @@ import logging
from textwrap import dedent
from ceph_volume import decorators, process
from ceph_volume.util import disk
from typing import Any, Dict, List
from typing import Any, Dict, List as _List
logger = logging.getLogger(__name__)
@ -20,7 +20,8 @@ def direct_report(devices):
_list = List([])
return _list.generate(devices)
def _get_bluestore_info(dev):
def _get_bluestore_info(dev: str) -> Dict[str, Any]:
result: Dict[str, Any] = {}
out, err, rc = process.call([
'ceph-bluestore-tool', 'show-label',
'--dev', dev], verbose_on_failure=False)
@ -28,42 +29,26 @@ def _get_bluestore_info(dev):
# ceph-bluestore-tool returns an error (below) if device is not bluestore OSD
# > unable to read label for <device>: (2) No such file or directory
# but it's possible the error could be for a different reason (like if the disk fails)
logger.debug('assuming device {} is not BlueStore; ceph-bluestore-tool failed to get info from device: {}\n{}'.format(dev, out, err))
return None
oj = json.loads(''.join(out))
if dev not in oj:
# should be impossible, so warn
logger.warning('skipping device {} because it is not reported in ceph-bluestore-tool output: {}'.format(dev, out))
return None
try:
r = {
'osd_uuid': oj[dev]['osd_uuid'],
}
if oj[dev]['description'] == 'main':
whoami = oj[dev]['whoami']
r.update({
'type': 'bluestore',
'osd_id': int(whoami),
'ceph_fsid': oj[dev]['ceph_fsid'],
'device': dev,
})
elif oj[dev]['description'] == 'bluefs db':
r['device_db'] = dev
elif oj[dev]['description'] == 'bluefs wal':
r['device_wal'] = dev
return r
except KeyError as e:
# this will appear for devices that have a bluestore header but aren't valid OSDs
# for example, due to incomplete rollback of OSDs: https://tracker.ceph.com/issues/51869
logger.error('device {} does not have all BlueStore data needed to be a valid OSD: {}\n{}'.format(dev, out, e))
return None
logger.debug(f'assuming device {dev} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}')
else:
oj = json.loads(''.join(out))
if dev not in oj:
# should be impossible, so warn
logger.warning(f'skipping device {dev} because it is not reported in ceph-bluestore-tool output: {out}')
try:
result = disk.bluestore_info(dev, oj)
except KeyError as e:
# this will appear for devices that have a bluestore header but aren't valid OSDs
# for example, due to incomplete rollback of OSDs: https://tracker.ceph.com/issues/51869
logger.error(f'device {dev} does not have all BlueStore data needed to be a valid OSD: {out}\n{e}')
return result
class List(object):
help = 'list BlueStore OSDs on raw devices'
def __init__(self, argv):
def __init__(self, argv: _List[str]) -> None:
self.argv = argv
def is_atari_partitions(self, _lsblk: Dict[str, Any]) -> bool:
@ -81,7 +66,7 @@ class List(object):
return True
return False
def exclude_atari_partitions(self, _lsblk_all: Dict[str, Any]) -> List[Dict[str, Any]]:
def exclude_atari_partitions(self, _lsblk_all: Dict[str, Any]) -> _List[Dict[str, Any]]:
return [_lsblk for _lsblk in _lsblk_all if not self.is_atari_partitions(_lsblk)]
def generate(self, devs=None):
@ -113,7 +98,7 @@ class List(object):
logger.debug('inspecting devices: {}'.format(devs))
for info_device in info_devices:
bs_info = _get_bluestore_info(info_device['NAME'])
if bs_info is None:
if not bs_info:
# None is also returned in the rare event that there is an issue reading info from
# a BlueStore disk, so be sure to log our assumption that it isn't bluestore
logger.info('device {} does not have BlueStore information'.format(info_device['NAME']))

View File

@ -42,11 +42,13 @@ class Prepare(object):
self.args = parser.parse_args(self.argv)
if self.args.bluestore:
self.args.objectstore = 'bluestore'
if self.args.dmcrypt and not os.getenv('CEPH_VOLUME_DMCRYPT_SECRET'):
terminal.error('encryption was requested (--dmcrypt) but environment variable ' \
'CEPH_VOLUME_DMCRYPT_SECRET is not set, you must set ' \
'this variable to provide a dmcrypt secret.')
raise SystemExit(1)
if self.args.dmcrypt:
if not self.args.with_tpm and not os.getenv('CEPH_VOLUME_DMCRYPT_SECRET'):
terminal.error('encryption was requested (--dmcrypt) but environment variable ' \
'CEPH_VOLUME_DMCRYPT_SECRET is not set, you must set ' \
'this variable to provide a dmcrypt secret or use --with-tpm ' \
'in order to enroll a tpm2 token.')
raise SystemExit(1)
self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
self.objectstore.safe_prepare(self.args)

View File

@ -1,7 +1,9 @@
from . import lvmbluestore
from . import rawbluestore
from typing import Any, Dict
mapping = {
mapping: Dict[str, Any] = {
'LVM': {
'bluestore': lvmbluestore.LvmBlueStore
},

View File

@ -2,9 +2,11 @@ import logging
import os
import errno
import time
import tempfile
from ceph_volume import conf, terminal, process
from ceph_volume.util import prepare as prepare_utils
from ceph_volume.util import system, disk
from ceph_volume.util import encryption as encryption_utils
from typing import Dict, Any, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
@ -22,21 +24,23 @@ class BaseObjectStore:
# for the OSD, this needs to be fixed. This could either be a file (!)
# or a string (!!) or some flags that we would need to compound
# into a dict so that we can convert to JSON (!!!)
self.secrets = {'cephx_secret': prepare_utils.create_key()}
self.cephx_secret = self.secrets.get('cephx_secret',
prepare_utils.create_key())
self.encrypted = 0
self.secrets: Dict[str, str] = {'cephx_secret': prepare_utils.create_key()}
self.cephx_secret: str = self.secrets.get('cephx_secret',
prepare_utils.create_key())
self.encrypted: int = 0
self.tags: Dict[str, Any] = {}
self.osd_id: str = ''
self.osd_fsid = ''
self.block_lv: Optional["Volume"] = None
self.cephx_lockbox_secret = ''
self.osd_fsid: str = ''
self.cephx_lockbox_secret: str = ''
self.objectstore: str = ''
self.osd_mkfs_cmd: List[str] = []
self.block_device_path = ''
if hasattr(self.args, 'dmcrypt'):
if self.args.dmcrypt:
self.encrypted = 1
self.block_device_path: str = ''
self.dmcrypt_key: str = encryption_utils.create_dmcrypt_key()
self.with_tpm: int = int(getattr(self.args, 'with_tpm', False))
self.method: str = ''
if getattr(self.args, 'dmcrypt', False):
self.encrypted = 1
if not self.with_tpm:
self.cephx_lockbox_secret = prepare_utils.create_key()
self.secrets['cephx_lockbox_secret'] = \
self.cephx_lockbox_secret
@ -152,3 +156,23 @@ class BaseObjectStore:
def activate(self) -> None:
raise NotImplementedError()
def enroll_tpm2(self, device: str) -> None:
"""
Enrolls a device with TPM2 (Trusted Platform Module 2.0) using systemd-cryptenroll.
This method creates a temporary file to store the dmcrypt key and uses it to enroll the device.
Args:
device (str): The device path to be enrolled with TPM2.
"""
if self.with_tpm:
tmp_dir: str = '/rootfs/tmp' if os.environ.get('I_AM_IN_A_CONTAINER', False) else '/tmp'
with tempfile.NamedTemporaryFile(mode='w', delete=True, dir=tmp_dir) as temp_file:
temp_file.write(self.dmcrypt_key)
temp_file.flush()
temp_file_name: str = temp_file.name.replace('/rootfs', '', 1)
cmd: List[str] = ['systemd-cryptenroll', '--tpm2-device=auto',
device, '--unlock-key-file', temp_file_name,
'--tpm2-pcrs', '9+12', '--wipe-slot', 'tpm2']
process.call(cmd, run_on_host=True, show_command=True)

View File

@ -2,10 +2,13 @@ import logging
import os
from .baseobjectstore import BaseObjectStore
from ceph_volume.util import system
from typing import Optional, TYPE_CHECKING
from ceph_volume.util.encryption import CephLuks2
from ceph_volume import process
from typing import Any, Dict, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
import argparse
from ceph_volume.api.lvm import Volume
logger = logging.getLogger(__name__)
@ -22,6 +25,7 @@ class BlueStore(BaseObjectStore):
self.block_device_path: str = ''
self.wal_device_path: str = ''
self.db_device_path: str = ''
self.block_lv: Volume
def add_objectstore_opts(self) -> None:
"""
@ -59,3 +63,44 @@ class BlueStore(BaseObjectStore):
link_path = os.path.join(self.osd_path, link_name)
if os.path.exists(link_path):
os.unlink(os.path.join(self.osd_path, link_name))
def add_label(self, key: str,
value: str,
device: str) -> None:
"""Add a label to a BlueStore device.
Args:
key (str): The name of the label being added.
value (str): Value of the label being added.
device (str): The path of the BlueStore device.
Raises:
RuntimeError: If `ceph-bluestore-tool` command doesn't success.
"""
command: List[str] = ['ceph-bluestore-tool',
'set-label-key',
'-k',
key,
'-v',
value,
'--dev',
device]
_, err, rc = process.call(command,
terminal_verbose=True,
show_command=True)
if rc:
raise RuntimeError(f"Can't add BlueStore label '{key}' to device {device}: {err}")
def osd_mkfs(self) -> None:
super().osd_mkfs()
mapping: Dict[str, Any] = {'raw': ['data', 'block_db', 'block_wal'],
'lvm': ['ceph.block_device', 'ceph.db_device', 'ceph.wal_device']}
if self.args.dmcrypt:
for dev_type in mapping[self.method]:
if self.method == 'raw':
path = self.args.__dict__.get(dev_type, None)
else:
path = self.block_lv.tags.get(dev_type, None)
if path is not None:
CephLuks2(path).config_luks2({'subsystem': f'ceph_fsid={self.osd_fsid}'})

View File

@ -22,12 +22,12 @@ logger = logging.getLogger(__name__)
class LvmBlueStore(BlueStore):
def __init__(self, args: "argparse.Namespace") -> None:
super().__init__(args)
self.method = 'lvm'
self.tags: Dict[str, Any] = {}
self.block_lv: Optional["Volume"] = None
def pre_prepare(self) -> None:
if self.encrypted:
self.secrets['dmcrypt_key'] = encryption_utils.create_dmcrypt_key()
if self.encrypted and not self.with_tpm:
self.secrets['dmcrypt_key'] = self.dmcrypt_key
cluster_fsid = self.get_cluster_fsid()
@ -63,6 +63,7 @@ class LvmBlueStore(BlueStore):
self.tags['ceph.block_uuid'] = self.block_lv.__dict__['lv_uuid']
self.tags['ceph.cephx_lockbox_secret'] = self.cephx_lockbox_secret
self.tags['ceph.encrypted'] = self.encrypted
self.tags['ceph.with_tpm'] = 1 if self.with_tpm else 0
self.tags['ceph.vdo'] = api.is_vdo(self.block_lv.__dict__['lv_path'])
def prepare_data_device(self,
@ -158,7 +159,7 @@ class LvmBlueStore(BlueStore):
self.block_lv.set_tags(self.tags) # type: ignore
# 3/ encryption-only operations
if self.secrets.get('dmcrypt_key'):
if self.encrypted:
self.prepare_dmcrypt()
# 4/ osd_prepare req
@ -175,24 +176,18 @@ class LvmBlueStore(BlueStore):
# done on activation. Format and open ('decrypt' devices) and
# re-assign the device and journal variables so that the rest of the
# process can use the mapper paths
key = self.secrets['dmcrypt_key']
self.block_device_path = \
self.luks_format_and_open(key,
self.block_device_path,
'block',
self.tags)
self.wal_device_path = self.luks_format_and_open(key,
self.wal_device_path,
'wal',
self.tags)
self.db_device_path = self.luks_format_and_open(key,
self.db_device_path,
'db',
self.tags)
device_types = ('block', 'db', 'wal')
for device_type in device_types:
attr_name: str = f'{device_type}_device_path'
path: str = self.__dict__[attr_name]
if path:
self.__dict__[attr_name] = self.luks_format_and_open(path,
device_type,
self.tags)
def luks_format_and_open(self,
key: Optional[str],
device: str,
device_type: str,
tags: Dict[str, Any]) -> str:
@ -206,14 +201,18 @@ class LvmBlueStore(BlueStore):
uuid = tags[tag_name]
# format data device
encryption_utils.luks_format(
key,
self.dmcrypt_key,
device
)
if self.with_tpm:
self.enroll_tpm2(device)
encryption_utils.luks_open(
key,
self.dmcrypt_key,
device,
uuid
)
uuid,
self.with_tpm)
return '/dev/mapper/%s' % uuid
@ -346,7 +345,7 @@ class LvmBlueStore(BlueStore):
raise RuntimeError('could not find a bluestore OSD to activate')
is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
dmcrypt_secret = None
dmcrypt_secret = ''
osd_id = osd_block_lv.tags['ceph.osd_id']
conf.cluster = osd_block_lv.tags['ceph.cluster_name']
osd_fsid = osd_block_lv.tags['ceph.osd_fsid']
@ -368,13 +367,16 @@ class LvmBlueStore(BlueStore):
if is_encrypted:
osd_lv_path = '/dev/mapper/%s' % osd_block_lv.__dict__['lv_uuid']
lockbox_secret = osd_block_lv.tags['ceph.cephx_lockbox_secret']
encryption_utils.write_lockbox_keyring(osd_id,
osd_fsid,
lockbox_secret)
dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
self.with_tpm = bool(osd_block_lv.tags.get('ceph.with_tpm', 0))
if not self.with_tpm:
encryption_utils.write_lockbox_keyring(osd_id,
osd_fsid,
lockbox_secret)
dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
encryption_utils.luks_open(dmcrypt_secret,
osd_block_lv.__dict__['lv_path'],
osd_block_lv.__dict__['lv_uuid'])
osd_block_lv.__dict__['lv_uuid'],
with_tpm=self.with_tpm)
else:
osd_lv_path = osd_block_lv.__dict__['lv_path']

View File

@ -19,19 +19,19 @@ logger = logging.getLogger(__name__)
class RawBlueStore(BlueStore):
def __init__(self, args: "argparse.Namespace") -> None:
super().__init__(args)
if hasattr(self.args, 'data'):
self.block_device_path = self.args.data
if hasattr(self.args, 'block_db'):
self.db_device_path = self.args.block_db
if hasattr(self.args, 'block_wal'):
self.wal_device_path = self.args.block_wal
self.method = 'raw'
self.devices: List[str] = getattr(args, 'devices', [])
self.osd_id = getattr(self.args, 'osd_id', '')
self.osd_fsid = getattr(self.args, 'osd_fsid', '')
self.block_device_path = getattr(self.args, 'data', '')
self.db_device_path = getattr(self.args, 'block_db', '')
self.wal_device_path = getattr(self.args, 'block_wal', '')
def prepare_dmcrypt(self) -> None:
"""
Helper for devices that are encrypted. The operations needed for
block, db, wal, devices are all the same
"""
key = self.secrets['dmcrypt_key']
for device, device_type in [(self.block_device_path, 'block'),
(self.db_device_path, 'db'),
@ -44,16 +44,19 @@ class RawBlueStore(BlueStore):
device_type)
# format data device
encryption_utils.luks_format(
key,
self.dmcrypt_key,
device
)
if self.with_tpm:
self.enroll_tpm2(device)
encryption_utils.luks_open(
key,
self.dmcrypt_key,
device,
mapping
mapping,
self.with_tpm
)
self.__dict__[f'{device_type}_device_path'] = \
'/dev/mapper/{}'.format(mapping)
'/dev/mapper/{}'.format(mapping) # TODO(guits): need to preserve path or find a way to get the parent device from the mapper ?
def safe_prepare(self,
args: Optional["argparse.Namespace"] = None) -> None:
@ -80,25 +83,21 @@ class RawBlueStore(BlueStore):
@decorators.needs_root
def prepare(self) -> None:
if self.encrypted:
self.secrets['dmcrypt_key'] = \
os.getenv('CEPH_VOLUME_DMCRYPT_SECRET')
self.osd_fsid = system.generate_uuid()
crush_device_class = self.args.crush_device_class
if self.encrypted and not self.with_tpm:
self.dmcrypt_key = os.getenv('CEPH_VOLUME_DMCRYPT_SECRET', '')
self.secrets['dmcrypt_key'] = self.dmcrypt_key
if crush_device_class:
self.secrets['crush_device_class'] = crush_device_class
tmpfs = not self.args.no_tmpfs
if self.args.block_wal:
self.wal = self.args.block_wal
if self.args.block_db:
self.db = self.args.block_db
# reuse a given ID if it exists, otherwise create a new ID
self.osd_id = prepare_utils.create_id(
self.osd_fsid, json.dumps(self.secrets))
if self.secrets.get('dmcrypt_key'):
if self.encrypted:
self.prepare_dmcrypt()
self.prepare_osd_req(tmpfs=tmpfs)
@ -106,18 +105,12 @@ class RawBlueStore(BlueStore):
# prepare the osd filesystem
self.osd_mkfs()
def _activate(self,
meta: Dict[str, Any],
tmpfs: bool) -> None:
# find the osd
osd_id = meta['osd_id']
osd_uuid = meta['osd_uuid']
def _activate(self, osd_id: str, osd_fsid: str) -> None:
# mount on tmpfs the osd directory
self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
if not system.path_is_mounted(self.osd_path):
# mkdir -p and mount as tmpfs
prepare_utils.create_osd_path(osd_id, tmpfs=tmpfs)
prepare_utils.create_osd_path(osd_id, tmpfs=not self.args.no_tmpfs)
# XXX This needs to be removed once ceph-bluestore-tool can deal with
# symlinks that exist in the osd dir
@ -133,49 +126,98 @@ class RawBlueStore(BlueStore):
'prime-osd-dir',
'--path', self.osd_path,
'--no-mon-config',
'--dev', meta['device'],
'--dev', self.block_device_path,
]
process.run(prime_command)
# always re-do the symlink regardless if it exists, so that the block,
# block.wal, and block.db devices that may have changed can be mapped
# correctly every time
prepare_utils.link_block(meta['device'], osd_id)
prepare_utils.link_block(self.block_device_path, osd_id)
if 'device_db' in meta:
prepare_utils.link_db(meta['device_db'], osd_id, osd_uuid)
if self.db_device_path:
prepare_utils.link_db(self.db_device_path, osd_id, osd_fsid)
if 'device_wal' in meta:
prepare_utils.link_wal(meta['device_wal'], osd_id, osd_uuid)
if self.wal_device_path:
prepare_utils.link_wal(self.wal_device_path, osd_id, osd_fsid)
system.chown(self.osd_path)
terminal.success("ceph-volume raw activate "
"successful for osd ID: %s" % osd_id)
@decorators.needs_root
def activate(self,
devs: List[str],
start_osd_id: str,
start_osd_uuid: str,
tmpfs: bool) -> None:
"""
:param args: The parsed arguments coming from the CLI
"""
assert devs or start_osd_id or start_osd_uuid
found = direct_report(devs)
def activate(self) -> None:
"""Activate Ceph OSDs on the system.
This function activates Ceph Object Storage Daemons (OSDs) on the system.
It iterates over all block devices, checking if they have a LUKS2 signature and
are encrypted for Ceph. If a device's OSD fsid matches and it is enrolled with TPM2,
the function pre-activates it. After collecting the relevant devices, it attempts to
activate any OSDs found.
Raises:
RuntimeError: If no matching OSDs are found to activate.
"""
assert self.devices or self.osd_id or self.osd_fsid
activated_any: bool = False
for d in disk.lsblk_all(abspath=True):
device: str = d.get('NAME')
luks2 = encryption_utils.CephLuks2(device)
if luks2.is_ceph_encrypted:
if luks2.is_tpm2_enrolled and self.osd_fsid == luks2.osd_fsid:
self.pre_activate_tpm2(device)
found = direct_report(self.devices)
activated_any = False
for osd_uuid, meta in found.items():
osd_id = meta['osd_id']
if start_osd_id is not None and str(osd_id) != str(start_osd_id):
if self.osd_id is not None and str(osd_id) != str(self.osd_id):
continue
if start_osd_uuid is not None and osd_uuid != start_osd_uuid:
if self.osd_fsid is not None and osd_uuid != self.osd_fsid:
continue
logger.info('Activating osd.%s uuid %s cluster %s' % (
osd_id, osd_uuid, meta['ceph_fsid']))
self._activate(meta,
tmpfs=tmpfs)
self.block_device_path = meta.get('device')
self.db_device_path = meta.get('device_db', '')
self.wal_device_path = meta.get('device_wal', '')
logger.info(f'Activating osd.{osd_id} uuid {osd_uuid} cluster {meta["ceph_fsid"]}')
self._activate(osd_id, osd_uuid)
activated_any = True
if not activated_any:
raise RuntimeError('did not find any matching OSD to activate')
def pre_activate_tpm2(self, device: str) -> None:
"""Pre-activate a TPM2-encrypted device for Ceph.
This function pre-activates a TPM2-encrypted device for Ceph by opening the
LUKS encryption, checking the BlueStore header, and renaming the device
mapper according to the BlueStore mapping type.
Args:
device (str): The path to the device to be pre-activated.
Raises:
RuntimeError: If the device does not have a BlueStore signature.
"""
bs_mapping_type: Dict[str, str] = {'bluefs db': 'db',
'bluefs wal': 'wal',
'main': 'block'}
self.with_tpm = 1
self.temp_mapper: str = f'activating-{os.path.basename(device)}'
self.temp_mapper_path: str = f'/dev/mapper/{self.temp_mapper}'
encryption_utils.luks_open(
'',
device,
self.temp_mapper,
self.with_tpm
)
bluestore_header: Dict[str, Any] = disk.get_bluestore_header(self.temp_mapper_path)
if not bluestore_header:
raise RuntimeError(f"{device} doesn't have BlueStore signature.")
kname: str = disk.get_parent_device_from_mapper(self.temp_mapper_path, abspath=False)
device_type = bs_mapping_type[bluestore_header[self.temp_mapper_path]['description']]
new_mapper: str = f'ceph-{self.osd_fsid}-{kname}-{device_type}-dmcrypt'
self.block_device_path = f'/dev/mapper/{new_mapper}'
self.devices.append(self.block_device_path)
encryption_utils.rename_mapper(self.temp_mapper, new_mapper)

View File

@ -1,11 +1,13 @@
import os
import pytest
from mock.mock import patch, PropertyMock, create_autospec
from mock.mock import patch, PropertyMock, create_autospec, Mock
from ceph_volume.api import lvm
from ceph_volume.util import disk
from ceph_volume.util import device
from ceph_volume.util.constants import ceph_disk_guids
from ceph_volume import conf, configuration, objectstore
from ceph_volume.objectstore.rawbluestore import RawBlueStore
from typing import Any, Dict, List, Optional, Callable
class Capture(object):
@ -494,6 +496,14 @@ raw_direct_report_data = {
"osd_id": 9,
"osd_uuid": "a0e07c5b-bee1-4ea2-ae07-cb89deda9b27",
"type": "bluestore"
},
"db32a338-b640-4cbc-af17-f63808b1c36e": {
"ceph_fsid": "c301d0aa-288d-11ef-b535-c84bd6975560",
"device": "/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdb-block-dmcrypt",
"device_db": "/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdc-db-dmcrypt",
"osd_id": 0,
"osd_uuid": "db32a338-b640-4cbc-af17-f63808b1c36e",
"type": "bluestore"
}
}
@ -503,4 +513,21 @@ def mock_lvm_direct_report(monkeypatch):
@pytest.fixture
def mock_raw_direct_report(monkeypatch):
monkeypatch.setattr('ceph_volume.objectstore.rawbluestore.direct_report', lambda x: raw_direct_report_data)
monkeypatch.setattr('ceph_volume.objectstore.rawbluestore.direct_report', lambda x: raw_direct_report_data)
@pytest.fixture
def fake_lsblk_all(monkeypatch: Any) -> Callable:
def apply(data: Optional[List[Dict[str, Any]]] = None) -> None:
if data is None:
devices = []
else:
devices = data
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk_all", lambda *a, **kw: devices)
return apply
@pytest.fixture
def rawbluestore(factory: type[Factory]) -> RawBlueStore:
args = factory(devices=['/dev/foo'])
with patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_key', Mock(return_value=['AQCee6ZkzhOrJRAAZWSvNC3KdXOpC2w8ly4AZQ=='])):
r = RawBlueStore(args) # type: ignore
return r

View File

@ -78,6 +78,7 @@ class TestPrepare(object):
with pytest.raises(RuntimeError) as error:
self.p.args = Mock()
self.p.args.data = '/dev/sdfoo'
self.p.args.with_tpm = '0'
self.p.get_lv = Mock()
self.p.objectstore = objectstore.lvmbluestore.LvmBlueStore(args=self.p.args)
self.p.objectstore.safe_prepare()

View File

@ -71,8 +71,8 @@ class TestPrepare(object):
osd_fsid='123',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
m_luks_open.assert_called_with('foo', '/dev/foo', 'ceph-123-foo-block-dmcrypt')
m_luks_format.assert_called_with('foo', '/dev/foo')
m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/foo', 'ceph-123-foo-block-dmcrypt', 0)
m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/foo')
assert self.p.objectstore.__dict__['block_device_path'] == '/dev/mapper/ceph-123-foo-block-dmcrypt'
@patch('ceph_volume.util.encryption.luks_open')
@ -86,8 +86,8 @@ class TestPrepare(object):
osd_fsid='456',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
m_luks_open.assert_called_with('foo', '/dev/db-foo', 'ceph-456-foo-db-dmcrypt')
m_luks_format.assert_called_with('foo', '/dev/db-foo')
m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/db-foo', 'ceph-456-foo-db-dmcrypt', 0)
m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/db-foo')
assert self.p.objectstore.__dict__['db_device_path'] == '/dev/mapper/ceph-456-foo-db-dmcrypt'
@patch('ceph_volume.util.encryption.luks_open')
@ -101,8 +101,8 @@ class TestPrepare(object):
osd_fsid='789',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
m_luks_open.assert_called_with('foo', '/dev/wal-foo', 'ceph-789-foo-wal-dmcrypt')
m_luks_format.assert_called_with('foo', '/dev/wal-foo')
m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/wal-foo', 'ceph-789-foo-wal-dmcrypt', 0)
m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/wal-foo')
assert self.p.objectstore.__dict__['wal_device_path'] == '/dev/mapper/ceph-789-foo-wal-dmcrypt'
@patch('ceph_volume.objectstore.rawbluestore.rollback_osd')

View File

@ -13,7 +13,6 @@ class TestLvmBlueStore:
@patch('ceph_volume.conf.cluster', 'ceph')
@patch('ceph_volume.api.lvm.get_single_lv')
@patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.create_dmcrypt_key', Mock(return_value='fake-dmcrypt-key'))
def test_pre_prepare_lv(self, m_get_single_lv, factory):
args = factory(cluster_fsid='abcd',
osd_fsid='abc123',
@ -26,6 +25,7 @@ class TestLvmBlueStore:
lv_tags='',
lv_uuid='fake-uuid')
self.lvm_bs.encrypted = True
self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key'
self.lvm_bs.args = args
self.lvm_bs.pre_prepare()
assert self.lvm_bs.secrets['dmcrypt_key'] == 'fake-dmcrypt-key'
@ -42,22 +42,62 @@ class TestLvmBlueStore:
'ceph.block_uuid': 'fake-uuid',
'ceph.cephx_lockbox_secret': '',
'ceph.encrypted': True,
'ceph.vdo': '0'}
'ceph.vdo': '0',
'ceph.with_tpm': 0}
@patch('ceph_volume.conf.cluster', 'ceph')
@patch('ceph_volume.api.lvm.get_single_lv')
@patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
def test_pre_prepare_lv_with_dmcrypt_and_tpm(self, m_get_single_lv, factory):
args = factory(cluster_fsid='abcd',
osd_fsid='abc123',
crush_device_class='ssd',
osd_id='111',
data='vg_foo/lv_foo',
dmcrypt=True,
with_tpm=True)
m_get_single_lv.return_value = Volume(lv_name='lv_foo',
lv_path='/fake-path',
vg_name='vg_foo',
lv_tags='',
lv_uuid='fake-uuid')
self.lvm_bs.encrypted = True
self.lvm_bs.with_tpm = True
self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key-tpm2'
self.lvm_bs.args = args
self.lvm_bs.pre_prepare()
assert 'dmcrypt_key' not in self.lvm_bs.secrets.keys()
assert self.lvm_bs.secrets['crush_device_class'] == 'ssd'
assert self.lvm_bs.osd_id == '111'
assert self.lvm_bs.block_device_path == '/fake-path'
assert self.lvm_bs.tags == {'ceph.osd_fsid': 'abc123',
'ceph.osd_id': '111',
'ceph.cluster_fsid': 'abcd',
'ceph.cluster_name': 'ceph',
'ceph.crush_device_class': 'ssd',
'ceph.osdspec_affinity': '',
'ceph.block_device': '/fake-path',
'ceph.block_uuid': 'fake-uuid',
'ceph.cephx_lockbox_secret': '',
'ceph.encrypted': True,
'ceph.vdo': '0',
'ceph.with_tpm': 1}
@patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.create_dmcrypt_key', Mock(return_value='fake-dmcrypt-key'))
def test_pre_prepare_no_lv(self, factory):
args = factory(cluster_fsid='abcd',
osd_fsid='abc123',
crush_device_class='ssd',
osd_id='111',
data='/dev/foo')
data='/dev/foo',
dmcrypt_key='fake-dmcrypt-key')
self.lvm_bs.prepare_data_device = lambda x, y: Volume(lv_name='lv_foo',
lv_path='/fake-path',
vg_name='vg_foo',
lv_tags='',
lv_uuid='fake-uuid')
self.lvm_bs.encrypted = True
self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key'
self.lvm_bs.args = args
self.lvm_bs.pre_prepare()
assert self.lvm_bs.secrets['dmcrypt_key'] == 'fake-dmcrypt-key'
@ -74,7 +114,8 @@ class TestLvmBlueStore:
'ceph.block_uuid': 'fake-uuid',
'ceph.cephx_lockbox_secret': '',
'ceph.encrypted': True,
'ceph.vdo': '0'}
'ceph.vdo': '0',
'ceph.with_tpm': 0}
@patch('ceph_volume.util.disk.is_partition', Mock(return_value=True))
@patch('ceph_volume.api.lvm.create_lv')
@ -165,6 +206,7 @@ class TestLvmBlueStore:
block_db_size=123,
block_wal_slots=1,
block_db_slots=1,
with_tpm=False
)
self.lvm_bs.args = args
self.lvm_bs.pre_prepare = lambda: None
@ -177,7 +219,7 @@ class TestLvmBlueStore:
assert self.lvm_bs.wal_device_path == '/dev/foo1'
assert self.lvm_bs.db_device_path == '/dev/foo2'
assert self.lvm_bs.block_lv.set_tags.mock_calls == [call({'ceph.type': 'block', 'ceph.vdo': '0', 'ceph.wal_uuid': 'c6798f59-01', 'ceph.wal_device': '/dev/foo1', 'ceph.db_uuid': 'c6798f59-01', 'ceph.db_device': '/dev/foo2'})]
assert self.lvm_bs.prepare_dmcrypt.called
assert not self.lvm_bs.prepare_dmcrypt.called
assert self.lvm_bs.osd_mkfs.called
assert self.lvm_bs.prepare_osd_req.called
@ -185,8 +227,12 @@ class TestLvmBlueStore:
self.lvm_bs.secrets = {'dmcrypt_key': 'fake-secret'}
self.lvm_bs.tags = {'ceph.block_uuid': 'block-uuid1',
'ceph.db_uuid': 'db-uuid2',
'ceph.wal_uuid': 'wal-uuid3'}
self.lvm_bs.luks_format_and_open = lambda *a: f'/dev/mapper/{a[3]["ceph."+a[2]+"_uuid"]}'
'ceph.wal_uuid': 'wal-uuid3',
'ceph.with_tpm': 0}
self.lvm_bs.block_device_path = '/dev/sdb'
self.lvm_bs.db_device_path = '/dev/sdc'
self.lvm_bs.wal_device_path = '/dev/sdb'
self.lvm_bs.luks_format_and_open = lambda *a: f'/dev/mapper/{a[2]["ceph."+a[1]+"_uuid"]}'
self.lvm_bs.prepare_dmcrypt()
assert self.lvm_bs.block_device_path == '/dev/mapper/block-uuid1'
assert self.lvm_bs.db_device_path == '/dev/mapper/db-uuid2'
@ -195,15 +241,24 @@ class TestLvmBlueStore:
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_open')
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_format')
def test_luks_format_and_open(self, m_luks_format, m_luks_open):
result = self.lvm_bs.luks_format_and_open('key',
'/dev/foo',
result = self.lvm_bs.luks_format_and_open('/dev/foo',
'block',
{'ceph.block_uuid': 'block-uuid1'})
assert result == '/dev/mapper/block-uuid1'
@patch('ceph_volume.objectstore.lvmbluestore.LvmBlueStore.enroll_tpm2', Mock(return_value=MagicMock()))
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_open')
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_format')
def test_luks_format_and_open_with_tpm(self, m_luks_format, m_luks_open):
self.lvm_bs.with_tpm = True
result = self.lvm_bs.luks_format_and_open('/dev/foo',
'block',
{'ceph.block_uuid': 'block-uuid1'})
assert result == '/dev/mapper/block-uuid1'
self.lvm_bs.enroll_tpm2.assert_called_once()
def test_luks_format_and_open_not_device(self):
result = self.lvm_bs.luks_format_and_open('key',
'',
result = self.lvm_bs.luks_format_and_open('',
'block',
{})
assert result == ''

View File

@ -26,6 +26,26 @@ class TestRawBlueStore:
assert self.raw_bs.db_device_path == "/dev/mapper/ceph--foo0-db-dmcrypt"
assert self.raw_bs.wal_device_path == "/dev/mapper/ceph--foo0-wal-dmcrypt"
@patch('ceph_volume.objectstore.rawbluestore.RawBlueStore.enroll_tpm2', Mock(return_value=MagicMock()))
def test_prepare_dmcrypt_with_tpm(self,
device_info,
fake_call,
key_size):
self.raw_bs.block_device_path = '/dev/foo0'
self.raw_bs.db_device_path = '/dev/foo1'
self.raw_bs.wal_device_path = '/dev/foo2'
self.raw_bs.with_tpm = 1
lsblk = {"TYPE": "disk",
"NAME": "foo0",
'KNAME': 'foo0'}
device_info(lsblk=lsblk)
self.raw_bs.prepare_dmcrypt()
assert 'dmcrypt_key' not in self.raw_bs.secrets.keys()
assert self.raw_bs.block_device_path == "/dev/mapper/ceph--foo0-block-dmcrypt"
assert self.raw_bs.db_device_path == "/dev/mapper/ceph--foo0-db-dmcrypt"
assert self.raw_bs.wal_device_path == "/dev/mapper/ceph--foo0-wal-dmcrypt"
assert self.raw_bs.enroll_tpm2.mock_calls == [call('/dev/foo0'), call('/dev/foo1'), call('/dev/foo2')]
@patch('ceph_volume.objectstore.rawbluestore.rollback_osd')
@patch('ceph_volume.objectstore.rawbluestore.RawBlueStore.prepare')
def test_safe_prepare_raises_exception(self,
@ -48,13 +68,10 @@ class TestRawBlueStore:
capsys):
args = factory(dmcrypt=True,
data='/dev/foo')
# self.raw_bs.args = args
self.raw_bs.safe_prepare(args)
stdout, stderr = capsys.readouterr()
_, stderr = capsys.readouterr()
assert "prepare successful for: /dev/foo" in stderr
# @patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_id')
# @patch('ceph_volume.objectstore.rawbluestore.system.generate_uuid', return_value='fake-uuid')
@patch.dict('os.environ', {'CEPH_VOLUME_DMCRYPT_SECRET': 'dmcrypt-key'})
@patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_id')
@patch('ceph_volume.objectstore.rawbluestore.system.generate_uuid')
@ -92,26 +109,27 @@ class TestRawBlueStore:
m_link_block,
m_link_db,
m_link_wal,
monkeypatch):
meta = dict(osd_id='1',
osd_uuid='fake-uuid',
device='/dev/foo',
device_db='/dev/foo1',
device_wal='/dev/foo2')
monkeypatch,
factory):
args = factory(no_tmpfs=False)
self.raw_bs.args = args
self.raw_bs.block_device_path = '/dev/sda'
self.raw_bs.db_device_path = '/dev/sdb'
self.raw_bs.wal_device_path = '/dev/sdc'
m_run.return_value = MagicMock()
m_exists.side_effect = lambda path: True
m_create_osd_path.return_value = MagicMock()
m_unlink.return_value = MagicMock()
monkeypatch.setattr(system, 'chown', lambda path: 0)
monkeypatch.setattr(system, 'path_is_mounted', lambda path: 0)
self.raw_bs._activate(meta, True)
self.raw_bs._activate('1', True)
calls = [call('/var/lib/ceph/osd/ceph-1/block'),
call('/var/lib/ceph/osd/ceph-1/block.db'),
call('/var/lib/ceph/osd/ceph-1/block.wal')]
assert m_run.mock_calls == [call(['ceph-bluestore-tool',
'prime-osd-dir',
'--path', '/var/lib/ceph/osd/ceph-1',
'--no-mon-config', '--dev', '/dev/foo'])]
'--no-mon-config', '--dev', '/dev/sda'])]
assert m_unlink.mock_calls == calls
assert m_exists.mock_calls == calls
assert m_create_osd_path.mock_calls == [call('1', tmpfs=True)]
@ -120,40 +138,81 @@ class TestRawBlueStore:
is_root,
mock_raw_direct_report):
with pytest.raises(RuntimeError) as error:
self.raw_bs.activate([],
'123',
'fake-uuid',
True)
self.raw_bs.osd_id = '1'
self.raw_bs.activate()
assert str(error.value) == 'did not find any matching OSD to activate'
def test_activate_osd_id(self,
is_root,
mock_raw_direct_report):
def test_activate_osd_id_and_fsid(self,
is_root,
mock_raw_direct_report):
self.raw_bs._activate = MagicMock()
self.raw_bs.activate([],
'8',
'824f7edf-371f-4b75-9231-4ab62a32d5c0',
True)
self.raw_bs.osd_id = '8'
self.raw_bs.osd_fsid = '824f7edf-371f-4b75-9231-4ab62a32d5c0'
self.raw_bs.activate()
self.raw_bs._activate.mock_calls == [call({'ceph_fsid': '7dccab18-14cf-11ee-837b-5254008f8ca5',
'device': '/dev/mapper/ceph--40bc7bd7--4aee--483e--ba95--89a64bc8a4fd-osd--block--824f7edf--371f--4b75--9231--4ab62a32d5c0',
'device_db': '/dev/mapper/ceph--73d6d4db--6528--48f2--a4e2--1c82bc87a9ac-osd--db--b82d920d--be3c--4e4d--ba64--18f7e8445892',
'osd_id': 8,
'osd_uuid': '824f7edf-371f-4b75-9231-4ab62a32d5c0',
'type': 'bluestore'},
tmpfs=True)]
tmpfs=True)]
def test_activate_osd_fsid(self,
is_root,
mock_raw_direct_report):
self.raw_bs._activate = MagicMock()
with pytest.raises(RuntimeError):
self.raw_bs.activate([],
'8',
'a0e07c5b-bee1-4ea2-ae07-cb89deda9b27',
True)
self.raw_bs._activate.mock_calls == [call({'ceph_fsid': '7dccab18-14cf-11ee-837b-5254008f8ca5',
'device': '/dev/mapper/ceph--e34cc3f5--a70d--49df--82b3--46bcbd63d4b0-osd--block--a0e07c5b--bee1--4ea2--ae07--cb89deda9b27',
'osd_id': 9,
'osd_uuid': 'a0e07c5b-bee1-4ea2-ae07-cb89deda9b27',
'type': 'bluestore'},
tmpfs=True)]
@patch('ceph_volume.objectstore.rawbluestore.encryption_utils.rename_mapper', Mock(return_value=MagicMock()))
@patch('ceph_volume.util.disk.get_bluestore_header')
@patch('ceph_volume.objectstore.rawbluestore.encryption_utils.luks_open', Mock(return_value=MagicMock()))
def test_activate_dmcrypt_tpm(self, m_bs_header, rawbluestore, fake_lsblk_all, mock_raw_direct_report, is_root) -> None:
m_bs_header.return_value = {
"/dev/mapper/activating-sdb": {
"osd_uuid": "db32a338-b640-4cbc-af17-f63808b1c36e",
"size": 20000572178432,
"btime": "2024-06-13T12:16:57.607442+0000",
"description": "main",
"bfm_blocks": "4882952192",
"bfm_blocks_per_key": "128",
"bfm_bytes_per_block": "4096",
"bfm_size": "20000572178432",
"bluefs": "1",
"ceph_fsid": "c301d0aa-288d-11ef-b535-c84bd6975560",
"ceph_version_when_created": "ceph version 19.0.0-4242-gf2f7cc60 (f2f7cc609cdbae767486cf2fe6872a4789adffb2) squid (dev)",
"created_at": "2024-06-13T12:17:20.122565Z",
"elastic_shared_blobs": "1",
"kv_backend": "rocksdb",
"magic": "ceph osd volume v026",
"mkfs_done": "yes",
"osd_key": "AQAk42pmt7tqFxAAHlaETFm33yFtEuoQAh/cpQ==",
"ready": "ready",
"whoami": "0"}
}
mock_luks2_1 = Mock()
mock_luks2_1.is_ceph_encrypted = True
mock_luks2_1.is_tpm2_enrolled = True
mock_luks2_1.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
mock_luks2_2 = Mock()
mock_luks2_2.is_ceph_encrypted = True
mock_luks2_2.is_tpm2_enrolled = False
mock_luks2_2.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
mock_luks2_3 = Mock()
mock_luks2_3.is_ceph_encrypted = False
mock_luks2_3.is_tpm2_enrolled = False
mock_luks2_3.osd_fsid = ''
mock_luks2_4 = Mock()
mock_luks2_4.is_ceph_encrypted = True
mock_luks2_4.is_tpm2_enrolled = True
mock_luks2_4.osd_fsid = 'abcd'
with patch('ceph_volume.objectstore.rawbluestore.encryption_utils.CephLuks2', side_effect=[mock_luks2_1,
mock_luks2_2,
mock_luks2_3,
mock_luks2_4]):
fake_lsblk_all([{'NAME': '/dev/sdb', 'FSTYPE': 'crypto_LUKS'},
{'NAME': '/dev/sdc', 'FSTYPE': 'crypto_LUKS'},
{'NAME': '/dev/sdd', 'FSTYPE': ''}])
rawbluestore.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
rawbluestore.osd_id = '0'
rawbluestore._activate = MagicMock()
rawbluestore.activate()
assert rawbluestore._activate.mock_calls == [call(0, 'db32a338-b640-4cbc-af17-f63808b1c36e')]
assert rawbluestore.block_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdb-block-dmcrypt'
assert rawbluestore.db_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdc-db-dmcrypt'

View File

@ -0,0 +1,26 @@
import os
from ceph_volume import AllowLoopDevices, allow_loop_devices
from typing import Any
class TestAllowLoopDevsWarning:
def setup_method(self) -> None:
AllowLoopDevices.allow = False
AllowLoopDevices.warned = False
self.teardown_method()
def teardown_method(self) -> None:
AllowLoopDevices.allow = False
AllowLoopDevices.warned = False
if os.environ.get('CEPH_VOLUME_ALLOW_LOOP_DEVICES'):
os.environ.pop('CEPH_VOLUME_ALLOW_LOOP_DEVICES')
def test_loop_dev_warning(self, fake_call: Any, caplog: Any) -> None:
AllowLoopDevices.warned = False
assert allow_loop_devices() is False
assert not caplog.records
os.environ['CEPH_VOLUME_ALLOW_LOOP_DEVICES'] = "y"
assert allow_loop_devices() is True
log = caplog.records[0]
assert log.levelname == "WARNING"
assert "will never be supported in production" in log.message

View File

@ -1,7 +1,6 @@
import os
import pytest
from ceph_volume.util import disk
from mock.mock import patch, MagicMock
from mock.mock import patch, Mock, MagicMock, mock_open
class TestFunctions:
@ -39,6 +38,11 @@ class TestFunctions:
assert disk.is_partition('sda1')
@patch('os.path.exists', Mock(return_value=True))
def test_get_lvm_mapper_path_from_dm(self):
with patch('builtins.open', mock_open(read_data='test--foo--vg-test--foo--lv')):
assert disk.get_lvm_mapper_path_from_dm('/dev/dm-123') == '/dev/mapper/test--foo--vg-test--foo--lv'
class TestLsblkParser(object):
def test_parses_whitespace_values(self):
@ -551,24 +555,6 @@ class TestSizeSpecificFormatting(object):
assert result == "1027.00 TB"
class TestAllowLoopDevsWarning(object):
def setup_method(self):
disk.AllowLoopDevices.allow = False
disk.AllowLoopDevices.warned = False
if os.environ.get('CEPH_VOLUME_ALLOW_LOOP_DEVICES'):
os.environ.pop('CEPH_VOLUME_ALLOW_LOOP_DEVICES')
def test_loop_dev_warning(self, fake_call, caplog):
disk.AllowLoopDevices.warned = False
assert disk.allow_loop_devices() is False
assert not caplog.records
os.environ['CEPH_VOLUME_ALLOW_LOOP_DEVICES'] = "y"
assert disk.allow_loop_devices() is True
log = caplog.records[0]
assert log.levelname == "WARNING"
assert "will never be supported in production" in log.message
class TestHasBlueStoreLabel(object):
def test_device_path_is_a_path(self, fake_filesystem):
device_path = '/var/lib/ceph/osd/ceph-0'

View File

@ -1,7 +1,9 @@
from ceph_volume.util import encryption
from mock.mock import patch, Mock
from mock.mock import call, patch, Mock, MagicMock
from typing import Any
import base64
import pytest
import json
class TestNoWorkqueue:
@ -176,3 +178,113 @@ class TestLuksOpen(object):
]
encryption.luks_open('abcd', '/dev/foo', '/dev/bar')
assert m_call.call_args[0][0] == expected
class TestCephLuks2:
@patch.object(encryption.CephLuks2, 'get_osd_fsid', Mock(return_value='abcd-1234'))
@patch.object(encryption.CephLuks2, 'is_ceph_encrypted', Mock(return_value=True))
def test_init_ceph_encrypted(self) -> None:
assert encryption.CephLuks2('/dev/foo').osd_fsid == 'abcd-1234'
@patch.object(encryption.CephLuks2, 'get_osd_fsid', Mock(return_value=''))
@patch.object(encryption.CephLuks2, 'is_ceph_encrypted', Mock(return_value=False))
def test_init_not_ceph_encrypted(self) -> None:
assert encryption.CephLuks2('/dev/foo').osd_fsid == ''
def test_has_luks2_signature(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', return_value='LUKS'):
assert encryption.CephLuks2('/dev/foo').has_luks2_signature
@patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('foo'))
def test_has_luks2_signature_raises_exception(self, m_dd_read: Any) -> None:
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').has_luks2_signature
@patch.object(encryption.CephLuks2, 'get_subsystem', Mock(return_value='ceph_fsid=abcd'))
@patch.object(encryption.CephLuks2, 'has_luks2_signature', Mock(return_value=True))
def test_is_ceph_encrypted(self) -> None:
assert encryption.CephLuks2('/dev/foo').is_ceph_encrypted
@patch.object(encryption.CephLuks2, 'get_label', Mock(return_value=''))
@patch.object(encryption.CephLuks2, 'has_luks2_signature', Mock(return_value=True))
def test_is_not_ceph_encrypted(self) -> None:
assert not encryption.CephLuks2('/dev/foo').is_ceph_encrypted
@patch('ceph_volume.util.encryption.process.call', Mock(return_value=MagicMock()))
def test_config_luks2_invalid_config(self) -> None:
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').config_luks2({'subsystem': 'ceph_fsid=1234-abcd', 'label': 'foo', 'foo': 'bar'})
@patch('ceph_volume.util.encryption.process.call', Mock(return_value=MagicMock()))
def test_config_luks2_invalid_config_keys(self) -> None:
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').config_luks2({'fake': 'fake-value', 'subsystem': 'ceph_fsid=1234-abcd'})
@patch('ceph_volume.util.encryption.process.call')
def test_config_luks2_ok(self, m_call: Any) -> None:
m_call.return_value = ('', '', 0)
encryption.CephLuks2('/dev/foo').config_luks2({'label': 'foo', 'subsystem': 'ceph_fsid=1234-abcd'})
assert m_call.mock_calls == [call(['cryptsetup', 'config', '/dev/foo', '--label', 'foo', '--subsystem', 'ceph_fsid=1234-abcd'], verbose_on_failure=False)]
@patch('ceph_volume.util.encryption.process.call')
def test_config_luks2_raises_exception(self, m_call: Any) -> None:
m_call.return_value = ('', '', 1)
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').config_luks2({'label': 'foo', 'subsystem': 'ceph_fsid=1234-abcd'})
def test_get_label(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', return_value='fake-luks2-label'):
label: str = encryption.CephLuks2('/dev/foo').get_label()
assert label == 'fake-luks2-label'
def test_get_label_raises_exception(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('fake-error')):
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').get_label()
@patch.object(encryption.CephLuks2, 'get_subsystem', Mock(return_value='ceph_fsid=abcd'))
def test_get_osd_fsid(self) -> None:
assert encryption.CephLuks2('/dev/foo').get_osd_fsid() == 'abcd'
@patch.object(encryption.CephLuks2, 'get_label', Mock(return_value='ceph'))
def test_get_osd_fsid_error(self) -> None:
result: str = encryption.CephLuks2('/dev/foo').get_osd_fsid()
assert result == ''
def test_get_subsystem(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', return_value='fake-luks2-subsystem'):
assert encryption.CephLuks2('/dev/foo').get_subsystem() == 'fake-luks2-subsystem'
def test_get_subsystem_raises_exception(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('fake-error')):
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').get_subsystem()
def test_get_json_area(self) -> None:
mock_json_data = '{"tokens": {"1": {"type": "systemd-tpm2"}}}'
with patch('ceph_volume.util.encryption._dd_read', return_value=mock_json_data):
assert encryption.CephLuks2('/dev/foo').get_json_area() == json.loads(mock_json_data)
def test_get_json_area_invalid(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', return_value='invalid-json-data'):
with pytest.raises(RuntimeError):
encryption.CephLuks2('/dev/foo').get_json_area()
def test_get_json_area_exception_caught(self) -> None:
with patch('ceph_volume.util.encryption._dd_read', side_effect=OSError):
with pytest.raises(OSError):
encryption.CephLuks2('/dev/foo').get_json_area()
@patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'crypto_LUKS'}))
@patch.object(encryption.CephLuks2, 'get_json_area', Mock(return_value={"tokens": {"1": {"type": "systemd-tpm2"}}}))
def test_is_tpm2_enrolled_true(self) -> None:
assert encryption.CephLuks2('/dev/foo').is_tpm2_enrolled
@patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'whatever'}))
def test_is_tpm2_enrolled_false_not_a_luks_device(self) -> None:
assert not encryption.CephLuks2('/dev/foo').is_tpm2_enrolled
@patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'crypto_LUKS'}))
@patch.object(encryption.CephLuks2, 'get_json_area', Mock(return_value={"whatever": "fake-value"}))
def test_is_tpm2_enrolled_false_not_enrolled_with_tpm2(self) -> None:
assert not encryption.CephLuks2('/dev/foo').is_tpm2_enrolled

View File

@ -130,28 +130,35 @@ class TestOsdMkfsBluestore(object):
o.osd_mkfs()
assert '--keyfile' in fake_call.calls[2]['args'][0]
def test_keyring_is_not_added(self, fake_call, monkeypatch):
def test_keyring_is_not_added(self, fake_call, monkeypatch, factory):
args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
o = objectstore.bluestore.BlueStore([])
o.args = args
o.osd_id = '1'
o.osd_fsid = 'asdf'
o.osd_mkfs()
assert '--keyfile' not in fake_call.calls[0]['args'][0]
def test_wal_is_added(self, fake_call, monkeypatch, objectstore_bluestore):
def test_wal_is_added(self, fake_call, monkeypatch, objectstore_bluestore, factory):
args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
bs = objectstore_bluestore(objecstore='bluestore',
osd_id='1',
osd_fid='asdf',
wal_device_path='/dev/smm1',
cephx_secret='foo',)
cephx_secret='foo',
dmcrypt=False)
bs.args = args
bs.osd_mkfs()
assert '--bluestore-block-wal-path' in fake_call.calls[2]['args'][0]
assert '/dev/smm1' in fake_call.calls[2]['args'][0]
def test_db_is_added(self, fake_call, monkeypatch):
def test_db_is_added(self, fake_call, monkeypatch, factory):
args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
bs = objectstore.bluestore.BlueStore([])
bs.args = args
bs.db_device_path = '/dev/smm2'
bs.osd_mkfs()
assert '--bluestore-block-db-path' in fake_call.calls[2]['args'][0]

View File

@ -92,6 +92,9 @@ class ValidRawDevice(ValidDevice):
super().get_device(dev_path)
return self._format_device(self._is_valid_device())
def _format_device(self, device: Device) -> str:
return device.path
def _is_valid_device(self, raise_sys_exit=True):
out, err, rc = process.call([
'ceph-bluestore-tool', 'show-label',

View File

@ -3,12 +3,11 @@
import logging
import os
from functools import total_ordering
from ceph_volume import sys_info
from ceph_volume import sys_info, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util import disk, system
from ceph_volume.util.lsmdisk import LSMDisk
from ceph_volume.util.constants import ceph_disk_guids
from ceph_volume.util.disk import allow_loop_devices
logger = logging.getLogger(__name__)
@ -212,12 +211,21 @@ class Device(object):
lv = _lv
break
else:
filters = {}
if self.path[0] == '/':
lv = lvm.get_single_lv(filters={'lv_path': self.path})
lv_mapper_path: str = self.path
field: str = 'lv_path'
if self.path.startswith('/dev/mapper') or self.path.startswith('/dev/dm-'):
path = os.path.realpath(self.path) if self.path.startswith('/dev/mapper') else self.path
lv_mapper_path = disk.get_lvm_mapper_path_from_dm(path)
field = 'lv_dm_path'
filters = {field: lv_mapper_path}
else:
vgname, lvname = self.path.split('/')
lv = lvm.get_single_lv(filters={'lv_name': lvname,
'vg_name': vgname})
filters = {'lv_name': lvname, 'vg_name': vgname}
lv = lvm.get_single_lv(filters=filters)
if lv:
self.lv_api = lv

View File

@ -3,7 +3,8 @@ import os
import re
import stat
import time
from ceph_volume import process
import json
from ceph_volume import process, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util.system import get_file_contents
from typing import Dict, List, Any
@ -727,31 +728,6 @@ def is_mapper_device(device_name):
return device_name.startswith(('/dev/mapper', '/dev/dm-'))
class AllowLoopDevices(object):
allow = False
warned = False
@classmethod
def __call__(cls):
val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower()
if val not in ("false", 'no', '0'):
cls.allow = True
if not cls.warned:
logger.warning(
"CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your "
"environment, so we will allow the use of unattached loop"
" devices as disks. This feature is intended for "
"development purposes only and will never be supported in"
" production. Issues filed based on this behavior will "
"likely be ignored."
)
cls.warned = True
return cls.allow
allow_loop_devices = AllowLoopDevices()
def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_path: str = '/sys/dev/block', device: str = '') -> List[List[str]]:
def holder_inner_loop() -> bool:
for holder in holders:
@ -963,3 +939,193 @@ def get_lvm_mappers(sys_block_path: str = '/sys/block') -> List[str]:
result.append(f'/dev/mapper/{name.strip()}')
result.append(f'/dev/{device}')
return result
def _dd_read(device: str, count: int, skip: int = 0) -> str:
"""Read bytes from a device
Args:
device (str): The device to read bytes from.
count (int): The number of bytes to read.
skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
Returns:
str: A string containing the read bytes.
"""
result: str = ''
try:
with open(device, 'rb') as b:
b.seek(skip)
data: bytes = b.read(count)
result = data.decode('utf-8').replace('\x00', '')
except OSError:
logger.warning(f"Can't read from {device}")
pass
except UnicodeDecodeError:
pass
except Exception as e:
logger.error(f"An error occurred while reading from {device}: {e}")
raise
return result
def _dd_write(device: str, data: str, skip: int = 0) -> None:
"""Write bytes to a device
Args:
device (str): The device to write bytes to.
data (str): The data to write to the device.
skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
Raises:
OSError: If there is an error opening or writing to the device.
Exception: If any other error occurs during the write operation.
"""
try:
with open(device, 'r+b') as b:
b.seek(skip)
b.write(data.encode('utf-8'))
except OSError:
logger.warning(f"Can't write to {device}")
raise
except Exception as e:
logger.error(f"An error occurred while writing to {device}: {e}")
raise
def get_bluestore_header(device: str) -> Dict[str, Any]:
"""Retrieve BlueStore header information from a given device.
This function retrieves BlueStore header information from the specified 'device'.
It first checks if the device exists. If the device does not exist, a RuntimeError
is raised. Then, it calls the 'ceph-bluestore-tool' command to show the label
information of the device. If the command execution is successful, it parses the
JSON output containing the BlueStore header information and returns it as a dictionary.
Args:
device (str): The path to the device.
Returns:
Dict[str, Any]: A dictionary containing BlueStore header information.
"""
data: Dict[str, Any] = {}
if os.path.exists(device):
out, err, rc = process.call([
'ceph-bluestore-tool', 'show-label',
'--dev', device], verbose_on_failure=False)
if rc:
logger.debug(f'device {device} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}')
else:
data = json.loads(''.join(out))
else:
logger.warning(f'device {device} not found.')
return data
def bluestore_info(device: str, bluestore_labels: Dict[str, Any]) -> Dict[str, Any]:
"""Build a dict representation of a BlueStore header
Args:
device (str): The path of the BlueStore device.
bluestore_labels (Dict[str, Any]): Plain text output from `ceph-bluestore-tool show-label`
Returns:
Dict[str, Any]: Generated dict representation of the BlueStore header
"""
result: Dict[str, Any] = {}
result['osd_uuid'] = bluestore_labels[device]['osd_uuid']
if bluestore_labels[device]['description'] == 'main':
whoami = bluestore_labels[device]['whoami']
result.update({
'type': bluestore_labels[device].get('type', 'bluestore'),
'osd_id': int(whoami),
'ceph_fsid': bluestore_labels[device]['ceph_fsid'],
'device': device,
})
if bluestore_labels[device].get('db_device_uuid', ''):
result['db_device_uuid'] = bluestore_labels[device].get('db_device_uuid')
if bluestore_labels[device].get('wal_device_uuid', ''):
result['wal_device_uuid'] = bluestore_labels[device].get('wal_device_uuid')
elif bluestore_labels[device]['description'] == 'bluefs db':
result['device_db'] = device
elif bluestore_labels[device]['description'] == 'bluefs wal':
result['device_wal'] = device
return result
def get_block_device_holders(sys_block: str = '/sys/block') -> Dict[str, Any]:
"""Get a dictionary of device mappers with their corresponding parent devices.
This function retrieves information about device mappers and their parent devices
from the '/sys/block' directory. It iterates through each directory within 'sys_block',
and for each directory, it checks if a 'holders' directory exists. If so, it lists
the contents of the 'holders' directory and constructs a dictionary where the keys
are the device mappers and the values are their corresponding parent devices.
Args:
sys_block (str, optional): The path to the '/sys/block' directory. Defaults to '/sys/block'.
Returns:
Dict[str, Any]: A dictionary where keys are device mappers (e.g., '/dev/mapper/...') and
values are their corresponding parent devices (e.g., '/dev/sdX').
"""
result: Dict[str, Any] = {}
for b in os.listdir(sys_block):
path: str = os.path.join(sys_block, b, 'holders')
if os.path.exists(path):
for h in os.listdir(path):
result[f'/dev/{h}'] = f'/dev/{b}'
return result
def get_parent_device_from_mapper(mapper: str, abspath: bool = True) -> str:
"""Get the parent device corresponding to a given device mapper.
This function retrieves the parent device corresponding to a given device mapper
from the dictionary returned by the 'get_block_device_holders' function. It first
checks if the specified 'mapper' exists. If it does, it resolves the real path of
the mapper using 'os.path.realpath'. Then, it attempts to retrieve the parent device
from the dictionary. If the mapper is not found in the dictionary, an empty string
is returned.
Args:
mapper (str): The path to the device mapper.
abspath (bool, optional): If True (default), returns the absolute path of the parent device.
If False, returns only the basename of the parent device.
Returns:
str: The parent device corresponding to the given device mapper, or an empty string
if the mapper is not found in the dictionary of device mappers.
"""
result: str = ''
if os.path.exists(mapper):
_mapper: str = os.path.realpath(mapper)
try:
result = get_block_device_holders()[_mapper]
if not abspath:
result = os.path.basename(result)
except KeyError:
pass
return result
def get_lvm_mapper_path_from_dm(path: str, sys_block: str = '/sys/block') -> str:
"""_summary_
Retrieve the logical volume path for a given device.
This function takes the path of a device and returns the corresponding
logical volume path by reading the 'dm/name' file within the sysfs
directory.
Args:
path (str): The device path for which to retrieve the logical volume path.
sys_block (str, optional): The base sysfs block directory. Defaults to '/sys/block'.
Returns:
str: The device mapper path in the form of '/dev/dm-X'.
"""
result: str = ''
dev: str = os.path.basename(path)
sys_block_path: str = os.path.join(sys_block, dev, 'dm/name')
if os.path.exists(sys_block_path):
with open(sys_block_path, 'r') as f:
content: str = f.read()
result = f'/dev/mapper/{content}'
return result

View File

@ -2,12 +2,14 @@ import base64
import os
import logging
import re
import json
from ceph_volume import process, conf, terminal
from ceph_volume.util import constants, system
from ceph_volume.util.device import Device
from .prepare import write_keyring
from .disk import lsblk, device_family, get_part_entry_type
from .disk import lsblk, device_family, get_part_entry_type, _dd_read
from packaging import version
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
mlogger = terminal.MultiLogger(__name__)
@ -82,7 +84,7 @@ def get_key_size_from_conf():
return key_size
def create_dmcrypt_key():
def create_dmcrypt_key() -> str:
"""
Create the secret dm-crypt key (KEK) used to encrypt/decrypt the Volume Key.
"""
@ -91,7 +93,7 @@ def create_dmcrypt_key():
return key
def luks_format(key, device):
def luks_format(key: str, device: str) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
@ -140,7 +142,47 @@ def plain_open(key, device, mapping):
process.call(command, stdin=key, terminal_verbose=True, show_command=True)
def luks_open(key, device, mapping):
def luks_close(mapping: str) -> None:
"""Close a LUKS2 mapper device.
Args:
mapping (str): the name of the mapper to be closed.
"""
command: List[str] = ['cryptsetup',
'luksClose',
mapping]
process.call(command,
terminal_verbose=True,
show_command=True)
def rename_mapper(current: str, new: str) -> None:
"""Rename a mapper
Args:
old (str): current name
new (str): new name
"""
command: List[str] = [
'dmsetup',
'rename',
current,
new
]
_, err, rc = process.call(command,
terminal_verbose=True,
show_command=True)
if rc:
raise RuntimeError(f"Can't rename mapper '{current}' to '{new}': {err}")
def luks_open(key: str,
device: str,
mapping: str,
with_tpm: int = 0) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
@ -150,23 +192,38 @@ def luks_open(key, device, mapping):
:param device: absolute path to device
:param mapping: mapping name used to correlate device. Usually a UUID
"""
command = [
'cryptsetup',
'--key-size',
get_key_size_from_conf(),
'--key-file',
'-',
'--allow-discards', # allow discards (aka TRIM) requests for device
'luksOpen',
device,
mapping,
]
command: List[str] = []
if with_tpm:
command = ['/usr/lib/systemd/systemd-cryptsetup',
'attach',
mapping,
device,
'-',
'tpm2-device=auto,discard']
if bypass_workqueue(device):
command[-1] += ',no-read-workqueue,no-write-workqueue'
else:
command = [
'cryptsetup',
'--key-size',
get_key_size_from_conf(),
'--key-file',
'-',
'--allow-discards', # allow discards (aka TRIM) requests for device
'luksOpen',
device,
mapping,
]
if bypass_workqueue(device):
command.extend(['--perf-no_read_workqueue',
'--perf-no_write_workqueue'])
if bypass_workqueue(device):
command.extend(['--perf-no_read_workqueue',
'--perf-no_write_workqueue'])
process.call(command, stdin=key, terminal_verbose=True, show_command=True)
process.call(command,
run_on_host=with_tpm,
stdin=key,
terminal_verbose=True,
show_command=True)
def dmcrypt_close(mapping, skip_path_check=False):
@ -355,3 +412,160 @@ def prepare_dmcrypt(key, device, mapping):
mapping
)
return '/dev/mapper/%s' % mapping
class CephLuks2:
def __init__(self, device: str) -> None:
self.device: str = device
self.osd_fsid: str = ''
if self.is_ceph_encrypted:
self.osd_fsid = self.get_osd_fsid()
@property
def has_luks2_signature(self) -> bool:
try:
return _dd_read(self.device, 4) == 'LUKS'
except Exception as e:
raise RuntimeError(e)
@property
def is_ceph_encrypted(self) -> bool:
"""Check whether a device is used for a Ceph encrypted OSD
Args:
device (str): The path of the device being checked.
Returns:
bool: `True` if the device is used by an encrypted Ceph OSD, else `False`.
"""
result: bool = False
try:
result = self.has_luks2_signature and 'ceph_fsid=' in self.get_subsystem()
except RuntimeError:
pass
return result
def config_luks2(self, config: Dict[str, str]) -> None:
"""Set the subsystem of a LUKS2 device
Args:
config (str): The config to apply to the LUKS2 device.
Raises:
RuntimeError: If it can't set LUKS2 configuration.
"""
if not (0 < len(config) <= 2):
raise RuntimeError(f'Invalid config for LUKS2 device {self.device}')
valid_keys = ['label', 'subsystem']
if not all(key in valid_keys for key in config.keys()):
raise RuntimeError(f'LUKS2 config for device {self.device} can only be "label" and/or "subsystem".')
command: List[str] = ['cryptsetup', 'config',
self.device]
for k, v in config.items():
command.extend([f'--{k}', v])
_, err, rc = process.call(command, verbose_on_failure=False)
if rc:
raise RuntimeError(f"Can't set luks2 config to {self.device}:\n{err}")
def get_label(self) -> str:
"""Get the label of a LUKS2 device
Args:
device (str): The device to get the LUKS label from.
Returns:
str: The LUKS2 label of the device.
"""
result: str = ''
try:
result = _dd_read(self.device, 48, 24)
except Exception:
raise RuntimeError(f"Can't get luks2 label from {self.device}")
return result
def get_osd_fsid(self) -> str:
"""Get the osd fsid.
Returns:
str: The OSD fsid
"""
result: str = ''
try:
subsystem = self.get_subsystem()
result = subsystem.split('=')[1]
except IndexError:
logger.debug(f"LUKS2 device {self.device} doesn't have ceph osd fsid detail. Please check LUKS2 label for this device.")
return result
def get_subsystem(self) -> str:
"""Get the subsystem of a LUKS2 device
Args:
device (str): The device to get the LUKS subsystem from.
Returns:
str: The LUKS2 subsystem of the device.
"""
result: str = ''
try:
result = _dd_read(self.device, 48, 208)
except Exception as e:
raise RuntimeError(f"Can't get luks2 label from {self.device}:\n{e}")
return result
def get_json_area(self) -> Dict[str, Any]:
"""Retrieve the LUKS2 JSON configuration area from a given device.
This function reads the LUKS2 JSON configuration area from the specified 'device'.
It first checks if the device contains a LUKS2 signature. If not, an empty dictionary
is returned. If a LUKS2 signature is found, it reads the JSON configuration area
starting from byte offset 4096 (4 KB) and extracts the configuration data.
Args:
device (str): The path to the device.
Raises:
RuntimeError: If the LUKS2 JSON area on the device is invalid or cannot be decoded.
Returns:
Dict[str, Any]: A dictionary containing the extracted LUKS2 JSON configuration data.
"""
result: Dict[str, Any] = {}
try:
data: str = _dd_read(self.device, 12288, 4096)
result = json.loads(data)
except json.JSONDecodeError:
msg: str = f"LUKS2 json area for device {self.device} seems invalid."
raise RuntimeError(msg)
except Exception:
raise
return result
@property
def is_tpm2_enrolled(self) -> bool:
"""Check if a given device is enrolled with TPM2.
This function checks if the specified 'device' is enrolled with TPM2.
It first determines if the device is a LUKS encrypted volume by checking
its filesystem type using lsblk. If the filesystem type is 'crypto_LUKS',
it extracts the LUKS2 JSON configuration area from the device using the
'get_luks2_json_area' function. If the JSON area contains a 'systemd-tpm2'
token, it indicates that the device is enrolled with TPM2.
Args:
device (str): The path to the device.
Returns:
bool: True if the device is enrolled with TPM2, False otherwise.
"""
if lsblk(self.device).get('FSTYPE', '') == 'crypto_LUKS':
json_area: Dict[str, Any] = self.get_json_area()
if 'tokens' in json_area.keys():
for token in json_area['tokens'].keys():
if json_area['tokens'][token].get('type', '') == 'systemd-tpm2':
return True
return False

View File

@ -166,7 +166,7 @@ class DriveGroupSpec(ServiceSpec):
"""
_supported_features = [
"encrypted", "block_wal_size", "osds_per_device",
"encrypted", "tpm2", "block_wal_size", "osds_per_device",
"db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type",
"data_devices", "db_devices", "wal_devices", "journal_devices",
"data_directories", "osds_per_device", "objectstore", "osd_id_claims",
@ -185,6 +185,7 @@ class DriveGroupSpec(ServiceSpec):
osds_per_device=None, # type: Optional[int]
objectstore='bluestore', # type: str
encrypted=False, # type: bool
tpm2=False, # type: bool
db_slots=None, # type: Optional[int]
wal_slots=None, # type: Optional[int]
osd_id_claims=None, # type: Optional[Dict[str, List[str]]]
@ -248,6 +249,9 @@ class DriveGroupSpec(ServiceSpec):
#: ``true`` or ``false``
self.encrypted = encrypted
#: ``true`` or ``false``
self.tpm2 = tpm2
#: How many OSDs per DB device
self.db_slots = db_slots

View File

@ -132,6 +132,9 @@ class to_ceph_volume(object):
if self.spec.encrypted:
cmds[i] += " --dmcrypt"
if self.spec.tpm2:
cmds[i] += " --with-tpm"
if self.spec.osds_per_device:
cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device)