test: add test for setting ceph mirror virtual xattr

Signed-off-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
Venky Shankar 2020-08-10 09:21:42 -04:00
parent aff632a67a
commit 1ec84603d0

View File

@ -3,6 +3,7 @@ from nose.tools import assert_raises, assert_equal, assert_greater, with_setup
import cephfs as libcephfs
import fcntl
import os
import random
import time
import stat
import uuid
@ -146,6 +147,47 @@ def test_xattr():
assert_equal(9, ret_val)
assert_equal("user.big\x00", ret_buff.decode('utf-8'))
@with_setup(setup_test)
def test_ceph_mirror_xattr():
def gen_mirror_xattr():
cluster_id = str(uuid.uuid4())
fs_id = random.randint(1, 10)
mirror_xattr = f'cluster_id={cluster_id} fs_id={fs_id}'
return mirror_xattr.encode('utf-8')
mirror_xattr_enc_1 = gen_mirror_xattr()
# mirror xattr is only allowed on root
cephfs.mkdir('/d0', 0o755)
assert_raises(libcephfs.InvalidValue, cephfs.setxattr,
'/d0', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
cephfs.rmdir('/d0')
cephfs.setxattr('/', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
assert_equal(mirror_xattr_enc_1, cephfs.getxattr('/', 'ceph.mirror.info'))
# setting again with XATTR_CREATE should fail
assert_raises(libcephfs.ObjectExists, cephfs.setxattr,
'/', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
# ceph.mirror.info should not show up in listing
ret_val, _ = cephfs.listxattr("/")
assert_equal(0, ret_val)
mirror_xattr_enc_2 = gen_mirror_xattr()
cephfs.setxattr('/', 'ceph.mirror.info', mirror_xattr_enc_2, os.XATTR_REPLACE)
assert_equal(mirror_xattr_enc_2, cephfs.getxattr('/', 'ceph.mirror.info'))
cephfs.removexattr('/', 'ceph.mirror.info')
# ceph.mirror.info is already removed
assert_raises(libcephfs.NoData, cephfs.getxattr, '/', 'ceph.mirror.info')
# removing again should throw error
assert_raises(libcephfs.NoData, cephfs.removexattr, "/", "ceph.mirror.info")
# check mirror info xattr format
assert_raises(libcephfs.InvalidValue, cephfs.setxattr, '/', 'ceph.mirror.info', b"unknown", 0)
@with_setup(setup_test)
def test_fxattr():
fd = cephfs.open(b'/file-fxattr', 'w', 0o755)