#!/usr/bin/python import argparse import errno import logging import os import os.path import platform import re import subprocess import stat import sys import tempfile import uuid CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106' OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d' DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d' TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be' DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be' DEFAULT_FS_TYPE = 'xfs' MOUNT_OPTIONS = dict( btrfs='noatime,user_subvol_rm_allowed', # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll # delay a moment before removing it fully because we did have some # issues with ext4 before the xatts-in-leveldb work, and it seemed # that user_xattr helped ext4='noatime,user_xattr', xfs='noatime', ) MKFS_ARGS = dict( btrfs=[ '-m', 'single', '-l', '32768', '-n', '32768', ], xfs=[ # xfs insists on not overwriting previous fs; even if we wipe # partition table, we often recreate it exactly the same way, # so we'll see ghosts of filesystems past '-f', '-i', 'size=2048', ], ) INIT_SYSTEMS = [ 'upstart', 'sysvinit', 'systemd', 'auto', ] LOG_NAME = __name__ if LOG_NAME == '__main__': LOG_NAME = os.path.basename(sys.argv[0]) LOG = logging.getLogger(LOG_NAME) ###### exceptions ######## class Error(Exception): """ Error """ def __str__(self): doc = self.__doc__.strip() return ': '.join([doc] + [str(a) for a in self.args]) class MountError(Error): """ Mounting filesystem failed """ class UnmountError(Error): """ Unmounting filesystem failed """ class BadMagicError(Error): """ Does not look like a Ceph OSD, or incompatible version """ class TruncatedLineError(Error): """ Line is truncated """ class TooManyLinesError(Error): """ Too many lines """ class FilesystemTypeError(Error): """ Cannot discover filesystem type """ ####### utils def maybe_mkdir(*a, **kw): """ Creates a new directory if it doesn't exist, removes existing symlink before creating the directory. """ # remove any symlink, if it is there.. if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode): LOG.debug('Removing old symlink at %s', *a) os.unlink(*a) try: os.mkdir(*a, **kw) except OSError, e: if e.errno == errno.EEXIST: pass else: raise def list_all_partitions(): """ Return a list of devices and partitions """ dev_part_list = {} for name in os.listdir('/dev/disk/by-path'): target = os.readlink(os.path.join('/dev/disk/by-path', name)) dev = target.split('/')[-1] #print "name %s target %s dev %s" % (name, target, dev) (baser) = re.search('(.*)-part\d+$', name) if baser is not None: basename = baser.group(1) #print 'basename %s' % basename base = os.readlink(os.path.join('/dev/disk/by-path', basename)).split('/')[-1] if base not in dev_part_list: dev_part_list[base] = [] dev_part_list[base].append(dev) else: if dev not in dev_part_list: dev_part_list[dev] = [] return dev_part_list def list_partitions(disk): """ Return a list of partitions on the given device """ disk = os.path.realpath(disk) assert not is_partition(disk) assert disk.startswith('/dev/') base = disk[5:] partitions = [] with file('/proc/partitions', 'rb') as proc_partitions: for line in proc_partitions.read().split('\n')[2:]: fields = re.split('\s+', line) if len(fields) < 5: continue name = fields [4] if name != base and name.startswith(base): partitions.append('/dev/' + name) return partitions def is_partition(dev): """ Check whether a given device is a partition or a full disk. """ dev = os.path.realpath(dev) if not stat.S_ISBLK(os.lstat(dev).st_mode): raise Error('not a block device', dev) # we can't tell just from the name of the device if it is a # partition or not. look in the by-path dir and see if the # referring symlink ends in -partNNN. name = dev.split('/')[-1] for name in os.listdir('/dev/disk/by-path'): target = os.readlink(os.path.join('/dev/disk/by-path', name)) cdev = target.split('/')[-1] if '/dev/' + cdev != dev: continue (baser) = re.search('(.*)-part\d+$', name) if baser is not None: return True else: return False # hrm, don't know... return False def is_mounted(dev): """ Check if the given device is mounted. """ dev = os.path.realpath(dev) with file('/proc/mounts') as f: for line in f.read().split('\n'): d = line.split(' ')[0] if os.path.exists(d): d = os.path.realpath(d) if dev == d: return True return False def is_held(dev): """ Check if a device is held by another device (e.g., a dm-crypt mapping) """ assert os.path.exists(dev) dev = os.path.realpath(dev) base = dev[5:] disk = base while disk[-1].isdigit(): disk = disk[:-1] directory = '/sys/block/{disk}/{base}/holders'.format(disk=disk, base=base) if not os.path.exists(directory): return [] return os.listdir(directory) def verify_not_in_use(dev): """ Verify if a given device (path) is in use (e.g. mounted or in use by device-mapper). :raises: Error if device is in use. """ assert os.path.exists(dev) if is_partition(dev): if is_mounted(dev): raise Error('Device is mounted', dev) holders = is_held(dev) if holders: raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) else: for p in list_partitions(dev): if is_mounted(p): raise Error('Device is mounted', p) holders = is_held(p) if holders: raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % p, ','.join(holders)) def must_be_one_line(line): """ Checks if given line is really one single line. :raises: TruncatedLineError or TooManyLinesError :return: Content of the line, or None if line isn't valid. """ if line[-1:] != '\n': raise TruncatedLineError(line) line = line[:-1] if '\n' in line: raise TooManyLinesError(line) return line def read_one_line(parent, name): """ Read a file whose sole contents are a single line. Strips the newline. :return: Contents of the line, or None if file did not exist. """ path = os.path.join(parent, name) try: line = file(path, 'rb').read() except IOError as e: if e.errno == errno.ENOENT: return None else: raise try: line = must_be_one_line(line) except (TruncatedLineError, TooManyLinesError) as e: raise Error('File is corrupt: {path}: {msg}'.format( path=path, msg=e, )) return line def write_one_line(parent, name, text): """ Write a file whose sole contents are a single line. Adds a newline. """ path = os.path.join(parent, name) tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) with file(tmp, 'wb') as f: f.write(text + '\n') os.fsync(f.fileno()) os.rename(tmp, path) def check_osd_magic(path): """ Check that this path has the Ceph OSD magic. :raises: BadMagicError if this does not look like a Ceph OSD data dir. """ magic = read_one_line(path, 'magic') if magic is None: # probably not mkfs'ed yet raise BadMagicError(path) if magic != CEPH_OSD_ONDISK_MAGIC: raise BadMagicError(path) def check_osd_id(osd_id): """ Ensures osd id is numeric. """ if not re.match(r'^[0-9]+$', osd_id): raise Error('osd id is not numeric') def allocate_osd_id( cluster, fsid, keyring, ): """ Accocates an OSD id on the given cluster. :raises: Error if the call to allocate the OSD id fails. :return: The allocated OSD id. """ LOG.debug('Allocating OSD id...') try: osd_id = _check_output( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'osd', 'create', '--concise', fsid, ], ) except subprocess.CalledProcessError as e: raise Error('ceph osd create failed', e) osd_id = must_be_one_line(osd_id) check_osd_id(osd_id) return osd_id def get_osd_id(path): """ Gets the OSD id of the OSD at the given path. """ osd_id = read_one_line(path, 'whoami') if osd_id is not None: check_osd_id(osd_id) return osd_id # TODO depend on python2.7 def _check_output(*args, **kwargs): process = subprocess.Popen( stdout=subprocess.PIPE, *args, **kwargs) out, _ = process.communicate() ret = process.wait() if ret: cmd = kwargs.get("args") if cmd is None: cmd = args[0] raise subprocess.CalledProcessError(ret, cmd, output=out) return out def get_conf(cluster, variable): """ Get the value of the given configuration variable from the cluster. :raises: Error if call to ceph-conf fails. :return: The variable value or None. """ try: p = subprocess.Popen( args=[ '/usr/bin/ceph-conf', '--cluster={cluster}'.format( cluster=cluster, ), '--name=osd.', '--lookup', variable, ], stdout=subprocess.PIPE, close_fds=True, ) except OSError as e: raise Error('error executing ceph-conf', e) (out, _err) = p.communicate() ret = p.wait() if ret == 1: # config entry not found return None elif ret != 0: raise Error('getting variable from configuration failed') value = str(out).split('\n', 1)[0] # don't differentiate between "var=" and no var set if not value: return None return value def get_conf_with_default(cluster, variable): """ Get a config value that is known to the C++ code. This will fail if called on variables that are not defined in common config options. """ try: out = _check_output( args=[ 'ceph-osd', '--cluster={cluster}'.format( cluster=cluster, ), '--show-config-value={variable}'.format( variable=variable, ), ], close_fds=True, ) except subprocess.CalledProcessError as e: raise Error( 'getting variable from configuration failed', e, ) value = str(out).split('\n', 1)[0] return value def get_fsid(cluster): """ Get the fsid of the cluster. :return: The fsid or raises Error. """ fsid = get_conf(cluster=cluster, variable='fsid') if fsid is None: raise Error('getting cluster uuid from configuration failed') return fsid def get_or_create_dmcrypt_key( _uuid, key_dir, ): """ Get path to dmcrypt key or create a new key file. :return: Path to the dmcrypt key file. """ path = os.path.join(key_dir, _uuid) # already have it? if os.path.exists(path): return path # make a new key try: if not os.path.exists(key_dir): os.makedirs(key_dir) with file('/dev/urandom', 'rb') as i: key = i.read(256) with file(path, 'wb') as f: f.write(key) return path except: raise Error('unable to read or create dm-crypt key', path) def dmcrypt_map( rawdev, keypath, _uuid, ): """ Maps a device to a dmcrypt device. :return: Path to the dmcrypt device. """ dev = '/dev/mapper/'+ _uuid args = [ 'cryptsetup', '--key-file', keypath, '--key-size', '256', 'create', _uuid, rawdev, ] try: subprocess.check_call(args) return dev except subprocess.CalledProcessError as e: raise Error('unable to map device', rawdev, e) def dmcrypt_unmap( _uuid ): """ Removes the dmcrypt device with the given UUID. """ args = [ 'cryptsetup', 'remove', _uuid ] try: subprocess.check_call(args) except subprocess.CalledProcessError as e: raise Error('unable to unmap device', _uuid, e) def mount( dev, fstype, options, ): """ Mounts a device with given filessystem type and mount options to a tempfile path under /var/lib/ceph/tmp. """ # pick best-of-breed mount options based on fs type if options is None: options = MOUNT_OPTIONS.get(fstype, '') # mount path = tempfile.mkdtemp( prefix='mnt.', dir='/var/lib/ceph/tmp', ) try: LOG.debug('Mounting %s on %s with options %s', dev, path, options) subprocess.check_call( args=[ 'mount', '-o', options, '--', dev, path, ], ) except subprocess.CalledProcessError as e: try: os.rmdir(path) except (OSError, IOError): pass raise MountError(e) return path def unmount( path, ): """ Unmount and removes the given mount point. """ try: LOG.debug('Unmounting %s', path) subprocess.check_call( args=[ '/bin/umount', '--', path, ], ) except subprocess.CalledProcessError as e: raise UnmountError(e) os.rmdir(path) ########################################### def get_free_partition_index(dev): """ Get the next free partition index on a given device. :return: Index number (> 1 if there is already a partition on the device) or 1 if there is no partition table. """ try: lines = _check_output( args=[ 'parted', '--machine', '--', dev, 'print', ], ) except subprocess.CalledProcessError as e: print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e return 1 if not lines: raise Error('parted failed to output anything') lines = str(lines).splitlines(True) if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: raise Error('weird parted units', lines[0]) del lines[0] if not lines[0].startswith('/dev/'): raise Error('weird parted disk entry', lines[0]) del lines[0] seen = set() for line in lines: idx, _ = line.split(':', 1) idx = int(idx) seen.add(idx) num = 1 while num in seen: num += 1 return num def zap(dev): """ Destroy the partition table and content of a given disk. """ try: LOG.debug('Zapping partition table on %s', dev) # try to wipe out any GPT partition table backups. sgdisk # isn't too thorough. lba_size = 4096 size = 33 * lba_size with file(dev, 'wb') as f: f.seek(-size, os.SEEK_END) f.write(size*'\0') subprocess.check_call( args=[ 'sgdisk', '--zap-all', '--clear', '--mbrtogpt', '--', dev, ], ) except subprocess.CalledProcessError as e: raise Error(e) def prepare_journal_dev( data, journal, journal_size, journal_uuid, journal_dm_keypath, ): if is_partition(journal): LOG.debug('Journal %s is a partition', journal) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') return (journal, None, None) ptype = JOURNAL_UUID if journal_dm_keypath: ptype = DMCRYPT_JOURNAL_UUID # it is a whole disk. create a partition! num = None if journal == data: # we're sharing the disk between osd data and journal; # make journal be partition number 2, so it's pretty; put # journal at end of free space so partitioning tools don't # reorder them suddenly num = 2 journal_part = '{num}:-{size}M:0'.format( num=num, size=journal_size, ) else: # sgdisk has no way for me to say "whatever is the next # free index number" when setting type guids etc, so we # need to awkwardly look up the next free number, and then # fix that in the call -- and hope nobody races with us; # then again nothing guards the partition table from races # anyway num = get_free_partition_index(dev=journal) journal_part = '{num}:0:+{size}M'.format( num=num, size=journal_size, ) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') try: LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal) subprocess.check_call( args=[ 'sgdisk', '--new={part}'.format(part=journal_part), '--change-name={num}:ceph journal'.format(num=num), '--partition-guid={num}:{journal_uuid}'.format( num=num, journal_uuid=journal_uuid, ), '--typecode={num}:{uuid}'.format( num=num, uuid=ptype, ), '--', journal, ], ) subprocess.call( args=[ # wait for udev event queue to clear 'udevadm', 'settle', '--timeout=10', ], ) subprocess.check_call( args=[ # also make sure the kernel refreshes the new table 'partprobe', journal, ], ) journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( journal_uuid=journal_uuid, ) journal_dmcrypt = None if journal_dm_keypath: journal_dmcrypt = journal_symlink journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid) LOG.debug('Journal is GPT partition %s', journal_symlink) return (journal_symlink, journal_dmcrypt, journal_uuid) except subprocess.CalledProcessError as e: raise Error(e) def prepare_journal_file( journal, journal_size): if not os.path.exists(journal): LOG.debug('Creating journal file %s with size %dM', journal, journal_size) with file(journal, 'wb') as f: f.truncate(journal_size * 1048576) # FIXME: should we resize an existing journal file? LOG.debug('Journal is file %s', journal) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') return (journal, None, None) def prepare_journal( data, journal, journal_size, journal_uuid, force_file, force_dev, journal_dm_keypath, ): if journal is None: if force_dev: raise Error('Journal is unspecified; not a block device') return (None, None, None) if not os.path.exists(journal): if force_dev: raise Error('Journal does not exist; not a block device', journal) return prepare_journal_file(journal, journal_size) jmode = os.stat(journal).st_mode if stat.S_ISREG(jmode): if force_dev: raise Error('Journal is not a block device', journal) return prepare_journal_file(journal, journal_size) if stat.S_ISBLK(jmode): if force_file: raise Error('Journal is not a regular file', journal) return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath) raise Error('Journal %s is neither a block device nor regular file', journal) def adjust_symlink(target, path): create = True if os.path.lexists(path): try: mode = os.lstat(path).st_mode if stat.S_ISREG(mode): LOG.debug('Removing old file %s', path) os.unlink(path) elif stat.S_ISLNK(mode): old = os.readlink(path) if old != target: LOG.debug('Removing old symlink %s -> %s', path, old) os.unlink(path) else: create = False except: raise Error('unable to remove (or adjust) old file (symlink)', path) if create: LOG.debug('Creating symlink %s -> %s', path, target) try: os.symlink(target, path) except: raise Error('unable to create symlink %s -> %s' % (path, target)) def prepare_dir( path, journal, cluster_uuid, osd_uuid, journal_uuid, journal_dmcrypt = None, ): LOG.debug('Preparing osd data dir %s', path) if osd_uuid is None: osd_uuid = str(uuid.uuid4()) if journal is not None: # we're using an external journal; point to it here adjust_symlink(journal, os.path.join(path, 'journal')) if journal_dmcrypt is not None: adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt')) else: try: os.unlink(os.path.join(path, 'journal_dmcrypt')) except OSError: pass write_one_line(path, 'ceph_fsid', cluster_uuid) write_one_line(path, 'fsid', osd_uuid) write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) if journal_uuid is not None: # i.e., journal is a tagged partition write_one_line(path, 'journal_uuid', journal_uuid) def prepare_dev( data, journal, fstype, mkfs_args, mount_options, cluster_uuid, osd_uuid, journal_uuid, journal_dmcrypt, osd_dm_keypath, ): """ Prepare a data/journal combination to be used for an OSD. The ``magic`` file is written last, so it's presence is a reliable indicator of the whole sequence having completed. WARNING: This will unconditionally overwrite anything given to it. """ ptype_tobe = TOBE_UUID ptype_osd = OSD_UUID if osd_dm_keypath: ptype_tobe = DMCRYPT_TOBE_UUID ptype_osd = DMCRYPT_OSD_UUID rawdev = None if is_partition(data): LOG.debug('OSD data device %s is a partition', data) rawdev = data else: LOG.debug('Creating osd partition on %s', data) try: subprocess.check_call( args=[ 'sgdisk', '--largest-new=1', '--change-name=1:ceph data', '--partition-guid=1:{osd_uuid}'.format( osd_uuid=osd_uuid, ), '--typecode=1:%s' % ptype_tobe, '--', data, ], ) subprocess.call( args=[ # wait for udev event queue to clear 'udevadm', 'settle', '--timeout=10', ], ) subprocess.check_call( args=[ # also make sure the kernel refreshes the new table 'partprobe', data, ], ) except subprocess.CalledProcessError as e: raise Error(e) rawdev = '{data}1'.format(data=data) dev = None if osd_dm_keypath: dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) else: dev = rawdev try: args = [ 'mkfs', '-t', fstype, ] if mkfs_args is not None: args.extend(mkfs_args.split()) if fstype == 'xfs': args.extend(['-f']) # always force else: args.extend(MKFS_ARGS.get(fstype, [])) args.extend([ '--', dev, ]) try: LOG.debug('Creating %s fs on %s', fstype, dev) subprocess.check_call(args=args) except subprocess.CalledProcessError as e: raise Error(e) #remove whitespaces from mount_options if mount_options is not None: mount_options = "".join(mount_options.split()) path = mount(dev=dev, fstype=fstype, options=mount_options) try: prepare_dir( path=path, journal=journal, cluster_uuid=cluster_uuid, osd_uuid=osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, ) finally: unmount(path) finally: if rawdev != dev: dmcrypt_unmap(osd_uuid) if not is_partition(data): try: subprocess.check_call( args=[ 'sgdisk', '--typecode=1:%s' % ptype_osd, '--', data, ], ) subprocess.call( args=[ # wait for udev event queue to clear 'udevadm', 'settle', '--timeout=10', ], ) subprocess.check_call( args=[ # also make sure the kernel refreshes the new table 'partprobe', data, ], ) except subprocess.CalledProcessError as e: raise Error(e) def main_prepare(args): journal_dm_keypath = None osd_dm_keypath = None try: if not os.path.exists(args.data): raise Error('data path does not exist', args.data) # in use? dmode = os.stat(args.data).st_mode if stat.S_ISBLK(dmode): verify_not_in_use(args.data) if args.journal and os.path.exists(args.journal): jmode = os.stat(args.journal).st_mode if stat.S_ISBLK(jmode): verify_not_in_use(args.journal) if args.zap_disk is not None: if stat.S_ISBLK(dmode) and not is_partition(args.data): zap(args.data) else: raise Error('not full block device; cannot zap', args.data) if args.cluster_uuid is None: args.cluster_uuid = get_fsid(cluster=args.cluster) if args.cluster_uuid is None: raise Error( 'must have fsid in config or pass --cluster--uuid=', ) if args.fs_type is None: args.fs_type = get_conf( cluster=args.cluster, variable='osd_mkfs_type', ) if args.fs_type is None: args.fs_type = get_conf( cluster=args.cluster, variable='osd_fs_type', ) if args.fs_type is None: args.fs_type = DEFAULT_FS_TYPE mkfs_args = get_conf( cluster=args.cluster, variable='osd_mkfs_options_{fstype}'.format( fstype=args.fs_type, ), ) if mkfs_args is None: mkfs_args = get_conf( cluster=args.cluster, variable='osd_fs_mkfs_options_{fstype}'.format( fstype=args.fs_type, ), ) mount_options = get_conf( cluster=args.cluster, variable='osd_mount_options_{fstype}'.format( fstype=args.fs_type, ), ) if mount_options is None: mount_options = get_conf( cluster=args.cluster, variable='osd_fs_mount_options_{fstype}'.format( fstype=args.fs_type, ), ) journal_size = get_conf_with_default( cluster=args.cluster, variable='osd_journal_size', ) journal_size = int(journal_size) # colocate journal with data? if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None: LOG.info('Will colocate journal with data on %s', args.data) args.journal = args.data if args.journal_uuid is None: args.journal_uuid = str(uuid.uuid4()) if args.osd_uuid is None: args.osd_uuid = str(uuid.uuid4()) # dm-crypt keys? if args.dmcrypt: journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir) osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir) # prepare journal (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal( data=args.data, journal=args.journal, journal_size=journal_size, journal_uuid=args.journal_uuid, force_file=args.journal_file, force_dev=args.journal_dev, journal_dm_keypath=journal_dm_keypath, ) # prepare data if stat.S_ISDIR(dmode): if args.data_dev: raise Error('data path is not a block device', args.data) prepare_dir( path=args.data, journal=journal_symlink, cluster_uuid=args.cluster_uuid, osd_uuid=args.osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, ) elif stat.S_ISBLK(dmode): if args.data_dir: raise Error('data path is not a directory', args.data) prepare_dev( data=args.data, journal=journal_symlink, fstype=args.fs_type, mkfs_args=mkfs_args, mount_options=mount_options, cluster_uuid=args.cluster_uuid, osd_uuid=args.osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, osd_dm_keypath=osd_dm_keypath, ) else: raise Error('not a dir or block device', args.data) except Error as e: if journal_dm_keypath: os.unlink(journal_dm_keypath) if osd_dm_keypath: os.unlink(osd_dm_keypath) raise e ########################### def mkfs( path, cluster, osd_id, fsid, keyring, ): monmap = os.path.join(path, 'activate.monmap') subprocess.check_call( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'mon', 'getmap', '-o', monmap, ], ) subprocess.check_call( args=[ '/usr/bin/ceph-osd', '--cluster', cluster, '--mkfs', '--mkkey', '-i', osd_id, '--monmap', monmap, '--osd-data', path, '--osd-journal', os.path.join(path, 'journal'), '--osd-uuid', fsid, '--keyring', os.path.join(path, 'keyring'), ], ) # TODO ceph-osd --mkfs removes the monmap file? # os.unlink(monmap) def auth_key( path, cluster, osd_id, keyring, ): subprocess.check_call( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), '-i', os.path.join(path, 'keyring'), 'osd', 'allow *', 'mon', 'allow rwx', ], ) def move_mount( path, cluster, osd_id, ): LOG.debug('Moving mount to final location...') parent = '/var/lib/ceph/osd' osd_data = os.path.join( parent, '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), ) maybe_mkdir(osd_data) subprocess.check_call( args=[ '/bin/mount', '--move', '--', path, osd_data, ], ) def start_daemon( cluster, osd_id, ): LOG.debug('Starting %s osd.%s...', cluster, osd_id) path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id) # upstart? try: if os.path.exists(os.path.join(path,'upstart')): subprocess.check_call( args=[ '/sbin/initctl', # use emit, not start, because start would fail if the # instance was already running 'emit', # since the daemon starting doesn't guarantee much about # the service being operational anyway, don't bother # waiting for it '--no-wait', '--', 'ceph-osd', 'cluster={cluster}'.format(cluster=cluster), 'id={osd_id}'.format(osd_id=osd_id), ], ) elif os.path.exists(os.path.join(path, 'sysvinit')): subprocess.check_call( args=[ '/usr/sbin/service', 'ceph', 'start', 'osd.{osd_id}'.format(osd_id=osd_id), ], ) else: raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format( cluster=cluster, osd_id=osd_id, )) except subprocess.CalledProcessError as e: raise Error('ceph osd start failed', e) def detect_fstype( dev, ): fstype = _check_output( args=[ '/sbin/blkid', # we don't want stale cached results '-p', '-s', 'TYPE', '-o' 'value', '--', dev, ], ) fstype = must_be_one_line(fstype) return fstype def mount_activate( dev, activate_key_template, init, ): try: fstype = detect_fstype(dev=dev) except (subprocess.CalledProcessError, TruncatedLineError, TooManyLinesError) as e: raise FilesystemTypeError( 'device {dev}'.format(dev=dev), e, ) # TODO always using mount options from cluster=ceph for # now; see http://tracker.newdream.net/issues/3253 mount_options = get_conf( cluster='ceph', variable='osd_mount_options_{fstype}'.format( fstype=fstype, ), ) if mount_options is None: mount_options = get_conf( cluster='ceph', variable='osd_fs_mount_options_{fstype}'.format( fstype=fstype, ), ) #remove whitespaces from mount_options if mount_options is not None: mount_options = "".join(mount_options.split()) path = mount(dev=dev, fstype=fstype, options=mount_options) osd_id = None cluster = None try: (osd_id, cluster) = activate(path, activate_key_template, init) # check if the disk is already active, or if something else is already # mounted there active = False other = False src_dev = os.stat(path).st_dev try: dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id)).st_dev if src_dev == dst_dev: active = True else: parent_dev = os.stat('/var/lib/ceph/osd').st_dev if dst_dev != parent_dev: other = True except OSError: pass if active: LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id)) unmount(path) elif other: raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id)) else: move_mount( path=path, cluster=cluster, osd_id=osd_id, ) return (cluster, osd_id) except: LOG.error('Failed to activate') unmount(path) raise finally: # remove our temp dir if os.path.exists(path): os.rmdir(path) def activate_dir( path, activate_key_template, init, ): if not os.path.exists(path): raise Error( 'directory %s does not exist' % path ) (osd_id, cluster) = activate(path, activate_key_template, init) canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id) if path != canonical: # symlink it from the proper location create = True if os.path.lexists(canonical): old = os.readlink(canonical) if old != path: LOG.debug('Removing old symlink %s -> %s', canonical, old) try: os.unlink(canonical) except: raise Error('unable to remove old symlink %s', canonical) else: create = False if create: LOG.debug('Creating symlink %s -> %s', canonical, path) try: os.symlink(path, canonical) except: raise Error('unable to create symlink %s -> %s', canonical, path) return (cluster, osd_id) def find_cluster_by_uuid(_uuid): """ Find a cluster name by searching /etc/ceph/*.conf for a conf file with the right uuid. """ no_fsid = [] if not os.path.exists('/etc/ceph'): return None for conf_file in os.listdir('/etc/ceph'): if not conf_file.endswith('.conf'): continue cluster = conf_file[:-5] fsid = get_conf(cluster, 'fsid') if fsid is None: no_fsid.append(cluster) elif fsid == _uuid: return cluster # be tolerant of /etc/ceph/ceph.conf without an fsid defined. if len(no_fsid) == 1 and no_fsid[0] == 'ceph': LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway') return 'ceph' return None def activate( path, activate_key_template, init, ): try: check_osd_magic(path) ceph_fsid = read_one_line(path, 'ceph_fsid') if ceph_fsid is None: raise Error('No cluster uuid assigned.') LOG.debug('Cluster uuid is %s', ceph_fsid) cluster = find_cluster_by_uuid(ceph_fsid) if cluster is None: raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid) LOG.debug('Cluster name is %s', cluster) fsid = read_one_line(path, 'fsid') if fsid is None: raise Error('No OSD uuid assigned.') LOG.debug('OSD uuid is %s', fsid) keyring = activate_key_template.format(cluster=cluster) osd_id = get_osd_id(path) if osd_id is None: osd_id = allocate_osd_id( cluster=cluster, fsid=fsid, keyring=keyring, ) write_one_line(path, 'whoami', osd_id) LOG.debug('OSD id is %s', osd_id) if not os.path.exists(os.path.join(path, 'ready')): LOG.debug('Initializing OSD...') # re-running mkfs is safe, so just run until it completes mkfs( path=path, cluster=cluster, osd_id=osd_id, fsid=fsid, keyring=keyring, ) if init is not None: if init == 'auto': c = get_conf( cluster=cluster, variable='init' ) if c is not None: init = c else: (distro, release, codename) = platform.dist() if distro == 'Ubuntu': init = 'upstart' else: init = 'sysvinit' LOG.debug('Marking with init system %s', init) with file(os.path.join(path, init), 'w'): pass # remove markers for others, just in case. for other in INIT_SYSTEMS: if other != init: try: os.unlink(os.path.join(path, other)) except OSError: pass if not os.path.exists(os.path.join(path, 'active')): LOG.debug('Authorizing OSD key...') auth_key( path=path, cluster=cluster, osd_id=osd_id, keyring=keyring, ) write_one_line(path, 'active', 'ok') LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path) return (osd_id, cluster) except: raise def main_activate(args): cluster = None osd_id = None if not os.path.exists(args.path): raise Error('%s does not exist', args.path) mode = os.stat(args.path).st_mode if stat.S_ISBLK(mode): (cluster, osd_id) = mount_activate( dev=args.path, activate_key_template=args.activate_key_template, init=args.mark_init, ) elif stat.S_ISDIR(mode): (cluster, osd_id) = activate_dir( path=args.path, activate_key_template=args.activate_key_template, init=args.mark_init, ) else: raise Error('%s is not a directory or block device', args.path) start_daemon( cluster=cluster, osd_id=osd_id, ) ########################### def is_swap(dev): with file('/proc/swaps', 'rb') as proc_swaps: for line in proc_swaps.readlines()[1:]: fields = line.split() if len(fields) < 3: continue d = fields[0] if d.startswith('/') and os.path.exists(d): d = os.path.realpath(d) if d == dev: return True return False def get_oneliner(base, name): path = os.path.join(base, name) if os.path.isfile(path): with open(path, 'r') as f: return f.readline().rstrip() return None def get_dev_fs(dev): fscheck = subprocess.Popen( [ 'blkid', '-s', 'TYPE', dev ], stdout = subprocess.PIPE, stderr=subprocess.PIPE).stdout.read() if 'TYPE' in fscheck: fs = fscheck.split()[1].split('"')[1] return fs else: return None def get_partition_type(part): (base, partnum) = re.match('(\D+)(\d+)', part).group(1,2) sgdisk = subprocess.Popen( [ 'sgdisk', '-p', base, ], stdout = subprocess.PIPE, stderr = subprocess.PIPE).stdout.read() for line in sgdisk.splitlines(): m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line) if m is not None: num = m.group(1) if num != partnum: continue return m.group(2) return None def get_partition_uuid(dev): (base, partnum) = re.match('(\D+)(\d+)', dev).group(1,2) out = subprocess.Popen( [ 'sgdisk', '-i', partnum, base ], stdout = subprocess.PIPE, stderr = subprocess.PIPE).stdout.read() for line in out.splitlines(): m = re.match('Partition unique GUID: (\S+)', line) if m: return m.group(1).lower() return None def is_mounted(dev): with file('/proc/mounts', 'rb') as proc_mounts: for line in proc_mounts: fields = line.split() if len(fields) < 3: continue d = fields[0] path = fields[1] if d.startswith('/') and os.path.exists(d): d = os.path.realpath(d) if d == dev: return path return None def more_osd_info(path, uuid_map): desc = [] ceph_fsid = get_oneliner(path, 'ceph_fsid') if ceph_fsid: cluster = find_cluster_by_uuid(ceph_fsid) if cluster: desc.append('cluster ' + cluster) else: desc.append('unknown cluster ' + ceph_fsid) who = get_oneliner(path, 'whoami') if who: desc.append('osd.%s' % who) journal_uuid = get_oneliner(path, 'journal_uuid') if journal_uuid: journal_uuid = journal_uuid.lower() if journal_uuid in uuid_map: desc.append('journal %s' % uuid_map[journal_uuid]) return desc def list_dev(dev, uuid_map, journal_map): ptype = 'unknown' prefix = '' if is_partition(dev): ptype = get_partition_type(dev) prefix = ' ' fs = get_dev_fs(dev) path = is_mounted(dev) desc = [] if ptype == 'ceph data': if path: desc.append('active') desc.extend(more_osd_info(path, uuid_map)) elif fs: try: tpath = mount(dev=dev, fstype=fs, options='') if tpath: try: magic = get_oneliner(tpath, 'magic') if magic is not None: desc.append('prepared') desc.extend(more_osd_info(tpath, uuid_map)) finally: unmount(tpath) except: pass if desc: desc = ['ceph data'] + desc else: desc = ['ceph data', 'unprepared'] elif ptype == 'ceph journal': desc.append('ceph journal') uuid = get_partition_uuid(dev) if uuid and uuid in journal_map: desc.append('for %s' % journal_map[uuid]) else: if is_swap(dev): desc.append('swap') else: desc.append('other') if fs: desc.append(fs) elif ptype: desc.append(ptype) if path: desc.append('mounted on %s' % path) print '%s%s %s' % (prefix, dev, ', '.join(desc)) def main_list(args): partmap = list_all_partitions() uuid_map = {} journal_map = {} for base, parts in sorted(partmap.iteritems()): for p in parts: dev = '/dev/' + p uuid = get_partition_uuid(dev) if uuid: uuid_map[uuid] = dev ptype = get_partition_type(dev) if ptype == 'ceph data': fs = get_dev_fs(dev) try: tpath = mount(dev=dev, fstype=fs, options='') try: journal_uuid = get_oneliner(tpath, 'journal_uuid') if journal_uuid: journal_map[journal_uuid.lower()] = dev finally: unmount(tpath) except: pass for base, parts in sorted(partmap.iteritems()): if parts: print '/dev/%s :' % base for p in sorted(parts): list_dev('/dev/' + p, uuid_map, journal_map) else: list_dev('/dev/' + base, uuid_map, journal_map) ########################### def parse_args(): parser = argparse.ArgumentParser( 'ceph-disk', ) parser.add_argument( '-v', '--verbose', action='store_true', default=None, help='be more verbose', ) parser.set_defaults( # we want to hold on to this, for later prog=parser.prog, cluster='ceph', ) subparsers = parser.add_subparsers( title='subcommands', description='valid subcommands', help='sub-command help', ) prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD') prepare_parser.add_argument( '--cluster', metavar='NAME', help='cluster name to assign this disk to', ) prepare_parser.add_argument( '--cluster-uuid', metavar='UUID', help='cluster uuid to assign this disk to', ) prepare_parser.add_argument( '--osd-uuid', metavar='UUID', help='unique OSD uuid to assign this disk to', ) prepare_parser.add_argument( '--journal-uuid', metavar='UUID', help='unique uuid to assign to the journal', ) prepare_parser.add_argument( '--fs-type', help='file system type to use (e.g. "ext4")', ) prepare_parser.add_argument( '--zap-disk', action='store_true', default=None, help='destroy the partition table (and content) of a disk', ) prepare_parser.add_argument( '--data-dir', action='store_true', default=None, help='verify that DATA is a dir', ) prepare_parser.add_argument( '--data-dev', action='store_true', default=None, help='verify that DATA is a block device', ) prepare_parser.add_argument( '--journal-file', action='store_true', default=None, help='verify that JOURNAL is a file', ) prepare_parser.add_argument( '--journal-dev', action='store_true', default=None, help='verify that JOURNAL is a block device', ) prepare_parser.add_argument( '--dmcrypt', action='store_true', default=None, help='encrypt DATA and/or JOURNAL devices with dm-crypt', ) prepare_parser.add_argument( '--dmcrypt-key-dir', metavar='KEYDIR', default='/etc/ceph/dmcrypt-keys', help='directory where dm-crypt keys are stored', ) prepare_parser.add_argument( 'data', metavar='DATA', help='path to OSD data (a disk block device or directory)', ) prepare_parser.add_argument( 'journal', metavar='JOURNAL', nargs='?', help=('path to OSD journal disk block device;' + ' leave out to store journal in file'), ) prepare_parser.set_defaults( func=main_prepare, ) activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD') activate_parser.add_argument( '--mount', action='store_true', default=None, help='mount a block device [deprecated, ignored]', ) activate_parser.add_argument( '--activate-key', metavar='PATH', help='bootstrap-osd keyring path template (%(default)s)', dest='activate_key_template', ) activate_parser.add_argument( '--mark-init', metavar='INITSYSTEM', help='init system to manage this dir', default='auto', choices=INIT_SYSTEMS, ) activate_parser.add_argument( 'path', metavar='PATH', nargs='?', help='path to block device or directory', ) activate_parser.set_defaults( activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', func=main_activate, ) list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs') list_parser.set_defaults( func=main_list, ) args = parser.parse_args() return args def main(): args = parse_args() loglevel = logging.INFO if args.verbose: loglevel = logging.DEBUG logging.basicConfig( level=loglevel, ) try: args.func(args) except Error as e: print >> sys.stderr, '{prog}: {msg}'.format( prog=args.prog, msg=e, ) sys.exit(1) if __name__ == '__main__': main()