ceph/qa/workunits/rbd/set_ro.py

114 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python
import logging
import subprocess
import sys
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
def run_command(args, except_on_error=True):
log.debug('running command "%s"', ' '.join(args))
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if out:
log.debug('stdout: %s', out)
if err:
log.debug('stderr: %s', err)
if proc.returncode:
log.debug('ret: %d', proc.returncode)
if except_on_error:
raise subprocess.CalledProcessError(proc.returncode, ' '.join(args))
return (proc.returncode, out, err)
def setup(image_name):
run_command(['rbd', 'create', '-s', '100', image_name])
run_command(['rbd', 'snap', 'create', image_name + '@snap'])
run_command(['rbd', 'map', image_name])
run_command(['rbd', 'map', image_name + '@snap'])
def teardown(image_name, fail_on_error=True):
run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name + '@snap'], fail_on_error)
run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name], fail_on_error)
run_command(['rbd', 'snap', 'rm', image_name + '@snap'], fail_on_error)
run_command(['rbd', 'rm', image_name], fail_on_error)
def write(target, expect_fail=False):
try:
with open(target, 'w', 0) as f:
f.write('test')
f.flush()
assert not expect_fail, 'writing should have failed'
except IOError:
assert expect_fail, 'writing should not have failed'
def test_ro(image_name):
dev = '/dev/rbd/rbd/' + image_name
snap_dev = dev + '@snap'
log.info('basic device is readable')
write(dev)
log.info('basic snapshot is read-only')
write(snap_dev, True)
log.info('cannot set snapshot rw')
ret, _, _ = run_command(['blockdev', '--setrw', snap_dev], False)
assert ret != 0, 'snapshot was set read-write!'
run_command(['udevadm', 'settle'])
write(snap_dev, True)
log.info('set device ro')
run_command(['blockdev', '--setro', dev])
run_command(['udevadm', 'settle'])
write(dev, True)
log.info('cannot set device rw when in-use')
with open(dev, 'r') as f:
ret, _, _ = run_command(['blockdev', '--setro', dev], False)
assert ret != 0, 'in-use device was set read-only!'
run_command(['udevadm', 'settle'])
write(dev, True)
run_command(['blockdev', '--setro', dev])
run_command(['udevadm', 'settle'])
write(dev, True)
run_command(['blockdev', '--setrw', dev])
run_command(['udevadm', 'settle'])
write(dev)
run_command(['udevadm', 'settle'])
run_command(['blockdev', '--setrw', dev])
run_command(['udevadm', 'settle'])
write(dev)
log.info('cannot set device ro when in-use')
with open(dev, 'r') as f:
ret, _, _ = run_command(['blockdev', '--setro', dev], False)
assert ret != 0, 'in-use device was set read-only!'
run_command(['udevadm', 'settle'])
run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name])
run_command(['rbd', 'map', '--read-only', image_name])
log.info('cannot write to newly mapped ro device')
write(dev, True)
log.info('can set ro mapped device rw')
run_command(['blockdev', '--setrw', dev])
run_command(['udevadm', 'settle'])
write(dev)
def main():
image_name = 'test1'
# clean up any state from previous test runs
teardown(image_name, False)
setup(image_name)
test_ro(image_name)
teardown(image_name)
if __name__ == '__main__':
main()