libbtrfsutil: allow tests to create multiple Btrfs instances

Some upcoming tests will need to create a second Btrfs filesystem, so
add support for this to the test helpers.

Signed-off-by: Omar Sandoval <osandov@fb.com>
Signed-off-by: David Sterba <dsterba@suse.com>
This commit is contained in:
Omar Sandoval 2018-11-13 23:47:01 -08:00 committed by David Sterba
parent fee45d5421
commit 39ac43a2a4
1 changed files with 22 additions and 13 deletions

View File

@ -57,14 +57,18 @@ def regain_privs():
@unittest.skipIf(os.geteuid() != 0, 'must be run as root') @unittest.skipIf(os.geteuid() != 0, 'must be run as root')
class BtrfsTestCase(unittest.TestCase): class BtrfsTestCase(unittest.TestCase):
def setUp(self): def __init__(self, *args, **kwds):
self.mountpoint = tempfile.mkdtemp() super().__init__(*args, **kwds)
self._mountpoints = []
def mount_btrfs(self):
mountpoint = tempfile.mkdtemp()
try: try:
with tempfile.NamedTemporaryFile(delete=False) as f: with tempfile.NamedTemporaryFile(delete=False) as f:
os.truncate(f.fileno(), 1024 * 1024 * 1024) os.truncate(f.fileno(), 1024 * 1024 * 1024)
self.image = f.name image = f.name
except Exception as e: except Exception as e:
os.rmdir(self.mountpoint) os.rmdir(mountpoint)
raise e raise e
if os.path.exists('../../mkfs.btrfs'): if os.path.exists('../../mkfs.btrfs'):
@ -72,19 +76,24 @@ class BtrfsTestCase(unittest.TestCase):
else: else:
mkfs = 'mkfs.btrfs' mkfs = 'mkfs.btrfs'
try: try:
subprocess.check_call([mkfs, '-q', self.image]) subprocess.check_call([mkfs, '-q', image])
subprocess.check_call(['mount', '-o', 'loop', '--', self.image, self.mountpoint]) subprocess.check_call(['mount', '-o', 'loop', '--', image, mountpoint])
except Exception as e: except Exception as e:
os.remove(self.image) os.rmdir(mountpoint)
os.rmdir(self.mountpoint) os.remove(image)
raise e raise e
self._mountpoints.append((mountpoint, image))
return mountpoint, image
def setUp(self):
self.mountpoint, self.image = self.mount_btrfs()
def tearDown(self): def tearDown(self):
try: for mountpoint, image in self._mountpoints:
subprocess.check_call(['umount', self.mountpoint]) subprocess.call(['umount', '-R', mountpoint])
finally: os.rmdir(mountpoint)
os.remove(self.image) os.remove(image)
os.rmdir(self.mountpoint)
@staticmethod @staticmethod
def path_or_fd(path, open_flags=os.O_RDONLY): def path_or_fd(path, open_flags=os.O_RDONLY):