ceph/src/ceph-disk
Sage Weil 2ea8fac441 ceph-disk: use /sys/block to determine partition device names
Not all devices are basename + number; some have intervening character(s),
like /dev/cciss/c0d1p2.

Signed-off-by: Sage Weil <sage@inktank.com>
2013-07-16 15:51:44 -07:00

2299 lines
64 KiB
Python
Executable File

#!/usr/bin/python
import argparse
import errno
import fcntl
import logging
import os
import os.path
import platform
import re
import subprocess
import stat
import sys
import tempfile
import uuid
"""
Prepare:
- create GPT partition
- mark the partition with the ceph type uuid
- create a file system
- mark the fs as ready for ceph consumption
- entire data disk is used (one big partition)
- a new partition is added to the journal disk (so it can be easily shared)
- triggered by administrator or ceph-deploy, e.g. 'ceph-disk <data disk> [journal disk]
Activate:
- mount the volume in a temp loation
- allocate an osd id (if needed)
- remount in the correct location /var/lib/ceph/osd/$cluster-$id
- start ceph-osd
- triggered by udev when it sees the OSD gpt partition type
- triggered by admin 'ceph-disk activate <path>'
- triggered on ceph service startup with 'ceph-disk activate-all'
We rely on /dev/disk/by-partuuid to find partitions by their UUID;
this is what the journal symlink inside the osd data volume normally
points to.
activate-all relies on /dev/disk/by-parttype-uuid/$typeuuid.$uuid to
find all partitions. We install special udev rules to create these
links.
udev triggers 'ceph-disk activate <dev>' or 'ceph-disk
activate-journal <dev>' based on the partition type.
On old distros (e.g., RHEL6), the blkid installed does not recognized
GPT partition metadata and the /dev/disk/by-partuuid etc. links aren't
present. We have a horrible hack in the form of ceph-disk-udev that
parses gparted output to create the symlinks above and triggers the
'ceph-disk activate' etc commands that udev normally would do if it
knew the GPT partition type.
"""
CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'
JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106'
DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106'
OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d'
DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d'
TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be'
DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be'
DEFAULT_FS_TYPE = 'xfs'
MOUNT_OPTIONS = dict(
btrfs='noatime,user_subvol_rm_allowed',
# user_xattr is default ever since linux 2.6.39 / 3.0, but we'll
# delay a moment before removing it fully because we did have some
# issues with ext4 before the xatts-in-leveldb work, and it seemed
# that user_xattr helped
ext4='noatime,user_xattr',
xfs='noatime',
)
MKFS_ARGS = dict(
btrfs=[
'-m', 'single',
'-l', '32768',
'-n', '32768',
],
xfs=[
# xfs insists on not overwriting previous fs; even if we wipe
# partition table, we often recreate it exactly the same way,
# so we'll see ghosts of filesystems past
'-f',
'-i', 'size=2048',
],
)
INIT_SYSTEMS = [
'upstart',
'sysvinit',
'systemd',
'auto',
]
# Nuke the TERM variable to avoid confusing any subprocesses we call.
# For example, libreadline will print weird control sequences for some
# TERM values.
if 'TERM' in os.environ:
del os.environ['TERM']
LOG_NAME = __name__
if LOG_NAME == '__main__':
LOG_NAME = os.path.basename(sys.argv[0])
LOG = logging.getLogger(LOG_NAME)
###### lock ########
class filelock(object):
def __init__(self, fn):
self.fn = fn
self.fd = None
def acquire(self):
assert not self.fd
self.fd = file(self.fn, 'w')
fcntl.lockf(self.fd, fcntl.LOCK_EX)
def release(self):
assert self.fd
fcntl.lockf(self.fd, fcntl.LOCK_UN)
self.fd = None
prepare_lock = filelock('/var/lib/ceph/tmp/ceph-disk.prepare.lock')
activate_lock = filelock('/var/lib/ceph/tmp/ceph-disk.activate.lock')
###### exceptions ########
class Error(Exception):
"""
Error
"""
def __str__(self):
doc = self.__doc__.strip()
return ': '.join([doc] + [str(a) for a in self.args])
class MountError(Error):
"""
Mounting filesystem failed
"""
class UnmountError(Error):
"""
Unmounting filesystem failed
"""
class BadMagicError(Error):
"""
Does not look like a Ceph OSD, or incompatible version
"""
class TruncatedLineError(Error):
"""
Line is truncated
"""
class TooManyLinesError(Error):
"""
Too many lines
"""
class FilesystemTypeError(Error):
"""
Cannot discover filesystem type
"""
####### utils
def maybe_mkdir(*a, **kw):
"""
Creates a new directory if it doesn't exist, removes
existing symlink before creating the directory.
"""
# remove any symlink, if it is there..
if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode):
LOG.debug('Removing old symlink at %s', *a)
os.unlink(*a)
try:
os.mkdir(*a, **kw)
except OSError, e:
if e.errno == errno.EEXIST:
pass
else:
raise
# a device "name" is something like
# sdb
# cciss!c0d1
def get_dev_name(path):
"""
get device name from path. e.g., /dev/sda -> sdas, /dev/cciss/c0d1 -> cciss!c0d1
"""
assert path.startswith('/dev/')
base = path[5:]
return base.replace('/', '!')
# a device "path" is something like
# /dev/sdb
# /dev/cciss/c0d1
def get_dev_path(name):
"""
get a path (/dev/...) from a name (cciss!c0d1)
"""
return '/dev/' + name.replace('!', '/')
def get_dev_relpath(name):
"""
get a relative path to /dev from a name (cciss!c0d1)
"""
return name.replace('!', '/')
def get_partition_dev(dev, pnum):
"""
get the device name for a partition
assume that partitions are named like the base dev, with a number, and optionally
some intervening characters (like 'p'). e.g.,
sda 1 -> sda1
cciss/c0d1 1 -> cciss!c0d1p1
"""
name = get_dev_name(os.path.realpath(dev))
partname = None
for f in os.listdir(os.path.join('/sys/block', name)):
if f.startswith(name) and f.endswith(str(pnum)):
# we want the shortest name that starts with the base name and ends with the partition number
if not partname or len(f) < len(partname):
partname = f
if partname:
return get_dev_path(partname)
else:
raise Error('partition %d for %s does not appear to exist' % (pnum, dev))
def list_all_partitions():
"""
Return a list of devices and partitions
"""
dev_part_list = {}
for name in os.listdir('/sys/block'):
if not os.path.exists(os.path.join('/sys/block', name, 'device')):
continue
dev_part_list[name] = list_partitions(name)
return dev_part_list
def list_partitions(basename):
"""
Return a list of partitions on the given device name
"""
partitions = []
for name in os.listdir(os.path.join('/sys/block', basename)):
if name.startswith(basename):
partitions.append(name)
return partitions
def is_partition(dev):
"""
Check whether a given device path is a partition or a full disk.
"""
dev = os.path.realpath(dev)
if not stat.S_ISBLK(os.lstat(dev).st_mode):
raise Error('not a block device', dev)
name = get_dev_name(dev)
if os.path.exists(os.path.join('/sys/block', name)):
return False
# make sure it is a partition of something else
for basename in os.listdir('/sys/block'):
if os.path.exists(os.path.join('/sys/block', basename, name)):
return True
raise Error('not a disk or partition', dev)
def is_mounted(dev):
"""
Check if the given device is mounted.
"""
dev = os.path.realpath(dev)
with file('/proc/mounts', 'rb') as proc_mounts:
for line in proc_mounts:
fields = line.split()
if len(fields) < 3:
continue
mounts_dev = fields[0]
path = fields[1]
if mounts_dev.startswith('/') and os.path.exists(mounts_dev):
mounts_dev = os.path.realpath(mounts_dev)
if mounts_dev == dev:
return path
return None
def is_held(dev):
"""
Check if a device is held by another device (e.g., a dm-crypt mapping)
"""
assert os.path.exists(dev)
dev = os.path.realpath(dev)
base = get_dev_name(dev)
# full disk?
directory = '/sys/block/{base}/holders'.format(base=base)
if os.path.exists(directory):
return os.listdir(directory)
# partition?
part = base
while len(base):
directory = '/sys/block/{base}/{part}/holders'.format(part=part, base=base)
if os.path.exists(directory):
return os.listdir(directory)
base = base[:-1]
return []
def verify_not_in_use(dev):
"""
Verify if a given device (path) is in use (e.g. mounted or
in use by device-mapper).
:raises: Error if device is in use.
"""
assert os.path.exists(dev)
if is_partition(dev):
if is_mounted(dev):
raise Error('Device is mounted', dev)
holders = is_held(dev)
if holders:
raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders))
else:
basename = get_dev_name(os.path.realpath(dev))
for partname in list_partitions(basename):
partition = get_dev_path(partname)
if is_mounted(partition):
raise Error('Device is mounted', partition)
holders = is_held(partition)
if holders:
raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % partition, ','.join(holders))
def must_be_one_line(line):
"""
Checks if given line is really one single line.
:raises: TruncatedLineError or TooManyLinesError
:return: Content of the line, or None if line isn't valid.
"""
if line[-1:] != '\n':
raise TruncatedLineError(line)
line = line[:-1]
if '\n' in line:
raise TooManyLinesError(line)
return line
def read_one_line(parent, name):
"""
Read a file whose sole contents are a single line.
Strips the newline.
:return: Contents of the line, or None if file did not exist.
"""
path = os.path.join(parent, name)
try:
line = file(path, 'rb').read()
except IOError as e:
if e.errno == errno.ENOENT:
return None
else:
raise
try:
line = must_be_one_line(line)
except (TruncatedLineError, TooManyLinesError) as e:
raise Error('File is corrupt: {path}: {msg}'.format(
path=path,
msg=e,
))
return line
def write_one_line(parent, name, text):
"""
Write a file whose sole contents are a single line.
Adds a newline.
"""
path = os.path.join(parent, name)
tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
with file(tmp, 'wb') as tmp_file:
tmp_file.write(text + '\n')
os.fsync(tmp_file.fileno())
os.rename(tmp, path)
def check_osd_magic(path):
"""
Check that this path has the Ceph OSD magic.
:raises: BadMagicError if this does not look like a Ceph OSD data
dir.
"""
magic = read_one_line(path, 'magic')
if magic is None:
# probably not mkfs'ed yet
raise BadMagicError(path)
if magic != CEPH_OSD_ONDISK_MAGIC:
raise BadMagicError(path)
def check_osd_id(osd_id):
"""
Ensures osd id is numeric.
"""
if not re.match(r'^[0-9]+$', osd_id):
raise Error('osd id is not numeric')
def allocate_osd_id(
cluster,
fsid,
keyring,
):
"""
Accocates an OSD id on the given cluster.
:raises: Error if the call to allocate the OSD id fails.
:return: The allocated OSD id.
"""
LOG.debug('Allocating OSD id...')
try:
osd_id = _check_output(
args=[
'/usr/bin/ceph',
'--cluster', cluster,
'--name', 'client.bootstrap-osd',
'--keyring', keyring,
'osd', 'create', '--concise',
fsid,
],
)
except subprocess.CalledProcessError as e:
raise Error('ceph osd create failed', e)
osd_id = must_be_one_line(osd_id)
check_osd_id(osd_id)
return osd_id
def get_osd_id(path):
"""
Gets the OSD id of the OSD at the given path.
"""
osd_id = read_one_line(path, 'whoami')
if osd_id is not None:
check_osd_id(osd_id)
return osd_id
def _check_output(*args, **kwargs):
process = subprocess.Popen(
stdout=subprocess.PIPE,
*args, **kwargs)
out, _ = process.communicate()
ret = process.wait()
if ret:
cmd = kwargs.get("args")
if cmd is None:
cmd = args[0]
#raise subprocess.CalledProcessError(ret, cmd, output=out)
error = subprocess.CalledProcessError(ret, cmd)
error.output = out
raise error
return out
def get_conf(cluster, variable):
"""
Get the value of the given configuration variable from the
cluster.
:raises: Error if call to ceph-conf fails.
:return: The variable value or None.
"""
try:
process = subprocess.Popen(
args=[
'/usr/bin/ceph-conf',
'--cluster={cluster}'.format(
cluster=cluster,
),
'--name=osd.',
'--lookup',
variable,
],
stdout=subprocess.PIPE,
close_fds=True,
)
except OSError as e:
raise Error('error executing ceph-conf', e)
(out, _err) = process.communicate()
ret = process.wait()
if ret == 1:
# config entry not found
return None
elif ret != 0:
raise Error('getting variable from configuration failed')
value = str(out).split('\n', 1)[0]
# don't differentiate between "var=" and no var set
if not value:
return None
return value
def get_conf_with_default(cluster, variable):
"""
Get a config value that is known to the C++ code.
This will fail if called on variables that are not defined in
common config options.
"""
try:
out = _check_output(
args=[
'ceph-osd',
'--cluster={cluster}'.format(
cluster=cluster,
),
'--show-config-value={variable}'.format(
variable=variable,
),
],
close_fds=True,
)
except subprocess.CalledProcessError as e:
raise Error(
'getting variable from configuration failed',
e,
)
value = str(out).split('\n', 1)[0]
return value
def get_fsid(cluster):
"""
Get the fsid of the cluster.
:return: The fsid or raises Error.
"""
fsid = get_conf(cluster=cluster, variable='fsid')
if fsid is None:
raise Error('getting cluster uuid from configuration failed')
return fsid
def get_or_create_dmcrypt_key(
_uuid,
key_dir,
):
"""
Get path to dmcrypt key or create a new key file.
:return: Path to the dmcrypt key file.
"""
path = os.path.join(key_dir, _uuid)
# already have it?
if os.path.exists(path):
return path
# make a new key
try:
if not os.path.exists(key_dir):
os.makedirs(key_dir)
with file('/dev/urandom', 'rb') as i:
key = i.read(256)
with file(path, 'wb') as key_file:
key_file.write(key)
return path
except:
raise Error('unable to read or create dm-crypt key', path)
def dmcrypt_map(
rawdev,
keypath,
_uuid,
):
"""
Maps a device to a dmcrypt device.
:return: Path to the dmcrypt device.
"""
dev = '/dev/mapper/'+ _uuid
args = [
'cryptsetup',
'--key-file',
keypath,
'--key-size', '256',
'create',
_uuid,
rawdev,
]
try:
subprocess.check_call(args)
return dev
except subprocess.CalledProcessError as e:
raise Error('unable to map device', rawdev, e)
def dmcrypt_unmap(
_uuid
):
"""
Removes the dmcrypt device with the given UUID.
"""
args = [
'cryptsetup',
'remove',
_uuid
]
try:
subprocess.check_call(args)
except subprocess.CalledProcessError as e:
raise Error('unable to unmap device', _uuid, e)
def mount(
dev,
fstype,
options,
):
"""
Mounts a device with given filessystem type and
mount options to a tempfile path under /var/lib/ceph/tmp.
"""
# pick best-of-breed mount options based on fs type
if options is None:
options = MOUNT_OPTIONS.get(fstype, '')
# mount
path = tempfile.mkdtemp(
prefix='mnt.',
dir='/var/lib/ceph/tmp',
)
try:
LOG.debug('Mounting %s on %s with options %s', dev, path, options)
subprocess.check_call(
args=[
'mount',
'-o', options,
'--',
dev,
path,
],
)
except subprocess.CalledProcessError as e:
try:
os.rmdir(path)
except (OSError, IOError):
pass
raise MountError(e)
return path
def unmount(
path,
):
"""
Unmount and removes the given mount point.
"""
try:
LOG.debug('Unmounting %s', path)
subprocess.check_call(
args=[
'/bin/umount',
'--',
path,
],
)
except subprocess.CalledProcessError as e:
raise UnmountError(e)
os.rmdir(path)
###########################################
def get_free_partition_index(dev):
"""
Get the next free partition index on a given device.
:return: Index number (> 1 if there is already a partition on the device)
or 1 if there is no partition table.
"""
try:
lines = _check_output(
args=[
'parted',
'--machine',
'--',
dev,
'print',
],
)
except subprocess.CalledProcessError as e:
print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e
return 1
if not lines:
raise Error('parted failed to output anything')
lines = str(lines).splitlines(True)
# work around buggy libreadline(?) library in rhel/centos.
idiot_prefix = '\x1b\x5b\x3f\x31\x30\x33\x34\x68';
if lines[0].startswith(idiot_prefix):
lines[0] = lines[0][8:]
if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']:
raise Error('weird parted units', lines[0])
del lines[0]
if not lines[0].startswith('/dev/'):
raise Error('weird parted disk entry', lines[0])
del lines[0]
seen = set()
for line in lines:
idx, _ = line.split(':', 1)
idx = int(idx)
seen.add(idx)
num = 1
while num in seen:
num += 1
return num
def zap(dev):
"""
Destroy the partition table and content of a given disk.
"""
try:
LOG.debug('Zapping partition table on %s', dev)
# try to wipe out any GPT partition table backups. sgdisk
# isn't too thorough.
lba_size = 4096
size = 33 * lba_size
with file(dev, 'wb') as dev_file:
dev_file.seek(-size, os.SEEK_END)
dev_file.write(size*'\0')
subprocess.check_call(
args=[
'sgdisk',
'--zap-all',
'--clear',
'--mbrtogpt',
'--',
dev,
],
)
except subprocess.CalledProcessError as e:
raise Error(e)
def prepare_journal_dev(
data,
journal,
journal_size,
journal_uuid,
journal_dm_keypath,
):
if is_partition(journal):
LOG.debug('Journal %s is a partition', journal)
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
return (journal, None, None)
ptype = JOURNAL_UUID
if journal_dm_keypath:
ptype = DMCRYPT_JOURNAL_UUID
# it is a whole disk. create a partition!
num = None
if journal == data:
# we're sharing the disk between osd data and journal;
# make journal be partition number 2, so it's pretty
num = 2
journal_part = '{num}:0:{size}M'.format(
num=num,
size=journal_size,
)
else:
# sgdisk has no way for me to say "whatever is the next
# free index number" when setting type guids etc, so we
# need to awkwardly look up the next free number, and then
# fix that in the call -- and hope nobody races with us;
# then again nothing guards the partition table from races
# anyway
num = get_free_partition_index(dev=journal)
journal_part = '{num}:0:+{size}M'.format(
num=num,
size=journal_size,
)
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
try:
LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal)
subprocess.check_call(
args=[
'sgdisk',
'--new={part}'.format(part=journal_part),
'--change-name={num}:ceph journal'.format(num=num),
'--partition-guid={num}:{journal_uuid}'.format(
num=num,
journal_uuid=journal_uuid,
),
'--typecode={num}:{uuid}'.format(
num=num,
uuid=ptype,
),
'--',
journal,
],
)
subprocess.call(
args=[
# wait for udev event queue to clear
'udevadm',
'settle',
],
)
journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format(
journal_uuid=journal_uuid,
)
journal_dmcrypt = None
if journal_dm_keypath:
journal_dmcrypt = journal_symlink
journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid)
LOG.debug('Journal is GPT partition %s', journal_symlink)
return (journal_symlink, journal_dmcrypt, journal_uuid)
except subprocess.CalledProcessError as e:
raise Error(e)
def prepare_journal_file(
journal,
journal_size):
if not os.path.exists(journal):
LOG.debug('Creating journal file %s with size %dM', journal, journal_size)
with file(journal, 'wb') as journal_file:
journal_file.truncate(journal_size * 1048576)
# FIXME: should we resize an existing journal file?
LOG.debug('Journal is file %s', journal)
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
return (journal, None, None)
def prepare_journal(
data,
journal,
journal_size,
journal_uuid,
force_file,
force_dev,
journal_dm_keypath,
):
if journal is None:
if force_dev:
raise Error('Journal is unspecified; not a block device')
return (None, None, None)
if not os.path.exists(journal):
if force_dev:
raise Error('Journal does not exist; not a block device', journal)
return prepare_journal_file(journal, journal_size)
jmode = os.stat(journal).st_mode
if stat.S_ISREG(jmode):
if force_dev:
raise Error('Journal is not a block device', journal)
return prepare_journal_file(journal, journal_size)
if stat.S_ISBLK(jmode):
if force_file:
raise Error('Journal is not a regular file', journal)
return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath)
raise Error('Journal %s is neither a block device nor regular file', journal)
def adjust_symlink(target, path):
create = True
if os.path.lexists(path):
try:
mode = os.lstat(path).st_mode
if stat.S_ISREG(mode):
LOG.debug('Removing old file %s', path)
os.unlink(path)
elif stat.S_ISLNK(mode):
old = os.readlink(path)
if old != target:
LOG.debug('Removing old symlink %s -> %s', path, old)
os.unlink(path)
else:
create = False
except:
raise Error('unable to remove (or adjust) old file (symlink)', path)
if create:
LOG.debug('Creating symlink %s -> %s', path, target)
try:
os.symlink(target, path)
except:
raise Error('unable to create symlink %s -> %s' % (path, target))
def prepare_dir(
path,
journal,
cluster_uuid,
osd_uuid,
journal_uuid,
journal_dmcrypt = None,
):
LOG.debug('Preparing osd data dir %s', path)
if osd_uuid is None:
osd_uuid = str(uuid.uuid4())
if journal is not None:
# we're using an external journal; point to it here
adjust_symlink(journal, os.path.join(path, 'journal'))
if journal_dmcrypt is not None:
adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt'))
else:
try:
os.unlink(os.path.join(path, 'journal_dmcrypt'))
except OSError:
pass
write_one_line(path, 'ceph_fsid', cluster_uuid)
write_one_line(path, 'fsid', osd_uuid)
write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)
if journal_uuid is not None:
# i.e., journal is a tagged partition
write_one_line(path, 'journal_uuid', journal_uuid)
def prepare_dev(
data,
journal,
fstype,
mkfs_args,
mount_options,
cluster_uuid,
osd_uuid,
journal_uuid,
journal_dmcrypt,
osd_dm_keypath,
):
"""
Prepare a data/journal combination to be used for an OSD.
The ``magic`` file is written last, so it's presence is a reliable
indicator of the whole sequence having completed.
WARNING: This will unconditionally overwrite anything given to
it.
"""
ptype_tobe = TOBE_UUID
ptype_osd = OSD_UUID
if osd_dm_keypath:
ptype_tobe = DMCRYPT_TOBE_UUID
ptype_osd = DMCRYPT_OSD_UUID
rawdev = None
if is_partition(data):
LOG.debug('OSD data device %s is a partition', data)
rawdev = data
else:
LOG.debug('Creating osd partition on %s', data)
try:
subprocess.check_call(
args=[
'sgdisk',
'--largest-new=1',
'--change-name=1:ceph data',
'--partition-guid=1:{osd_uuid}'.format(
osd_uuid=osd_uuid,
),
'--typecode=1:%s' % ptype_tobe,
'--',
data,
],
)
subprocess.call(
args=[
# wait for udev event queue to clear
'udevadm',
'settle',
],
)
except subprocess.CalledProcessError as e:
raise Error(e)
rawdev = get_partition_dev(data, 1)
dev = None
if osd_dm_keypath:
dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid)
else:
dev = rawdev
try:
args = [
'mkfs',
'-t',
fstype,
]
if mkfs_args is not None:
args.extend(mkfs_args.split())
if fstype == 'xfs':
args.extend(['-f']) # always force
else:
args.extend(MKFS_ARGS.get(fstype, []))
args.extend([
'--',
dev,
])
try:
LOG.debug('Creating %s fs on %s', fstype, dev)
subprocess.check_call(args=args)
except subprocess.CalledProcessError as e:
raise Error(e)
#remove whitespaces from mount_options
if mount_options is not None:
mount_options = "".join(mount_options.split())
path = mount(dev=dev, fstype=fstype, options=mount_options)
try:
prepare_dir(
path=path,
journal=journal,
cluster_uuid=cluster_uuid,
osd_uuid=osd_uuid,
journal_uuid=journal_uuid,
journal_dmcrypt=journal_dmcrypt,
)
finally:
unmount(path)
finally:
if rawdev != dev:
dmcrypt_unmap(osd_uuid)
if not is_partition(data):
try:
subprocess.check_call(
args=[
'sgdisk',
'--typecode=1:%s' % ptype_osd,
'--',
data,
],
)
except subprocess.CalledProcessError as e:
raise Error(e)
def main_prepare(args):
journal_dm_keypath = None
osd_dm_keypath = None
try:
prepare_lock.acquire()
if not os.path.exists(args.data):
raise Error('data path does not exist', args.data)
# in use?
dmode = os.stat(args.data).st_mode
if stat.S_ISBLK(dmode):
verify_not_in_use(args.data)
if args.journal and os.path.exists(args.journal):
jmode = os.stat(args.journal).st_mode
if stat.S_ISBLK(jmode):
verify_not_in_use(args.journal)
if args.zap_disk is not None:
if stat.S_ISBLK(dmode) and not is_partition(args.data):
zap(args.data)
else:
raise Error('not full block device; cannot zap', args.data)
if args.cluster_uuid is None:
args.cluster_uuid = get_fsid(cluster=args.cluster)
if args.cluster_uuid is None:
raise Error(
'must have fsid in config or pass --cluster-uuid=',
)
if args.fs_type is None:
args.fs_type = get_conf(
cluster=args.cluster,
variable='osd_mkfs_type',
)
if args.fs_type is None:
args.fs_type = get_conf(
cluster=args.cluster,
variable='osd_fs_type',
)
if args.fs_type is None:
args.fs_type = DEFAULT_FS_TYPE
mkfs_args = get_conf(
cluster=args.cluster,
variable='osd_mkfs_options_{fstype}'.format(
fstype=args.fs_type,
),
)
if mkfs_args is None:
mkfs_args = get_conf(
cluster=args.cluster,
variable='osd_fs_mkfs_options_{fstype}'.format(
fstype=args.fs_type,
),
)
mount_options = get_conf(
cluster=args.cluster,
variable='osd_mount_options_{fstype}'.format(
fstype=args.fs_type,
),
)
if mount_options is None:
mount_options = get_conf(
cluster=args.cluster,
variable='osd_fs_mount_options_{fstype}'.format(
fstype=args.fs_type,
),
)
journal_size = get_conf_with_default(
cluster=args.cluster,
variable='osd_journal_size',
)
journal_size = int(journal_size)
# colocate journal with data?
if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None:
LOG.info('Will colocate journal with data on %s', args.data)
args.journal = args.data
if args.journal_uuid is None:
args.journal_uuid = str(uuid.uuid4())
if args.osd_uuid is None:
args.osd_uuid = str(uuid.uuid4())
# dm-crypt keys?
if args.dmcrypt:
journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir)
osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir)
# prepare journal
(journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal(
data=args.data,
journal=args.journal,
journal_size=journal_size,
journal_uuid=args.journal_uuid,
force_file=args.journal_file,
force_dev=args.journal_dev,
journal_dm_keypath=journal_dm_keypath,
)
# prepare data
if stat.S_ISDIR(dmode):
if args.data_dev:
raise Error('data path is not a block device', args.data)
prepare_dir(
path=args.data,
journal=journal_symlink,
cluster_uuid=args.cluster_uuid,
osd_uuid=args.osd_uuid,
journal_uuid=journal_uuid,
journal_dmcrypt=journal_dmcrypt,
)
elif stat.S_ISBLK(dmode):
if args.data_dir:
raise Error('data path is not a directory', args.data)
prepare_dev(
data=args.data,
journal=journal_symlink,
fstype=args.fs_type,
mkfs_args=mkfs_args,
mount_options=mount_options,
cluster_uuid=args.cluster_uuid,
osd_uuid=args.osd_uuid,
journal_uuid=journal_uuid,
journal_dmcrypt=journal_dmcrypt,
osd_dm_keypath=osd_dm_keypath,
)
else:
raise Error('not a dir or block device', args.data)
prepare_lock.release()
if stat.S_ISBLK(dmode):
# try to make sure the kernel refreshes the table. note
# that if this gets ebusy, we are probably racing with
# udev because it already updated it.. ignore failure here.
LOG.debug('Calling partprobe on prepared device %s', args.data)
subprocess.call(
args=[
'partprobe',
args.data,
],
)
except Error as e:
if journal_dm_keypath:
os.unlink(journal_dm_keypath)
if osd_dm_keypath:
os.unlink(osd_dm_keypath)
prepare_lock.release()
raise e
###########################
def mkfs(
path,
cluster,
osd_id,
fsid,
keyring,
):
monmap = os.path.join(path, 'activate.monmap')
subprocess.check_call(
args=[
'/usr/bin/ceph',
'--cluster', cluster,
'--name', 'client.bootstrap-osd',
'--keyring', keyring,
'mon', 'getmap', '-o', monmap,
],
)
subprocess.check_call(
args=[
'/usr/bin/ceph-osd',
'--cluster', cluster,
'--mkfs',
'--mkkey',
'-i', osd_id,
'--monmap', monmap,
'--osd-data', path,
'--osd-journal', os.path.join(path, 'journal'),
'--osd-uuid', fsid,
'--keyring', os.path.join(path, 'keyring'),
],
)
# TODO ceph-osd --mkfs removes the monmap file?
# os.unlink(monmap)
def auth_key(
path,
cluster,
osd_id,
keyring,
):
subprocess.check_call(
args=[
'/usr/bin/ceph',
'--cluster', cluster,
'--name', 'client.bootstrap-osd',
'--keyring', keyring,
'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
'-i', os.path.join(path, 'keyring'),
'osd', 'allow *',
'mon', 'allow rwx',
],
)
def move_mount(
dev,
path,
cluster,
osd_id,
):
LOG.debug('Moving mount to final location...')
parent = '/var/lib/ceph/osd'
osd_data = os.path.join(
parent,
'{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id),
)
maybe_mkdir(osd_data)
# we really want to mount --move, but that is not supported when
# the parent mount is shared, as it is by default on RH, Fedora,
# and probably others. Also, --bind doesn't properly manipulate
# /etc/mtab, which *still* isn't a symlink to /proc/mounts despite
# this being 2013. Instead, mount the original device at the final
# location.
subprocess.check_call(
args=[
'/bin/mount',
'--',
dev,
osd_data,
],
)
subprocess.check_call(
args=[
'/bin/umount',
'-l', # lazy, in case someone else is peeking at the
# wrong moment
'--',
path,
],
)
def start_daemon(
cluster,
osd_id,
):
LOG.debug('Starting %s osd.%s...', cluster, osd_id)
path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
cluster=cluster, osd_id=osd_id)
# upstart?
try:
if os.path.exists(os.path.join(path,'upstart')):
subprocess.check_call(
args=[
'/sbin/initctl',
# use emit, not start, because start would fail if the
# instance was already running
'emit',
# since the daemon starting doesn't guarantee much about
# the service being operational anyway, don't bother
# waiting for it
'--no-wait',
'--',
'ceph-osd',
'cluster={cluster}'.format(cluster=cluster),
'id={osd_id}'.format(osd_id=osd_id),
],
)
elif os.path.exists(os.path.join(path, 'sysvinit')):
if os.path.exists('/usr/sbin/service'):
svc = '/usr/sbin/service'
else:
svc = '/sbin/service'
subprocess.check_call(
args=[
svc,
'ceph',
'start',
'osd.{osd_id}'.format(osd_id=osd_id),
],
)
else:
raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format(
cluster=cluster,
osd_id=osd_id,
))
except subprocess.CalledProcessError as e:
raise Error('ceph osd start failed', e)
def detect_fstype(
dev,
):
fstype = _check_output(
args=[
'/sbin/blkid',
# we don't want stale cached results
'-p',
'-s', 'TYPE',
'-o' 'value',
'--',
dev,
],
)
fstype = must_be_one_line(fstype)
return fstype
def mount_activate(
dev,
activate_key_template,
init,
):
try:
fstype = detect_fstype(dev=dev)
except (subprocess.CalledProcessError,
TruncatedLineError,
TooManyLinesError) as e:
raise FilesystemTypeError(
'device {dev}'.format(dev=dev),
e,
)
# TODO always using mount options from cluster=ceph for
# now; see http://tracker.newdream.net/issues/3253
mount_options = get_conf(
cluster='ceph',
variable='osd_mount_options_{fstype}'.format(
fstype=fstype,
),
)
if mount_options is None:
mount_options = get_conf(
cluster='ceph',
variable='osd_fs_mount_options_{fstype}'.format(
fstype=fstype,
),
)
#remove whitespaces from mount_options
if mount_options is not None:
mount_options = "".join(mount_options.split())
path = mount(dev=dev, fstype=fstype, options=mount_options)
osd_id = None
cluster = None
try:
(osd_id, cluster) = activate(path, activate_key_template, init)
# check if the disk is already active, or if something else is already
# mounted there
active = False
other = False
src_dev = os.stat(path).st_dev
try:
dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
cluster=cluster,
osd_id=osd_id)).st_dev
if src_dev == dst_dev:
active = True
else:
parent_dev = os.stat('/var/lib/ceph/osd').st_dev
if dst_dev != parent_dev:
other = True
elif os.listdir('/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
cluster=cluster,
osd_id=osd_id,
)):
other = True
except OSError:
pass
if active:
LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
unmount(path)
elif other:
raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id))
else:
move_mount(
dev=dev,
path=path,
cluster=cluster,
osd_id=osd_id,
)
return (cluster, osd_id)
except:
LOG.error('Failed to activate')
unmount(path)
raise
finally:
# remove our temp dir
if os.path.exists(path):
os.rmdir(path)
def activate_dir(
path,
activate_key_template,
init,
):
if not os.path.exists(path):
raise Error(
'directory %s does not exist' % path
)
(osd_id, cluster) = activate(path, activate_key_template, init)
canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
cluster=cluster,
osd_id=osd_id)
if path != canonical:
# symlink it from the proper location
create = True
if os.path.lexists(canonical):
old = os.readlink(canonical)
if old != path:
LOG.debug('Removing old symlink %s -> %s', canonical, old)
try:
os.unlink(canonical)
except:
raise Error('unable to remove old symlink %s', canonical)
else:
create = False
if create:
LOG.debug('Creating symlink %s -> %s', canonical, path)
try:
os.symlink(path, canonical)
except:
raise Error('unable to create symlink %s -> %s', canonical, path)
return (cluster, osd_id)
def find_cluster_by_uuid(_uuid):
"""
Find a cluster name by searching /etc/ceph/*.conf for a conf file
with the right uuid.
"""
no_fsid = []
if not os.path.exists('/etc/ceph'):
return None
for conf_file in os.listdir('/etc/ceph'):
if not conf_file.endswith('.conf'):
continue
cluster = conf_file[:-5]
fsid = get_conf(cluster, 'fsid')
if fsid is None:
no_fsid.append(cluster)
elif fsid == _uuid:
return cluster
# be tolerant of /etc/ceph/ceph.conf without an fsid defined.
if len(no_fsid) == 1 and no_fsid[0] == 'ceph':
LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway')
return 'ceph'
return None
def activate(
path,
activate_key_template,
init,
):
try:
check_osd_magic(path)
ceph_fsid = read_one_line(path, 'ceph_fsid')
if ceph_fsid is None:
raise Error('No cluster uuid assigned.')
LOG.debug('Cluster uuid is %s', ceph_fsid)
cluster = find_cluster_by_uuid(ceph_fsid)
if cluster is None:
raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid)
LOG.debug('Cluster name is %s', cluster)
fsid = read_one_line(path, 'fsid')
if fsid is None:
raise Error('No OSD uuid assigned.')
LOG.debug('OSD uuid is %s', fsid)
keyring = activate_key_template.format(cluster=cluster)
osd_id = get_osd_id(path)
if osd_id is None:
osd_id = allocate_osd_id(
cluster=cluster,
fsid=fsid,
keyring=keyring,
)
write_one_line(path, 'whoami', osd_id)
LOG.debug('OSD id is %s', osd_id)
if not os.path.exists(os.path.join(path, 'ready')):
LOG.debug('Initializing OSD...')
# re-running mkfs is safe, so just run until it completes
mkfs(
path=path,
cluster=cluster,
osd_id=osd_id,
fsid=fsid,
keyring=keyring,
)
if init is not None:
if init == 'auto':
conf_val = get_conf(
cluster=cluster,
variable='init'
)
if conf_val is not None:
init = conf_val
else:
(distro, release, codename) = platform.dist()
if distro == 'Ubuntu':
init = 'upstart'
else:
init = 'sysvinit'
LOG.debug('Marking with init system %s', init)
with file(os.path.join(path, init), 'w'):
pass
# remove markers for others, just in case.
for other in INIT_SYSTEMS:
if other != init:
try:
os.unlink(os.path.join(path, other))
except OSError:
pass
if not os.path.exists(os.path.join(path, 'active')):
LOG.debug('Authorizing OSD key...')
auth_key(
path=path,
cluster=cluster,
osd_id=osd_id,
keyring=keyring,
)
write_one_line(path, 'active', 'ok')
LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
return (osd_id, cluster)
except:
raise
def main_activate(args):
cluster = None
osd_id = None
if not os.path.exists(args.path):
raise Error('%s does not exist', args.path)
if is_suppressed(args.path):
LOG.info('suppressed activate request on %s', args.path)
return
activate_lock.acquire()
try:
mode = os.stat(args.path).st_mode
if stat.S_ISBLK(mode):
(cluster, osd_id) = mount_activate(
dev=args.path,
activate_key_template=args.activate_key_template,
init=args.mark_init,
)
elif stat.S_ISDIR(mode):
(cluster, osd_id) = activate_dir(
path=args.path,
activate_key_template=args.activate_key_template,
init=args.mark_init,
)
else:
raise Error('%s is not a directory or block device', args.path)
start_daemon(
cluster=cluster,
osd_id=osd_id,
)
activate_lock.release()
except:
activate_lock.release()
###########################
def get_journal_osd_uuid(path):
if not os.path.exists(path):
raise Error('%s does not exist', path)
mode = os.stat(path).st_mode
if not stat.S_ISBLK(mode):
raise Error('%s is not a block device', path)
try:
out = _check_output(
args=[
'ceph-osd',
'-i', '0', # this is ignored
'--get-journal-uuid',
'--osd-journal',
path,
],
close_fds=True,
)
except subprocess.CalledProcessError as e:
raise Error(
'failed to get osd uuid/fsid from journal',
e,
)
value = str(out).split('\n', 1)[0]
LOG.debug('Journal %s has OSD UUID %s', path, value)
return value
def main_activate_journal(args):
if not os.path.exists(args.dev):
raise Error('%s does not exist', args.dev)
cluster = None
osd_id = None
osd_uuid = None
activate_lock.acquire()
try:
osd_uuid = get_journal_osd_uuid(args.dev)
path = os.path.join('/dev/disk/by-partuuid/', osd_uuid.lower())
(cluster, osd_id) = mount_activate(
dev=path,
activate_key_template=args.activate_key_template,
init=args.mark_init,
)
start_daemon(
cluster=cluster,
osd_id=osd_id,
)
activate_lock.release()
except:
activate_lock.release()
raise
###########################
def main_activate_all(args):
dir = '/dev/disk/by-parttypeuuid'
LOG.debug('Scanning %s', dir)
if not os.path.exists(dir):
return
err = False
for name in os.listdir(dir):
if name.find('.') < 0:
continue
(tag, uuid) = name.split('.')
if tag == OSD_UUID:
path = os.path.join(dir, name)
LOG.info('Activating %s', path)
activate_lock.acquire()
try:
(cluster, osd_id) = mount_activate(
dev=path,
activate_key_template=args.activate_key_template,
init=args.mark_init,
)
start_daemon(
cluster=cluster,
osd_id=osd_id,
)
except Exception as e:
print >> sys.stderr, '{prog}: {msg}'.format(
prog=args.prog,
msg=e,
)
err = True
finally:
activate_lock.release()
if err:
raise Error('One or more partitions failed to activate')
###########################
def is_swap(dev):
dev = os.path.realpath(dev)
with file('/proc/swaps', 'rb') as proc_swaps:
for line in proc_swaps.readlines()[1:]:
fields = line.split()
if len(fields) < 3:
continue
swaps_dev = fields[0]
if swaps_dev.startswith('/') and os.path.exists(swaps_dev):
swaps_dev = os.path.realpath(swaps_dev)
if swaps_dev == dev:
return True
return False
def get_oneliner(base, name):
path = os.path.join(base, name)
if os.path.isfile(path):
with open(path, 'r') as _file:
return _file.readline().rstrip()
return None
def get_dev_fs(dev):
fscheck = subprocess.Popen(
[
'blkid',
'-s',
'TYPE',
dev
],
stdout = subprocess.PIPE,
stderr=subprocess.PIPE).stdout.read()
if 'TYPE' in fscheck:
fstype = fscheck.split()[1].split('"')[1]
return fstype
else:
return None
def get_partition_type(part):
(base, partnum) = re.match('(\D+)(\d+)', part).group(1, 2)
sgdisk = subprocess.Popen(
[
'sgdisk',
'-p',
base,
],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE).stdout.read()
for line in sgdisk.splitlines():
m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line)
if m is not None:
num = m.group(1)
if num != partnum:
continue
return m.group(2)
return None
def get_partition_uuid(dev):
(base, partnum) = re.match('(\D+)(\d+)', dev).group(1, 2)
out = subprocess.Popen(
[ 'sgdisk', '-i', partnum, base ],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE).stdout.read()
for line in out.splitlines():
m = re.match('Partition unique GUID: (\S+)', line)
if m:
return m.group(1).lower()
return None
def more_osd_info(path, uuid_map):
desc = []
ceph_fsid = get_oneliner(path, 'ceph_fsid')
if ceph_fsid:
cluster = find_cluster_by_uuid(ceph_fsid)
if cluster:
desc.append('cluster ' + cluster)
else:
desc.append('unknown cluster ' + ceph_fsid)
who = get_oneliner(path, 'whoami')
if who:
desc.append('osd.%s' % who)
journal_uuid = get_oneliner(path, 'journal_uuid')
if journal_uuid:
journal_uuid = journal_uuid.lower()
if journal_uuid in uuid_map:
desc.append('journal %s' % uuid_map[journal_uuid])
return desc
def list_dev(dev, uuid_map, journal_map):
ptype = 'unknown'
prefix = ''
if is_partition(dev):
ptype = get_partition_type(dev)
prefix = ' '
fs_type = get_dev_fs(dev)
path = is_mounted(dev)
desc = []
if ptype == 'ceph data':
if path:
desc.append('active')
desc.extend(more_osd_info(path, uuid_map))
elif fs_type:
try:
tpath = mount(dev=dev, fstype=fs_type, options='')
if tpath:
try:
magic = get_oneliner(tpath, 'magic')
if magic is not None:
desc.append('prepared')
desc.extend(more_osd_info(tpath, uuid_map))
finally:
unmount(tpath)
except MountError:
pass
if desc:
desc = ['ceph data'] + desc
else:
desc = ['ceph data', 'unprepared']
elif ptype == 'ceph journal':
desc.append('ceph journal')
part_uuid = get_partition_uuid(dev)
if part_uuid and part_uuid in journal_map:
desc.append('for %s' % journal_map[part_uuid])
else:
if is_swap(dev):
desc.append('swap')
else:
desc.append('other')
if fs_type:
desc.append(fs_type)
elif ptype:
desc.append(ptype)
if path:
desc.append('mounted on %s' % path)
print '%s%s %s' % (prefix, dev, ', '.join(desc))
def main_list(args):
partmap = list_all_partitions()
uuid_map = {}
journal_map = {}
for base, parts in sorted(partmap.iteritems()):
for p in parts:
dev = '/dev/' + p
part_uuid = get_partition_uuid(dev)
if part_uuid:
uuid_map[part_uuid] = dev
ptype = get_partition_type(dev)
if ptype == 'ceph data':
fs_type = get_dev_fs(dev)
try:
tpath = mount(dev=dev, fstype=fs_type, options='')
try:
journal_uuid = get_oneliner(tpath, 'journal_uuid')
if journal_uuid:
journal_map[journal_uuid.lower()] = dev
finally:
unmount(tpath)
except MountError:
pass
for base, parts in sorted(partmap.iteritems()):
if parts:
print '/dev/%s :' % base
for p in sorted(parts):
list_dev('/dev/' + p, uuid_map, journal_map)
else:
list_dev('/dev/' + base, uuid_map, journal_map)
###########################
#
# Mark devices that we want to suppress activates on with a
# file like
#
# /var/lib/ceph/tmp/suppress-activate.sdb
#
# where the last bit is the sanitized device name (/dev/X without the
# /dev/ prefix) and the is_suppress() check matches a prefix. That
# means suppressing sdb will stop activate on sdb1, sdb2, etc.
#
SUPPRESS_PREFIX = '/var/lib/ceph/tmp/suppress-activate.'
def is_suppressed(path):
disk = os.path.realpath(path)
try:
if not disk.startswith('/dev/') or not stat.S_ISBLK(os.lstat(path).st_mode):
return False
base = get_dev_name(disk)
while len(base):
if os.path.exists(SUPPRESS_PREFIX + base):
return True
base = base[:-1]
except:
return False
def set_suppress(path):
disk = os.path.realpath(path)
if not os.path.exists(disk):
raise Error('does not exist', path)
if not stat.S_ISBLK(os.lstat(path).st_mode):
raise Error('not a block device', path)
base = get_dev_name(disk)
with file(SUPPRESS_PREFIX + base, 'w') as f:
pass
LOG.info('set suppress flag on %s', base)
def unset_suppress(path):
disk = os.path.realpath(path)
if not os.path.exists(disk):
raise Error('does not exist', path)
if not stat.S_ISBLK(os.lstat(path).st_mode):
raise Error('not a block device', path)
assert disk.startswith('/dev/')
base = get_dev_name(disk)
fn = SUPPRESS_PREFIX + base
if not os.path.exists(fn):
raise Error('not marked as suppressed', path)
try:
os.unlink(fn)
LOG.info('unset suppress flag on %s', base)
except OSError as e:
raise Error('failed to unsuppress', e)
def main_suppress(args):
set_suppress(args.path)
def main_unsuppress(args):
unset_suppress(args.path)
def main_zap(args):
for dev in args.dev:
zap(dev)
###########################
def parse_args():
parser = argparse.ArgumentParser(
'ceph-disk',
)
parser.add_argument(
'-v', '--verbose',
action='store_true', default=None,
help='be more verbose',
)
parser.set_defaults(
# we want to hold on to this, for later
prog=parser.prog,
cluster='ceph',
)
subparsers = parser.add_subparsers(
title='subcommands',
description='valid subcommands',
help='sub-command help',
)
prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD')
prepare_parser.add_argument(
'--cluster',
metavar='NAME',
help='cluster name to assign this disk to',
)
prepare_parser.add_argument(
'--cluster-uuid',
metavar='UUID',
help='cluster uuid to assign this disk to',
)
prepare_parser.add_argument(
'--osd-uuid',
metavar='UUID',
help='unique OSD uuid to assign this disk to',
)
prepare_parser.add_argument(
'--journal-uuid',
metavar='UUID',
help='unique uuid to assign to the journal',
)
prepare_parser.add_argument(
'--fs-type',
help='file system type to use (e.g. "ext4")',
)
prepare_parser.add_argument(
'--zap-disk',
action='store_true', default=None,
help='destroy the partition table (and content) of a disk',
)
prepare_parser.add_argument(
'--data-dir',
action='store_true', default=None,
help='verify that DATA is a dir',
)
prepare_parser.add_argument(
'--data-dev',
action='store_true', default=None,
help='verify that DATA is a block device',
)
prepare_parser.add_argument(
'--journal-file',
action='store_true', default=None,
help='verify that JOURNAL is a file',
)
prepare_parser.add_argument(
'--journal-dev',
action='store_true', default=None,
help='verify that JOURNAL is a block device',
)
prepare_parser.add_argument(
'--dmcrypt',
action='store_true', default=None,
help='encrypt DATA and/or JOURNAL devices with dm-crypt',
)
prepare_parser.add_argument(
'--dmcrypt-key-dir',
metavar='KEYDIR',
default='/etc/ceph/dmcrypt-keys',
help='directory where dm-crypt keys are stored',
)
prepare_parser.add_argument(
'data',
metavar='DATA',
help='path to OSD data (a disk block device or directory)',
)
prepare_parser.add_argument(
'journal',
metavar='JOURNAL',
nargs='?',
help=('path to OSD journal disk block device;'
+ ' leave out to store journal in file'),
)
prepare_parser.set_defaults(
func=main_prepare,
)
activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD')
activate_parser.add_argument(
'--mount',
action='store_true', default=None,
help='mount a block device [deprecated, ignored]',
)
activate_parser.add_argument(
'--activate-key',
metavar='PATH',
help='bootstrap-osd keyring path template (%(default)s)',
dest='activate_key_template',
)
activate_parser.add_argument(
'--mark-init',
metavar='INITSYSTEM',
help='init system to manage this dir',
default='auto',
choices=INIT_SYSTEMS,
)
activate_parser.add_argument(
'path',
metavar='PATH',
nargs='?',
help='path to block device or directory',
)
activate_parser.set_defaults(
activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring',
func=main_activate,
)
activate_journal_parser = subparsers.add_parser('activate-journal', help='Activate an OSD via its journal device')
activate_journal_parser.add_argument(
'dev',
metavar='DEV',
help='path to journal block device',
)
activate_journal_parser.add_argument(
'--activate-key',
metavar='PATH',
help='bootstrap-osd keyring path template (%(default)s)',
dest='activate_key_template',
)
activate_journal_parser.add_argument(
'--mark-init',
metavar='INITSYSTEM',
help='init system to manage this dir',
default='auto',
choices=INIT_SYSTEMS,
)
activate_journal_parser.set_defaults(
activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring',
func=main_activate_journal,
)
activate_all_parser = subparsers.add_parser('activate-all', help='Activate all tagged OSD partitions')
activate_all_parser.add_argument(
'--activate-key',
metavar='PATH',
help='bootstrap-osd keyring path template (%(default)s)',
dest='activate_key_template',
)
activate_all_parser.add_argument(
'--mark-init',
metavar='INITSYSTEM',
help='init system to manage this dir',
default='auto',
choices=INIT_SYSTEMS,
)
activate_all_parser.set_defaults(
activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring',
func=main_activate_all,
)
list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs')
list_parser.set_defaults(
func=main_list,
)
suppress_parser = subparsers.add_parser('suppress-activate', help='Suppress activate on a device (prefix)')
suppress_parser.add_argument(
'path',
metavar='PATH',
nargs='?',
help='path to block device or directory',
)
suppress_parser.set_defaults(
func=main_suppress,
)
unsuppress_parser = subparsers.add_parser('unsuppress-activate', help='Stop suppressing activate on a device (prefix)')
unsuppress_parser.add_argument(
'path',
metavar='PATH',
nargs='?',
help='path to block device or directory',
)
unsuppress_parser.set_defaults(
func=main_unsuppress,
)
zap_parser = subparsers.add_parser('zap', help='Zap/erase/destroy a device\'s partition table (and contents)')
zap_parser.add_argument(
'dev',
metavar='DEV',
nargs='*',
help='path to block device',
)
zap_parser.set_defaults(
func=main_zap,
)
args = parser.parse_args()
return args
def main():
args = parse_args()
loglevel = logging.INFO
if args.verbose:
loglevel = logging.DEBUG
logging.basicConfig(
level=loglevel,
)
try:
args.func(args)
except Error as e:
print >> sys.stderr, '{prog}: {msg}'.format(
prog=args.prog,
msg=e,
)
sys.exit(1)
if __name__ == '__main__':
main()