btrfs-progs/libbtrfsutil/python/tests/test_subvolume.py
Nikolay Borisov 451db51bdb btrfs-progs: remove support for BTRFS_SUBVOL_CREATE_ASYNC
Kernel has removed support for this feature in 5.7 so let's remove
support from progs as well.

Signed-off-by: Nikolay Borisov <nborisov@suse.com>
Signed-off-by: David Sterba <dsterba@suse.com>
2021-10-06 16:49:21 +02:00

564 lines
22 KiB
Python

# Copyright (C) 2018 Facebook
#
# This file is part of libbtrfsutil.
#
# libbtrfsutil is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libbtrfsutil is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libbtrfsutil. If not, see <http://www.gnu.org/licenses/>.
import fcntl
import errno
import multiprocessing
import os
import os.path
from pathlib import PurePath
import subprocess
import traceback
import btrfsutil
from tests import (
BtrfsTestCase,
drop_privs,
HAVE_PATH_LIKE,
NOBODY_UID,
regain_privs,
skipUnlessHaveNobody,
)
class TestSubvolume(BtrfsTestCase):
def test_is_subvolume(self):
dir = os.path.join(self.mountpoint, 'foo')
os.mkdir(dir)
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
self.assertTrue(btrfsutil.is_subvolume(arg))
for arg in self.path_or_fd(dir):
with self.subTest(type=type(arg)):
self.assertFalse(btrfsutil.is_subvolume(arg))
with self.assertRaises(btrfsutil.BtrfsUtilError) as e:
btrfsutil.is_subvolume(os.path.join(self.mountpoint, 'bar'))
# This is a bit of an implementation detail, but really this is testing
# that the exception is initialized correctly.
self.assertEqual(e.exception.btrfsutilerror, btrfsutil.ERROR_STATFS_FAILED)
self.assertEqual(e.exception.errno, errno.ENOENT)
def test_subvolume_id(self):
dir = os.path.join(self.mountpoint, 'foo')
os.mkdir(dir)
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
self.assertEqual(btrfsutil.subvolume_id(arg), 5)
for arg in self.path_or_fd(dir):
with self.subTest(type=type(arg)):
self.assertEqual(btrfsutil.subvolume_id(arg), 5)
def test_subvolume_id_error(self):
fd = os.open('/dev/null', os.O_RDONLY)
try:
btrfsutil.subvolume_id(fd)
except Exception:
pass
finally:
# btrfs_util_subvolume_id_fd() had a bug that would erroneously
# close the provided file descriptor. In that case, this will fail
# with EBADF.
os.close(fd)
def test_subvolume_path(self):
btrfsutil.create_subvolume(os.path.join(self.mountpoint, 'subvol1'))
os.mkdir(os.path.join(self.mountpoint, 'dir1'))
os.mkdir(os.path.join(self.mountpoint, 'dir1/dir2'))
btrfsutil.create_subvolume(os.path.join(self.mountpoint, 'dir1/dir2/subvol2'))
btrfsutil.create_subvolume(os.path.join(self.mountpoint, 'dir1/dir2/subvol2/subvol3'))
os.mkdir(os.path.join(self.mountpoint, 'subvol1/dir3'))
btrfsutil.create_subvolume(os.path.join(self.mountpoint, 'subvol1/dir3/subvol4'))
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
self.assertEqual(btrfsutil.subvolume_path(arg), '')
self.assertEqual(btrfsutil.subvolume_path(arg, 5), '')
self.assertEqual(btrfsutil.subvolume_path(arg, 256), 'subvol1')
self.assertEqual(btrfsutil.subvolume_path(arg, 257), 'dir1/dir2/subvol2')
self.assertEqual(btrfsutil.subvolume_path(arg, 258), 'dir1/dir2/subvol2/subvol3')
self.assertEqual(btrfsutil.subvolume_path(arg, 259), 'subvol1/dir3/subvol4')
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
path = ''
for i in range(26):
name = chr(ord('a') + i) * 255
path = os.path.join(path, name)
btrfsutil.create_subvolume(name)
os.chdir(name)
self.assertEqual(btrfsutil.subvolume_path('.'), path)
finally:
os.chdir(pwd)
def _test_subvolume_info(self, subvol, snapshot):
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
info = btrfsutil.subvolume_info(arg)
self.assertEqual(info.id, 5)
self.assertEqual(info.parent_id, 0)
self.assertEqual(info.dir_id, 0)
self.assertEqual(info.flags, 0)
self.assertIsInstance(info.uuid, bytes)
self.assertEqual(len(info.uuid), 16)
self.assertEqual(info.parent_uuid, bytes(16))
self.assertEqual(info.received_uuid, bytes(16))
self.assertNotEqual(info.generation, 0)
self.assertGreaterEqual(info.ctransid, 0)
self.assertEqual(info.otransid, 0)
self.assertEqual(info.stransid, 0)
self.assertEqual(info.rtransid, 0)
self.assertIsInstance(info.ctime, float)
self.assertIsInstance(info.otime, float)
self.assertEqual(info.stime, 0)
self.assertEqual(info.rtime, 0)
info = btrfsutil.subvolume_info(subvol)
self.assertEqual(info.id, 256)
self.assertEqual(info.parent_id, 5)
self.assertEqual(info.dir_id, 256)
self.assertEqual(info.flags, 0)
self.assertIsInstance(info.uuid, bytes)
self.assertEqual(len(info.uuid), 16)
self.assertEqual(info.parent_uuid, bytes(16))
self.assertEqual(info.received_uuid, bytes(16))
self.assertNotEqual(info.generation, 0)
self.assertNotEqual(info.ctransid, 0)
self.assertNotEqual(info.otransid, 0)
self.assertEqual(info.stransid, 0)
self.assertEqual(info.rtransid, 0)
self.assertNotEqual(info.ctime, 0)
self.assertNotEqual(info.otime, 0)
self.assertEqual(info.stime, 0)
self.assertEqual(info.rtime, 0)
subvol_uuid = info.uuid
info = btrfsutil.subvolume_info(snapshot)
self.assertEqual(info.parent_uuid, subvol_uuid)
# TODO: test received_uuid, stransid, rtransid, stime, and rtime
def test_subvolume_info(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol)
snapshot = os.path.join(self.mountpoint, 'snapshot')
btrfsutil.create_snapshot(subvol, snapshot)
self._test_subvolume_info(subvol, snapshot)
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
with self.assertRaises(btrfsutil.BtrfsUtilError) as e:
# BTRFS_EXTENT_TREE_OBJECTID
btrfsutil.subvolume_info(arg, 2)
self.assertEqual(e.exception.btrfsutilerror,
btrfsutil.ERROR_SUBVOLUME_NOT_FOUND)
@skipUnlessHaveNobody
def test_subvolume_info_unprivileged(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol)
snapshot = os.path.join(self.mountpoint, 'snapshot')
btrfsutil.create_snapshot(subvol, snapshot)
with drop_privs():
try:
btrfsutil.subvolume_info(self.mountpoint)
except OSError as e:
if e.errno == errno.ENOTTY:
self.skipTest('BTRFS_IOC_GET_SUBVOL_INFO is not available')
else:
raise
self._test_subvolume_info(subvol, snapshot)
def test_read_only(self):
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
btrfsutil.set_subvolume_read_only(arg)
self.assertTrue(btrfsutil.get_subvolume_read_only(arg))
self.assertTrue(btrfsutil.subvolume_info(arg).flags & 1)
btrfsutil.set_subvolume_read_only(arg, False)
self.assertFalse(btrfsutil.get_subvolume_read_only(arg))
self.assertFalse(btrfsutil.subvolume_info(arg).flags & 1)
btrfsutil.set_subvolume_read_only(arg, True)
self.assertTrue(btrfsutil.get_subvolume_read_only(arg))
self.assertTrue(btrfsutil.subvolume_info(arg).flags & 1)
btrfsutil.set_subvolume_read_only(arg, False)
def test_default_subvolume(self):
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
self.assertEqual(btrfsutil.get_default_subvolume(arg), 5)
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol)
for arg in self.path_or_fd(subvol):
with self.subTest(type=type(arg)):
btrfsutil.set_default_subvolume(arg)
self.assertEqual(btrfsutil.get_default_subvolume(arg), 256)
btrfsutil.set_default_subvolume(arg, 5)
self.assertEqual(btrfsutil.get_default_subvolume(arg), 5)
def test_create_subvolume(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol + '1')
self.assertTrue(btrfsutil.is_subvolume(subvol + '1'))
btrfsutil.create_subvolume((subvol + '2').encode())
self.assertTrue(btrfsutil.is_subvolume(subvol + '2'))
if HAVE_PATH_LIKE:
btrfsutil.create_subvolume(PurePath(subvol + '3'))
self.assertTrue(btrfsutil.is_subvolume(subvol + '3'))
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
btrfsutil.create_subvolume('subvol4')
self.assertTrue(btrfsutil.is_subvolume('subvol4'))
finally:
os.chdir(pwd)
btrfsutil.create_subvolume(subvol + '5/')
self.assertTrue(btrfsutil.is_subvolume(subvol + '5'))
btrfsutil.create_subvolume(subvol + '6//')
self.assertTrue(btrfsutil.is_subvolume(subvol + '6'))
# Test creating subvolumes under '/' in a chroot.
pid = os.fork()
if pid == 0:
try:
os.chroot(self.mountpoint)
os.chdir('/')
btrfsutil.create_subvolume('/subvol8')
self.assertTrue(btrfsutil.is_subvolume('/subvol8'))
with self.assertRaises(btrfsutil.BtrfsUtilError):
btrfsutil.create_subvolume('/')
os._exit(0)
except Exception:
traceback.print_exc()
os._exit(1)
wstatus = os.waitpid(pid, 0)[1]
self.assertTrue(os.WIFEXITED(wstatus))
self.assertEqual(os.WEXITSTATUS(wstatus), 0)
def test_create_snapshot(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol)
os.mkdir(os.path.join(subvol, 'dir'))
for i, arg in enumerate(self.path_or_fd(subvol)):
with self.subTest(type=type(arg)):
snapshots_dir = os.path.join(self.mountpoint, 'snapshots{}'.format(i))
os.mkdir(snapshots_dir)
snapshot = os.path.join(snapshots_dir, 'snapshot')
btrfsutil.create_snapshot(subvol, snapshot + '1')
self.assertTrue(btrfsutil.is_subvolume(snapshot + '1'))
self.assertTrue(os.path.exists(os.path.join(snapshot + '1', 'dir')))
btrfsutil.create_snapshot(subvol, (snapshot + '2').encode())
self.assertTrue(btrfsutil.is_subvolume(snapshot + '2'))
self.assertTrue(os.path.exists(os.path.join(snapshot + '2', 'dir')))
if HAVE_PATH_LIKE:
btrfsutil.create_snapshot(subvol, PurePath(snapshot + '3'))
self.assertTrue(btrfsutil.is_subvolume(snapshot + '3'))
self.assertTrue(os.path.exists(os.path.join(snapshot + '3', 'dir')))
nested_subvol = os.path.join(subvol, 'nested')
more_nested_subvol = os.path.join(nested_subvol, 'more_nested')
btrfsutil.create_subvolume(nested_subvol)
btrfsutil.create_subvolume(more_nested_subvol)
os.mkdir(os.path.join(more_nested_subvol, 'nested_dir'))
snapshot = os.path.join(self.mountpoint, 'snapshot')
btrfsutil.create_snapshot(subvol, snapshot + '1')
# Dummy subvolume.
self.assertEqual(os.stat(os.path.join(snapshot + '1', 'nested')).st_ino, 2)
self.assertFalse(os.path.exists(os.path.join(snapshot + '1', 'nested', 'more_nested')))
btrfsutil.create_snapshot(subvol, snapshot + '2', recursive=True)
self.assertTrue(os.path.exists(os.path.join(snapshot + '2', 'nested/more_nested/nested_dir')))
btrfsutil.create_snapshot(subvol, snapshot + '3', read_only=True)
self.assertTrue(btrfsutil.get_subvolume_read_only(snapshot + '3'))
def test_delete_subvolume(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol + '1')
self.assertTrue(os.path.exists(subvol + '1'))
btrfsutil.create_subvolume(subvol + '2')
self.assertTrue(os.path.exists(subvol + '2'))
btrfsutil.create_subvolume(subvol + '3')
self.assertTrue(os.path.exists(subvol + '3'))
btrfsutil.delete_subvolume(subvol + '1')
self.assertFalse(os.path.exists(subvol + '1'))
btrfsutil.delete_subvolume((subvol + '2').encode())
self.assertFalse(os.path.exists(subvol + '2'))
if HAVE_PATH_LIKE:
btrfsutil.delete_subvolume(PurePath(subvol + '3'))
self.assertFalse(os.path.exists(subvol + '3'))
# Test deleting subvolumes under '/' in a chroot.
pid = os.fork()
if pid == 0:
try:
os.chroot(self.mountpoint)
os.chdir('/')
btrfsutil.create_subvolume('/subvol4')
self.assertTrue(os.path.exists('/subvol4'))
btrfsutil.delete_subvolume('/subvol4')
self.assertFalse(os.path.exists('/subvol4'))
with self.assertRaises(btrfsutil.BtrfsUtilError):
btrfsutil.delete_subvolume('/')
os._exit(0)
except Exception:
traceback.print_exc()
os._exit(1)
wstatus = os.waitpid(pid, 0)[1]
self.assertTrue(os.WIFEXITED(wstatus))
self.assertEqual(os.WEXITSTATUS(wstatus), 0)
btrfsutil.create_subvolume(subvol + '5')
btrfsutil.create_subvolume(subvol + '5/foo')
btrfsutil.create_subvolume(subvol + '5/bar')
btrfsutil.create_subvolume(subvol + '5/bar/baz')
btrfsutil.create_subvolume(subvol + '5/bar/qux')
btrfsutil.create_subvolume(subvol + '5/quux')
with self.assertRaises(btrfsutil.BtrfsUtilError):
btrfsutil.delete_subvolume(subvol + '5')
btrfsutil.delete_subvolume(subvol + '5', recursive=True)
self.assertFalse(os.path.exists(subvol + '5'))
def test_deleted_subvolumes(self):
subvol = os.path.join(self.mountpoint, 'subvol')
btrfsutil.create_subvolume(subvol + '1')
btrfsutil.delete_subvolume(subvol + '1')
for arg in self.path_or_fd(self.mountpoint):
with self.subTest(type=type(arg)):
self.assertEqual(btrfsutil.deleted_subvolumes(arg), [256])
def _test_subvolume_iterator(self):
btrfsutil.create_subvolume('foo')
with btrfsutil.SubvolumeIterator('.', info=True) as it:
path, subvol = next(it)
self.assertEqual(path, 'foo')
self.assertIsInstance(subvol, btrfsutil.SubvolumeInfo)
self.assertEqual(subvol.id, 256)
self.assertEqual(subvol.parent_id, 5)
self.assertRaises(StopIteration, next, it)
btrfsutil.create_subvolume('foo/bar')
btrfsutil.create_subvolume('foo/bar/baz')
subvols = [
('foo', 256),
('foo/bar', 257),
('foo/bar/baz', 258),
]
for arg in self.path_or_fd('.'):
with self.subTest(type=type(arg)), btrfsutil.SubvolumeIterator(arg) as it:
self.assertEqual(list(it), subvols)
with btrfsutil.SubvolumeIterator('.', top=0) as it:
self.assertEqual(list(it), subvols)
if os.geteuid() == 0:
with btrfsutil.SubvolumeIterator('foo', top=5) as it:
self.assertEqual(list(it), subvols)
with btrfsutil.SubvolumeIterator('.', post_order=True) as it:
self.assertEqual(list(it),
[('foo/bar/baz', 258),
('foo/bar', 257),
('foo', 256)])
subvols = [
('bar', 257),
('bar/baz', 258),
]
if os.geteuid() == 0:
with btrfsutil.SubvolumeIterator('.', top=256) as it:
self.assertEqual(list(it), subvols)
with btrfsutil.SubvolumeIterator('foo') as it:
self.assertEqual(list(it), subvols)
with btrfsutil.SubvolumeIterator('foo', top=0) as it:
self.assertEqual(list(it), subvols)
os.rename('foo/bar/baz', 'baz')
os.mkdir('dir')
btrfsutil.create_subvolume('dir/qux')
os.mkdir('dir/qux/dir2')
btrfsutil.create_subvolume('dir/qux/dir2/quux')
subvols = [
('baz', 258),
('dir/qux', 259),
('dir/qux/dir2/quux', 260),
('foo', 256),
('foo/bar', 257),
]
# Test various corner cases of the unprivileged implementation
# where we can't access the subvolume.
if os.geteuid() != 0:
with regain_privs():
# We don't have permission to traverse the path.
os.mkdir('directory_perms', 0o700)
btrfsutil.create_subvolume('directory_perms/subvol')
# We don't have permission to resolve the subvolume path.
os.mkdir('subvol_perms', 0o755)
btrfsutil.create_subvolume('subvol_perms/subvol')
os.chmod('subvol_perms/subvol', 0o700)
# The path doesn't exist.
os.mkdir('enoent', 0o755)
btrfsutil.create_subvolume('enoent/subvol')
subprocess.check_call(['mount', '-t', 'tmpfs', 'tmpfs', 'enoent'])
# The path exists but it's not a subvolume.
os.mkdir('notsubvol', 0o755)
btrfsutil.create_subvolume('notsubvol/subvol')
subprocess.check_call(['mount', '-t', 'tmpfs', 'tmpfs', 'notsubvol'])
os.mkdir('notsubvol/subvol')
# The path exists and is a subvolume, but on a different
# filesystem.
os.mkdir('wrongfs', 0o755)
btrfsutil.create_subvolume('wrongfs/subvol')
other_mountpoint, _ = self.mount_btrfs()
subprocess.check_call(['mount', '--bind', '--',
other_mountpoint, 'wrongfs/subvol'])
# The path exists and is a subvolume on the same
# filesystem, but not the right one.
os.mkdir('wrongsubvol', 0o755)
btrfsutil.create_subvolume('wrongsubvol/subvol')
subprocess.check_call(['mount', '--bind', 'baz', 'wrongsubvol/subvol'])
with btrfsutil.SubvolumeIterator('.') as it:
self.assertEqual(sorted(it), subvols)
with btrfsutil.SubvolumeIterator('.', post_order=True) as it:
self.assertEqual(sorted(it), subvols)
with btrfsutil.SubvolumeIterator('.') as it:
self.assertGreaterEqual(it.fileno(), 0)
it.close()
with self.assertRaises(ValueError):
next(iter(it))
with self.assertRaises(ValueError):
it.fileno()
it.close()
def test_subvolume_iterator(self):
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
self._test_subvolume_iterator()
finally:
os.chdir(pwd)
def _skip_unless_have_unprivileged_subvolume_iterator(self, path):
with drop_privs():
try:
for _ in btrfsutil.SubvolumeIterator(path):
break
except OSError as e:
if e.errno == errno.ENOTTY:
self.skipTest('BTRFS_IOC_GET_SUBVOL_ROOTREF is not available')
else:
raise
@skipUnlessHaveNobody
def test_subvolume_iterator_unprivileged(self):
os.chown(self.mountpoint, NOBODY_UID, -1)
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
self._skip_unless_have_unprivileged_subvolume_iterator('.')
with drop_privs():
self._test_subvolume_iterator()
finally:
os.chdir(pwd)
@staticmethod
def _create_and_delete_subvolume(i):
dir_name = f'dir{i}'
subvol_name = dir_name + '/subvol'
while True:
os.mkdir(dir_name)
btrfsutil.create_subvolume(subvol_name)
btrfsutil.delete_subvolume(subvol_name)
os.rmdir(dir_name)
def _test_subvolume_iterator_race(self):
procs = []
fd = os.open('.', os.O_RDONLY | os.O_DIRECTORY)
try:
for i in range(10):
procs.append(
multiprocessing.Process(
target=self._create_and_delete_subvolume,
args=(i,),
daemon=True,
)
)
for proc in procs:
proc.start()
for i in range(1000):
with btrfsutil.SubvolumeIterator(fd) as it:
for _ in it:
pass
finally:
for proc in procs:
proc.terminate()
proc.join()
os.close(fd)
def test_subvolume_iterator_race(self):
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
self._test_subvolume_iterator_race()
finally:
os.chdir(pwd)
def test_subvolume_iterator_race_unprivileged(self):
os.chown(self.mountpoint, NOBODY_UID, -1)
pwd = os.getcwd()
try:
os.chdir(self.mountpoint)
self._skip_unless_have_unprivileged_subvolume_iterator('.')
with drop_privs():
self._test_subvolume_iterator_race()
finally:
os.chdir(pwd)