ceph/qa/workunits/ceph-disk/ceph-disk-test.py

778 lines
30 KiB
Python
Raw Normal View History

#
# Copyright (C) 2015, 2016 Red Hat <contact@redhat.com>
#
# Author: Loic Dachary <loic@dachary.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library Public License for more details.
#
# When debugging these tests (must be root), here are a few useful commands:
#
# export PATH=.:..:$PATH
# ceph-disk.sh # run once to prepare the environment as it would be by teuthology
# ln -sf /home/ubuntu/ceph/src/ceph-disk/ceph_disk/main.py $(which ceph-disk)
# ln -sf /home/ubuntu/ceph/udev/95-ceph-osd.rules /lib/udev/rules.d/95-ceph-osd.rules
# ln -sf /home/ubuntu/ceph/systemd/ceph-disk@.service /usr/lib/systemd/system/ceph-disk@.service
# ceph-disk.conf will be silently ignored if it is a symbolic link or a hard link /var/log/upstart for logs
# cp /home/ubuntu/ceph/src/upstart/ceph-disk.conf /etc/init/ceph-disk.conf
# id=3 ; ceph-disk deactivate --deactivate-by-id $id ; ceph-disk destroy --zap --destroy-by-id $id
# py.test -s -v -k test_activate_dmcrypt_luks ceph-disk-test.py
#
# CentOS 7
# udevadm monitor --property & tail -f /var/log/messages
# udev rules messages are logged in /var/log/messages
# systemctl stop ceph-osd@2
# systemctl start ceph-osd@2
#
# udevadm monitor --property & tail -f /var/log/syslog /var/log/upstart/* # on Ubuntu 14.04
# udevadm test --action=add /block/vdb/vdb1 # verify the udev rule is run as expected
# udevadm control --reload # when changing the udev rules
# sudo /usr/sbin/ceph-disk -v trigger /dev/vdb1 # activates if vdb1 is data
#
# integration tests coverage
# pip install coverage
# perl -pi -e 's|"ceph-disk |"coverage run --source=/usr/sbin/ceph-disk --append /usr/sbin/ceph-disk |' ceph-disk-test.py
# rm -f .coverage ; py.test -s -v ceph-disk-test.py
# coverage report --show-missing
#
import argparse
import json
import logging
import configobj
import os
import pytest
import re
import subprocess
import sys
import tempfile
import time
import uuid
LOG = logging.getLogger('CephDisk')
class CephDisk:
def __init__(self):
self.conf = configobj.ConfigObj('/etc/ceph/ceph.conf')
def save_conf(self):
self.conf.write(open('/etc/ceph/ceph.conf', 'wb'))
@staticmethod
def helper(command):
command = "ceph-helpers-root.sh " + command
return CephDisk.sh(command)
@staticmethod
def sh(command):
LOG.debug(":sh: " + command)
proc = subprocess.Popen(
args=command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
bufsize=1)
lines = []
with proc.stdout:
for line in iter(proc.stdout.readline, b''):
line = line.decode('utf-8')
if 'dangerous and experimental' in line:
LOG.debug('SKIP dangerous and experimental')
continue
lines.append(line)
LOG.debug(line.strip().encode('ascii', 'ignore'))
if proc.wait() != 0:
raise subprocess.CalledProcessError(
returncode=proc.returncode,
cmd=command
)
return "".join(lines)
def unused_disks(self, pattern='[vs]d.'):
names = [x for x in os.listdir("/sys/block") if re.match(pattern, x)]
if not names:
return []
disks = json.loads(
self.sh("ceph-disk list --format json " + " ".join(names)))
unused = []
for disk in disks:
if 'partitions' not in disk:
unused.append(disk['path'])
return unused
def ensure_sd(self):
LOG.debug(self.unused_disks('sd.'))
if self.unused_disks('sd.'):
return
modprobe = "modprobe scsi_debug vpd_use_hostno=0 add_host=1 dev_size_mb=200 ; udevadm settle"
try:
self.sh(modprobe)
except:
self.helper("install linux-image-extra-3.13.0-61-generic")
self.sh(modprobe)
def unload_scsi_debug(self):
self.sh("rmmod scsi_debug || true")
def get_lockbox(self):
disks = json.loads(self.sh("ceph-disk list --format json"))
for disk in disks:
if 'partitions' in disk:
for partition in disk['partitions']:
if partition.get('type') == 'lockbox':
return partition
raise Exception("no lockbox found " + str(disks))
def get_osd_partition(self, uuid):
disks = json.loads(self.sh("ceph-disk list --format json"))
for disk in disks:
if 'partitions' in disk:
for partition in disk['partitions']:
if partition.get('uuid') == uuid:
return partition
raise Exception("uuid = " + uuid + " not found in " + str(disks))
def get_journal_partition(self, uuid):
return self.get_space_partition('journal', uuid)
def get_block_partition(self, uuid):
return self.get_space_partition('block', uuid)
def get_blockdb_partition(self, uuid):
return self.get_space_partition('block.db', uuid)
def get_blockwal_partition(self, uuid):
return self.get_space_partition('block.wal', uuid)
def get_space_partition(self, name, uuid):
data_partition = self.get_osd_partition(uuid)
space_dev = data_partition[name + '_dev']
disks = json.loads(self.sh("ceph-disk list --format json"))
for disk in disks:
if 'partitions' in disk:
for partition in disk['partitions']:
if partition['path'] == space_dev:
if name + '_for' in partition:
assert partition[
name + '_for'] == data_partition['path']
return partition
raise Exception(
name + " for uuid = " + uuid + " not found in " + str(disks))
def destroy_osd(self, uuid):
id = self.sh("ceph osd create " + uuid).strip()
self.sh("""
set -xe
ceph-disk --verbose deactivate --deactivate-by-id {id}
ceph-disk --verbose destroy --destroy-by-id {id} --zap
""".format(id=id))
def deactivate_osd(self, uuid):
id = self.sh("ceph osd create " + uuid).strip()
self.sh("""
set -xe
ceph-disk --verbose deactivate --once --deactivate-by-id {id}
""".format(id=id))
@staticmethod
def osd_up_predicate(osds, uuid):
for osd in osds:
if osd['uuid'] == uuid and 'up' in osd['state']:
return True
return False
@staticmethod
def wait_for_osd_up(uuid):
CephDisk.wait_for_osd(uuid, CephDisk.osd_up_predicate, 'up')
@staticmethod
def osd_down_predicate(osds, uuid):
found = False
for osd in osds:
if osd['uuid'] == uuid:
found = True
if 'down' in osd['state'] or ['exists'] == osd['state']:
return True
return not found
@staticmethod
def wait_for_osd_down(uuid):
CephDisk.wait_for_osd(uuid, CephDisk.osd_down_predicate, 'down')
@staticmethod
def wait_for_osd(uuid, predicate, info):
LOG.info("wait_for_osd " + info + " " + uuid)
for delay in (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
dump = json.loads(CephDisk.sh("ceph osd dump -f json"))
if predicate(dump['osds'], uuid):
return True
time.sleep(delay)
raise Exception('timeout waiting for osd ' + uuid + ' to be ' + info)
def check_osd_status(self, uuid, space_name=None):
data_partition = self.get_osd_partition(uuid)
assert data_partition['type'] == 'data'
assert data_partition['state'] == 'active'
if space_name is not None:
space_partition = self.get_space_partition(space_name, uuid)
assert space_partition
class TestCephDisk(object):
def setup_class(self):
logging.basicConfig(level=logging.DEBUG)
c = CephDisk()
if c.sh("lsb_release -si").strip() == 'CentOS':
c.helper("install multipath-tools device-mapper-multipath")
c.conf['global']['pid file'] = '/var/run/ceph/$cluster-$name.pid'
#
# Avoid json parsing interference
#
c.conf['global']['debug monc'] = 0
#
# objecstore
#
c.conf['global']['osd journal size'] = 100
#
# bluestore
#
c.conf['global']['enable experimental unrecoverable data corrupting features'] = '*'
c.conf['global']['bluestore fsck on mount'] = 'true'
c.save_conf()
def setup(self):
c = CephDisk()
for key in ('osd objectstore', 'osd dmcrypt type'):
if key in c.conf['global']:
del c.conf['global'][key]
c.save_conf()
def test_deactivate_reactivate_osd(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid, 'journal')
data_partition = c.get_osd_partition(osd_uuid)
c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose activate " + data_partition['path'] + " --reactivate")
# check again
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid, 'journal')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
def test_destroy_osd_by_id(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid + " " + disk)
c.wait_for_osd_up(osd_uuid)
c.check_osd_status(osd_uuid)
c.destroy_osd(osd_uuid)
def test_destroy_osd_by_dev_path(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid + " " + disk)
c.wait_for_osd_up(osd_uuid)
partition = c.get_osd_partition(osd_uuid)
assert partition['type'] == 'data'
assert partition['state'] == 'active'
c.sh("ceph-disk --verbose deactivate " + partition['path'])
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose destroy " + partition['path'] + " --zap")
def test_deactivate_reactivate_dmcrypt_plain(self):
c = CephDisk()
c.conf['global']['osd dmcrypt type'] = 'plain'
c.save_conf()
osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
data_partition = c.get_osd_partition(osd_uuid)
c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose activate-journal " + data_partition['journal_dev'] +
" --reactivate" + " --dmcrypt")
c.wait_for_osd_up(osd_uuid)
c.check_osd_status(osd_uuid, 'journal')
c.destroy_osd(osd_uuid)
c.save_conf()
def test_deactivate_reactivate_dmcrypt_luks(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
data_partition = c.get_osd_partition(osd_uuid)
lockbox_partition = c.get_lockbox()
c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
c.sh("ceph-disk --verbose activate-journal " + data_partition['journal_dev'] +
" --reactivate" + " --dmcrypt")
c.wait_for_osd_up(osd_uuid)
c.check_osd_status(osd_uuid, 'journal')
c.destroy_osd(osd_uuid)
def test_activate_dmcrypt_plain_no_lockbox(self):
c = CephDisk()
c.conf['global']['osd dmcrypt type'] = 'plain'
c.save_conf()
osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
c.destroy_osd(osd_uuid)
c.save_conf()
def test_activate_dmcrypt_luks_no_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
c.destroy_osd(osd_uuid)
def test_activate_dmcrypt_luks_with_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
c.destroy_osd(osd_uuid)
def test_activate_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
lockbox = c.get_lockbox()
assert lockbox['state'] == 'active'
c.sh("umount " + lockbox['path'])
lockbox = c.get_lockbox()
assert lockbox['state'] == 'prepared'
c.sh("ceph-disk --verbose trigger " + lockbox['path'])
success = False
for delay in (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
lockbox = c.get_lockbox()
if lockbox['state'] == 'active':
success = True
break
time.sleep(delay)
if not success:
raise Exception('timeout waiting for lockbox ' + lockbox['path'])
c.destroy_osd(osd_uuid)
def activate_dmcrypt(self, ceph_disk):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
journal_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.sh(ceph_disk + " --verbose prepare " +
" --osd-uuid " + osd_uuid +
" --journal-uuid " + journal_uuid +
" --dmcrypt " +
" " + disk)
c.wait_for_osd_up(osd_uuid)
c.check_osd_status(osd_uuid, 'journal')
return osd_uuid
def test_trigger_dmcrypt_journal_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
data_partition = c.get_osd_partition(osd_uuid)
lockbox_partition = c.get_lockbox()
c.deactivate_osd(osd_uuid)
c.wait_for_osd_down(osd_uuid)
with pytest.raises(subprocess.CalledProcessError):
# fails because the lockbox is not mounted yet
c.sh("ceph-disk --verbose trigger --sync " + data_partition['journal_dev'])
c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
c.wait_for_osd_up(osd_uuid)
c.destroy_osd(osd_uuid)
def test_trigger_dmcrypt_data_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
data_partition = c.get_osd_partition(osd_uuid)
lockbox_partition = c.get_lockbox()
c.deactivate_osd(osd_uuid)
c.wait_for_osd_down(osd_uuid)
with pytest.raises(subprocess.CalledProcessError):
# fails because the lockbox is not mounted yet
c.sh("ceph-disk --verbose trigger --sync " + data_partition['path'])
c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
c.wait_for_osd_up(osd_uuid)
c.destroy_osd(osd_uuid)
def test_trigger_dmcrypt_lockbox(self):
c = CephDisk()
osd_uuid = self.activate_dmcrypt('ceph-disk')
data_partition = c.get_osd_partition(osd_uuid)
lockbox_partition = c.get_lockbox()
c.deactivate_osd(osd_uuid)
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
c.wait_for_osd_up(osd_uuid)
c.destroy_osd(osd_uuid)
def test_activate_no_journal(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.conf['global']['osd objectstore'] = 'memstore'
c.save_conf()
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 1
partition = device['partitions'][0]
assert partition['type'] == 'data'
assert partition['state'] == 'active'
assert 'journal_dev' not in partition
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
c.save_conf()
def test_activate_with_journal_dev_no_symlink(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid, 'journal')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
def test_activate_bluestore(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.conf['global']['osd objectstore'] = 'bluestore'
c.save_conf()
c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + osd_uuid +
" " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid, 'block')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + disk)
def test_activate_bluestore_seperated_block_db_wal(self):
c = CephDisk()
disk1 = c.unused_disks()[0]
disk2 = c.unused_disks()[1]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk1 + " " + disk2)
c.conf['global']['osd objectstore'] = 'bluestore'
c.save_conf()
c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + osd_uuid +
" " + disk1 + " --block.db " + disk2 + " --block.wal " + disk2)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk1))[0]
assert len(device['partitions']) == 2
device = json.loads(c.sh("ceph-disk list --format json " + disk2))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid, 'block')
c.check_osd_status(osd_uuid, 'block.wal')
c.check_osd_status(osd_uuid, 'block.db')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + disk1 + " " + disk2)
def test_activate_bluestore_reuse_db_wal_partition(self):
c = CephDisk()
disks = c.unused_disks()
block_disk = disks[0]
db_wal_disk = disks[1]
#
# Create an OSD with two disks (one for block,
# the other for block.db and block.wal ) and then destroy osd.
#
osd_uuid1 = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + block_disk + " " + db_wal_disk)
c.conf['global']['osd objectstore'] = 'bluestore'
c.save_conf()
c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " +
osd_uuid1 + " " + block_disk + " --block.db " + db_wal_disk +
" --block.wal " + db_wal_disk)
c.wait_for_osd_up(osd_uuid1)
blockdb_partition = c.get_blockdb_partition(osd_uuid1)
blockdb_path = blockdb_partition['path']
blockwal_partition = c.get_blockwal_partition(osd_uuid1)
blockwal_path = blockwal_partition['path']
c.destroy_osd(osd_uuid1)
c.sh("ceph-disk --verbose zap " + block_disk)
#
# Create another OSD with the block.db and block.wal partition
# of the previous OSD
#
osd_uuid2 = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " +
osd_uuid2 + " " + block_disk + " --block.db " + blockdb_path +
" --block.wal " + blockwal_path)
c.wait_for_osd_up(osd_uuid2)
device = json.loads(c.sh("ceph-disk list --format json " + block_disk))[0]
assert len(device['partitions']) == 2
device = json.loads(c.sh("ceph-disk list --format json " + db_wal_disk))[0]
assert len(device['partitions']) == 2
c.check_osd_status(osd_uuid2, 'block')
c.check_osd_status(osd_uuid2, 'block.wal')
c.check_osd_status(osd_uuid2, 'block.db')
blockdb_partition = c.get_blockdb_partition(osd_uuid2)
blockwal_partition = c.get_blockwal_partition(osd_uuid2)
#
# Verify the previous OSD partition has been reused
#
assert blockdb_partition['path'] == blockdb_path
assert blockwal_partition['path'] == blockwal_path
c.destroy_osd(osd_uuid2)
c.sh("ceph-disk --verbose zap " + block_disk + " " + db_wal_disk)
def test_activate_with_journal_dev_is_symlink(self):
c = CephDisk()
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
tempdir = tempfile.mkdtemp()
symlink = os.path.join(tempdir, 'osd')
os.symlink(disk, symlink)
c.sh("ceph-disk --verbose zap " + symlink)
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + symlink)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + symlink))[0]
assert len(device['partitions']) == 2
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
assert data_partition['state'] == 'active'
journal_partition = c.get_journal_partition(osd_uuid)
assert journal_partition
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + symlink)
os.unlink(symlink)
os.rmdir(tempdir)
def test_activate_journal_file(self):
c = CephDisk()
disks = c.unused_disks()
data_disk = disks[0]
#
# /var/lib/ceph/osd is required otherwise it may violate
# restrictions enforced by systemd regarding the directories
# which ceph-osd is allowed to read/write
#
tempdir = tempfile.mkdtemp(dir='/var/lib/ceph/osd')
c.sh("chown ceph:ceph " + tempdir + " || true")
journal_file = os.path.join(tempdir, 'journal')
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_file)
c.wait_for_osd_up(osd_uuid)
device = json.loads(
c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
partition = device['partitions'][0]
assert journal_file == os.readlink(
os.path.join(partition['mount'], 'journal'))
c.check_osd_status(osd_uuid)
c.helper("pool_read_write 1") # 1 == pool size
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + data_disk)
os.unlink(journal_file)
os.rmdir(tempdir)
def test_activate_separated_journal(self):
c = CephDisk()
disks = c.unused_disks()
data_disk = disks[0]
journal_disk = disks[1]
osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
c.helper("pool_read_write 1") # 1 == pool size
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + data_disk + " " + journal_disk)
def test_activate_separated_journal_dev_is_symlink(self):
c = CephDisk()
disks = c.unused_disks()
data_disk = disks[0]
journal_disk = disks[1]
tempdir = tempfile.mkdtemp()
data_symlink = os.path.join(tempdir, 'osd')
os.symlink(data_disk, data_symlink)
journal_symlink = os.path.join(tempdir, 'journal')
os.symlink(journal_disk, journal_symlink)
osd_uuid = self.activate_separated_journal(
data_symlink, journal_symlink)
c.helper("pool_read_write 1") # 1 == pool size
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + data_symlink + " " + journal_symlink)
os.unlink(data_symlink)
os.unlink(journal_symlink)
os.rmdir(tempdir)
def activate_separated_journal(self, data_disk, journal_disk):
c = CephDisk()
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(
c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
c.check_osd_status(osd_uuid, 'journal')
return osd_uuid
#
# Create an OSD and get a journal partition from a disk that
# already contains a journal partition which is in use. Updates of
# the kernel partition table may behave differently when a
# partition is in use. See http://tracker.ceph.com/issues/7334 for
# more information.
#
def test_activate_two_separated_journal(self):
c = CephDisk()
disks = c.unused_disks()
data_disk = disks[0]
other_data_disk = disks[1]
journal_disk = disks[2]
osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
other_osd_uuid = self.activate_separated_journal(
other_data_disk, journal_disk)
#
# read/write can only succeed if the two osds are up because
# the pool needs two OSD
#
c.helper("pool_read_write 2") # 2 == pool size
c.destroy_osd(osd_uuid)
c.destroy_osd(other_osd_uuid)
c.sh("ceph-disk --verbose zap " + data_disk + " " +
journal_disk + " " + other_data_disk)
#
# Create an OSD and reuse an existing journal partition
#
def test_activate_reuse_journal(self):
c = CephDisk()
disks = c.unused_disks()
data_disk = disks[0]
journal_disk = disks[1]
#
# Create an OSD with a separated journal and destroy it.
#
osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
journal_partition = c.get_journal_partition(osd_uuid)
journal_path = journal_partition['path']
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + data_disk)
osd_uuid = str(uuid.uuid1())
#
# Create another OSD with the journal partition of the previous OSD
#
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_path)
c.helper("pool_read_write 1") # 1 == pool size
c.wait_for_osd_up(osd_uuid)
device = json.loads(
c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
c.check_osd_status(osd_uuid)
journal_partition = c.get_journal_partition(osd_uuid)
#
# Verify the previous OSD partition has been reused
#
assert journal_partition['path'] == journal_path
c.destroy_osd(osd_uuid)
c.sh("ceph-disk --verbose zap " + data_disk + " " + journal_disk)
def test_activate_multipath(self):
c = CephDisk()
if c.sh("lsb_release -si").strip() != 'CentOS':
pytest.skip(
"see issue https://bugs.launchpad.net/ubuntu/+source/multipath-tools/+bug/1488688")
c.ensure_sd()
#
# Figure out the name of the multipath device
#
disk = c.unused_disks('sd.')[0]
c.sh("mpathconf --enable || true")
c.sh("multipath " + disk)
holders = os.listdir(
"/sys/block/" + os.path.basename(disk) + "/holders")
assert 1 == len(holders)
name = open("/sys/block/" + holders[0] + "/dm/name").read()
multipath = "/dev/mapper/" + name
#
# Prepare the multipath device
#
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + multipath)
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + multipath)
c.wait_for_osd_up(osd_uuid)
device = json.loads(
c.sh("ceph-disk list --format json " + multipath))[0]
assert len(device['partitions']) == 2
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
assert data_partition['state'] == 'active'
journal_partition = c.get_journal_partition(osd_uuid)
assert journal_partition
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
c.sh("udevadm settle")
c.sh("multipath -F")
c.unload_scsi_debug()
class CephDiskTest(CephDisk):
def main(self, argv):
parser = argparse.ArgumentParser(
'ceph-disk-test',
)
parser.add_argument(
'-v', '--verbose',
action='store_true', default=None,
help='be more verbose',
)
parser.add_argument(
'--destroy-osd',
help='stop, umount and destroy',
)
args = parser.parse_args(argv)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
if args.destroy_osd:
dump = json.loads(CephDisk.sh("ceph osd dump -f json"))
osd_uuid = None
for osd in dump['osds']:
if str(osd['osd']) == args.destroy_osd:
osd_uuid = osd['uuid']
if osd_uuid:
self.destroy_osd(osd_uuid)
else:
raise Exception("cannot find OSD " + args.destroy_osd +
" ceph osd dump -f json")
return
if __name__ == '__main__':
sys.exit(CephDiskTest().main(sys.argv[1:]))