Merge pull request #144 from dalgaaf/wip-da-ceph-disk

Fix some issues in ceph-dsk

Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2013-03-26 12:06:41 -07:00
commit b8160018bd

View File

@ -57,10 +57,10 @@ INIT_SYSTEMS = [
]
log_name = __name__
if log_name == '__main__':
log_name = os.path.basename(sys.argv[0])
log = logging.getLogger(log_name)
LOG_NAME = __name__
if LOG_NAME == '__main__':
LOG_NAME = os.path.basename(sys.argv[0])
LOG = logging.getLogger(LOG_NAME)
###### exceptions ########
@ -94,7 +94,6 @@ class TruncatedLineError(Error):
Line is truncated
"""
class TooManyLinesError(Error):
"""
Too many lines
@ -110,9 +109,13 @@ class FilesystemTypeError(Error):
def maybe_mkdir(*a, **kw):
"""
Creates a new directory if it doesn't exist, removes
existing symlink before creating the directory.
"""
# remove any symlink, if it is there..
if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode):
log.debug('Removing old symlink at %s', *a)
LOG.debug('Removing old symlink at %s', *a)
os.unlink(*a)
try:
os.mkdir(*a, **kw)
@ -127,26 +130,26 @@ def list_all_partitions():
"""
Return a list of devices and partitions
"""
ls = {}
with file('/proc/partitions', 'rb') as f:
for line in f.read().split('\n')[2:]:
dev_part_list = {}
with file('/proc/partitions', 'rb') as proc_partitions:
for line in proc_partitions.read().split('\n')[2:]:
fields = re.split('\s+', line)
if len(fields) < 5:
continue
name = fields[4]
name = '/dev/' + name
if "dm-" in name:
if "/dev/dm" not in ls:
ls["/dev/dm"] = []
ls["/dev/dm"].append(name)
if "/dev/dm" not in dev_part_list:
dev_part_list["/dev/dm"] = []
dev_part_list["/dev/dm"].append(name)
if name[-1].isdigit():
base = name
while base[-1].isdigit():
base = base[:-1]
ls[base].append(name)
dev_part_list[base].append(name)
else:
ls[name] = []
return ls
dev_part_list[name] = []
return dev_part_list
def list_partitions(disk):
@ -157,16 +160,16 @@ def list_partitions(disk):
assert not is_partition(disk)
assert disk.startswith('/dev/')
base = disk[5:]
ls = []
with file('/proc/partitions', 'rb') as f:
for line in f.read().split('\n')[2:]:
partitions = []
with file('/proc/partitions', 'rb') as proc_partitions:
for line in proc_partitions.read().split('\n')[2:]:
fields = re.split('\s+', line)
if len(fields) < 5:
continue
name = fields [4]
name = fields [4]
if name != base and name.startswith(base):
ls.append('/dev/' + name)
return ls
partitions.append('/dev/' + name)
return partitions
def is_partition(dev):
@ -215,6 +218,12 @@ def is_held(dev):
def verify_not_in_use(dev):
"""
Verify if a given device (path) is in use (e.g. mounted or
in use by device-mapper).
:raises: Error if device is in use.
"""
assert os.path.exists(dev)
if is_partition(dev):
if is_mounted(dev):
@ -232,6 +241,12 @@ def verify_not_in_use(dev):
def must_be_one_line(line):
"""
Checks if given line is really one single line.
:raises: TruncatedLineError or TooManyLinesError
:return: Content of the line, or None if line isn't valid.
"""
if line[-1:] != '\n':
raise TruncatedLineError(line)
line = line[:-1]
@ -309,7 +324,14 @@ def allocate_osd_id(
fsid,
keyring,
):
log.debug('Allocating OSD id...')
"""
Accocates an OSD id on the given cluster.
:raises: Error if the call to allocate the OSD id fails.
:return: The allocated OSD id.
"""
LOG.debug('Allocating OSD id...')
try:
osd_id = _check_output(
args=[
@ -329,6 +351,9 @@ def allocate_osd_id(
def get_osd_id(path):
"""
Gets the OSD id of the OSD at the given path.
"""
osd_id = read_one_line(path, 'whoami')
if osd_id is not None:
check_osd_id(osd_id)
@ -351,6 +376,13 @@ def _check_output(*args, **kwargs):
def get_conf(cluster, variable):
"""
Get the value of the given configuration variable from the
cluster.
:raises: Error if call to ceph-conf fails.
:return: The variable value or None.
"""
try:
p = subprocess.Popen(
args=[
@ -412,6 +444,11 @@ def get_conf_with_default(cluster, variable):
def get_fsid(cluster):
"""
Get the fsid of the cluster.
:return: The fsid or raises Error.
"""
fsid = get_conf(cluster=cluster, variable='fsid')
if fsid is None:
raise Error('getting cluster uuid from configuration failed')
@ -422,6 +459,11 @@ def get_or_create_dmcrypt_key(
_uuid,
key_dir,
):
"""
Get path to dmcrypt key or create a new key file.
:return: Path to the dmcrypt key file.
"""
path = os.path.join(key_dir, _uuid)
# already have it?
@ -446,6 +488,11 @@ def dmcrypt_map(
keypath,
_uuid,
):
"""
Maps a device to a dmcrypt device.
:return: Path to the dmcrypt device.
"""
dev = '/dev/mapper/'+ _uuid
args = [
'cryptsetup',
@ -461,12 +508,15 @@ def dmcrypt_map(
return dev
except subprocess.CalledProcessError as e:
raise Error('unable to map device', rawdev)
raise Error('unable to map device', rawdev, e)
def dmcrypt_unmap(
_uuid
):
"""
Removes the dmcrypt device with the given UUID.
"""
args = [
'cryptsetup',
'remove',
@ -477,7 +527,7 @@ def dmcrypt_unmap(
subprocess.check_call(args)
except subprocess.CalledProcessError as e:
raise Error('unable to unmap device', _uuid)
raise Error('unable to unmap device', _uuid, e)
def mount(
@ -485,6 +535,10 @@ def mount(
fstype,
options,
):
"""
Mounts a device with given filessystem type and
mount options to a tempfile path under /var/lib/ceph/tmp.
"""
# pick best-of-breed mount options based on fs type
if options is None:
options = MOUNT_OPTIONS.get(fstype, '')
@ -495,7 +549,7 @@ def mount(
dir='/var/lib/ceph/tmp',
)
try:
log.debug('Mounting %s on %s with options %s', dev, path, options)
LOG.debug('Mounting %s on %s with options %s', dev, path, options)
subprocess.check_call(
args=[
'mount',
@ -518,8 +572,11 @@ def mount(
def unmount(
path,
):
"""
Unmount and removes the given mount point.
"""
try:
log.debug('Unmounting %s', path)
LOG.debug('Unmounting %s', path)
subprocess.check_call(
args=[
'/bin/umount',
@ -537,6 +594,12 @@ def unmount(
def get_free_partition_index(dev):
"""
Get the next free partition index on a given device.
:return: Index number (> 1 if there is already a partition on the device)
or 1 if there is no partition table.
"""
try:
lines = _check_output(
args=[
@ -548,7 +611,7 @@ def get_free_partition_index(dev):
],
)
except subprocess.CalledProcessError as e:
print 'cannot read partition index; assume it isn\'t present\n'
print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e
return 1
if not lines:
@ -580,7 +643,7 @@ def zap(dev):
Destroy the partition table and content of a given disk.
"""
try:
log.debug('Zapping partition table on %s', dev)
LOG.debug('Zapping partition table on %s', dev)
# try to wipe out any GPT partition table backups. sgdisk
# isn't too thorough.
@ -613,8 +676,8 @@ def prepare_journal_dev(
):
if is_partition(journal):
log.debug('Journal %s is a partition', journal)
log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
LOG.debug('Journal %s is a partition', journal)
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
return (journal, None, None)
ptype = JOURNAL_UUID
@ -645,10 +708,10 @@ def prepare_journal_dev(
num=num,
size=journal_size,
)
log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
try:
log.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal)
LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal)
subprocess.check_call(
args=[
'sgdisk',
@ -691,7 +754,7 @@ def prepare_journal_dev(
journal_dmcrypt = journal_symlink
journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid)
log.debug('Journal is GPT partition %s', journal_symlink)
LOG.debug('Journal is GPT partition %s', journal_symlink)
return (journal_symlink, journal_dmcrypt, journal_uuid)
except subprocess.CalledProcessError as e:
@ -703,14 +766,14 @@ def prepare_journal_file(
journal_size):
if not os.path.exists(journal):
log.debug('Creating journal file %s with size %dM', journal, journal_size)
LOG.debug('Creating journal file %s with size %dM', journal, journal_size)
with file(journal, 'wb') as f:
f.truncate(journal_size * 1048576)
# FIXME: should we resize an existing journal file?
log.debug('Journal is file %s', journal)
log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
LOG.debug('Journal is file %s', journal)
LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
return (journal, None, None)
@ -754,19 +817,19 @@ def adjust_symlink(target, path):
try:
mode = os.lstat(path).st_mode
if stat.S_ISREG(mode):
log.debug('Removing old file %s', path)
LOG.debug('Removing old file %s', path)
os.unlink(path)
elif stat.S_ISLNK(mode):
old = os.readlink(path)
if old != target:
log.debug('Removing old symlink %s -> %s', path, old)
LOG.debug('Removing old symlink %s -> %s', path, old)
os.unlink(path)
else:
create = False
except:
raise Error('unable to remove (or adjust) old file (symlink)', path)
if create:
log.debug('Creating symlink %s -> %s', path, target)
LOG.debug('Creating symlink %s -> %s', path, target)
try:
os.symlink(target, path)
except:
@ -780,7 +843,7 @@ def prepare_dir(
journal_uuid,
journal_dmcrypt = None,
):
log.debug('Preparing osd data dir %s', path)
LOG.debug('Preparing osd data dir %s', path)
if osd_uuid is None:
osd_uuid = str(uuid.uuid4())
@ -835,10 +898,10 @@ def prepare_dev(
rawdev = None
if is_partition(data):
log.debug('OSD data device %s is a partition', data)
LOG.debug('OSD data device %s is a partition', data)
rawdev = data
else:
log.debug('Creating osd partition on %s', data)
LOG.debug('Creating osd partition on %s', data)
try:
subprocess.check_call(
args=[
@ -896,7 +959,7 @@ def prepare_dev(
dev,
])
try:
log.debug('Creating %s fs on %s', fstype, dev)
LOG.debug('Creating %s fs on %s', fstype, dev)
subprocess.check_call(args=args)
except subprocess.CalledProcessError as e:
raise Error(e)
@ -1031,7 +1094,7 @@ def main_prepare(args):
# colocate journal with data?
if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None:
log.info('Will colocate journal with data on %s', args.data)
LOG.info('Will colocate journal with data on %s', args.data)
args.journal = args.data
if args.journal_uuid is None:
@ -1157,7 +1220,7 @@ def move_mount(
cluster,
osd_id,
):
log.debug('Moving mount to final location...')
LOG.debug('Moving mount to final location...')
parent = '/var/lib/ceph/osd'
osd_data = os.path.join(
parent,
@ -1179,7 +1242,7 @@ def start_daemon(
cluster,
osd_id,
):
log.debug('Starting %s osd.%s...', cluster, osd_id)
LOG.debug('Starting %s osd.%s...', cluster, osd_id)
path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
cluster=cluster, osd_id=osd_id)
@ -1300,7 +1363,7 @@ def mount_activate(
except OSError:
pass
if active:
log.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
unmount(path)
elif other:
raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id))
@ -1313,7 +1376,7 @@ def mount_activate(
return (cluster, osd_id)
except:
log.error('Failed to activate')
LOG.error('Failed to activate')
unmount(path)
raise
finally:
@ -1342,7 +1405,7 @@ def activate_dir(
if os.path.lexists(canonical):
old = os.readlink(canonical)
if old != path:
log.debug('Removing old symlink %s -> %s', canonical, old)
LOG.debug('Removing old symlink %s -> %s', canonical, old)
try:
os.unlink(canonical)
except:
@ -1350,7 +1413,7 @@ def activate_dir(
else:
create = False
if create:
log.debug('Creating symlink %s -> %s', canonical, path)
LOG.debug('Creating symlink %s -> %s', canonical, path)
try:
os.symlink(path, canonical)
except:
@ -1371,14 +1434,14 @@ def find_cluster_by_uuid(_uuid):
if not conf_file.endswith('.conf'):
continue
cluster = conf_file[:-5]
u = get_conf(cluster, 'fsid')
if u is None:
fsid = get_conf(cluster, 'fsid')
if fsid is None:
no_fsid.append(cluster)
elif u == _uuid:
elif fsid == _uuid:
return cluster
# be tolerant of /etc/ceph/ceph.conf without an fsid defined.
if len(no_fsid) == 1 and no_fsid[0] == 'ceph':
log.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway')
LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway')
return 'ceph'
return None
@ -1394,17 +1457,17 @@ def activate(
ceph_fsid = read_one_line(path, 'ceph_fsid')
if ceph_fsid is None:
raise Error('No cluster uuid assigned.')
log.debug('Cluster uuid is %s', ceph_fsid)
LOG.debug('Cluster uuid is %s', ceph_fsid)
cluster = find_cluster_by_uuid(ceph_fsid)
if cluster is None:
raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid)
log.debug('Cluster name is %s', cluster)
LOG.debug('Cluster name is %s', cluster)
fsid = read_one_line(path, 'fsid')
if fsid is None:
raise Error('No OSD uuid assigned.')
log.debug('OSD uuid is %s', fsid)
LOG.debug('OSD uuid is %s', fsid)
keyring = activate_key_template.format(cluster=cluster)
@ -1416,10 +1479,10 @@ def activate(
keyring=keyring,
)
write_one_line(path, 'whoami', osd_id)
log.debug('OSD id is %s', osd_id)
LOG.debug('OSD id is %s', osd_id)
if not os.path.exists(os.path.join(path, 'ready')):
log.debug('Initializing OSD...')
LOG.debug('Initializing OSD...')
# re-running mkfs is safe, so just run until it completes
mkfs(
path=path,
@ -1444,7 +1507,7 @@ def activate(
else:
init = 'sysvinit'
log.debug('Marking with init system %s', init)
LOG.debug('Marking with init system %s', init)
with file(os.path.join(path, init), 'w'):
pass
@ -1457,7 +1520,7 @@ def activate(
pass
if not os.path.exists(os.path.join(path, 'active')):
log.debug('Authorizing OSD key...')
LOG.debug('Authorizing OSD key...')
auth_key(
path=path,
cluster=cluster,
@ -1465,7 +1528,7 @@ def activate(
keyring=keyring,
)
write_one_line(path, 'active', 'ok')
log.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
return (osd_id, cluster)
except:
raise
@ -1510,7 +1573,7 @@ def list_dev(dev):
def main_list(args):
ls = list_all_partitions()
log.debug('partitions are %s' % ls)
LOG.debug('partitions are %s' % ls)
for base, parts in ls.iteritems():
if parts: