1
0
mirror of https://github.com/ceph/ceph synced 2025-02-14 22:37:51 +00:00

Merge pull request from ajarr/wip-15615

tasks/cephfs/test_volume_client: test authentication metadata

Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
John Spray 2016-07-20 15:16:21 +01:00 committed by GitHub
commit b43a106f42

View File

@ -10,6 +10,10 @@ log = logging.getLogger(__name__)
class TestVolumeClient(CephFSTestCase):
#
# TODO: Test that VolumeClient can recover from partial auth updates.
#
# One for looking at the global filesystem, one for being
# the VolumeClient, two for mounting the created shares
CLIENTS_REQUIRED = 4
@ -70,7 +74,8 @@ vc.disconnect()
def _configure_guest_auth(self, volumeclient_mount, guest_mount,
guest_entity, mount_path,
namespace_prefix=None, readonly=False):
namespace_prefix=None, readonly=False,
tenant_id=None):
"""
Set up auth credentials for the guest client to mount a volume.
@ -83,6 +88,7 @@ vc.disconnect()
is used for the volume's layout.
:param readonly: defaults to False. If set to 'True' only read-only
mount access is granted to the guest.
:param tenant_id: (OpenStack) tenant ID of the guest client.
"""
head, volume_id = os.path.split(mount_path)
@ -93,15 +99,25 @@ vc.disconnect()
# Authorize the guest client's auth ID to mount the volume.
key = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly})
auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly},
tenant_id="{tenant_id}")
print auth_result['auth_key']
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity=guest_entity,
readonly=readonly)), volume_prefix, namespace_prefix
readonly=readonly,
tenant_id=tenant_id)), volume_prefix, namespace_prefix
)
# CephFSVolumeClient's authorize() does not return the secret
# key to a caller who isn't multi-tenant aware. Explicitly
# query the key for such a client.
if not tenant_id:
key = self.fs.mon_manager.raw_cluster_cmd(
"auth", "get-key", "client.{name}".format(name=guest_entity),
)
# The guest auth ID should exist.
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertIn("client.{0}".format(guest_entity), existing_ids)
@ -603,3 +619,217 @@ vc.disconnect()
# Cannot write into read-only volume.
with self.assertRaises(CommandFailedError):
guest_mount.write_n_mb("rogue.bin", 1)
def test_get_authorized_ids(self):
"""
That for a volume, the authorized IDs and their access levels
can be obtained using CephFSVolumeClient's get_authorized_ids().
"""
volumeclient_mount = self.mounts[1]
volumeclient_mount.umount_wait()
# Configure volumeclient_mount as the handle for driving volumeclient.
self._configure_vc_auth(volumeclient_mount, "manila")
group_id = "grpid"
volume_id = "volid"
guest_entity_1 = "guest1"
guest_entity_2 = "guest2"
log.info("print group ID: {0}".format(group_id))
# Create a volume.
auths = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.create_volume(vp, 1024*1024*10)
auths = vc.get_authorized_ids(vp)
print auths
""".format(
group_id=group_id,
volume_id=volume_id,
)))
# Check the list of authorized IDs for the volume.
expected_result = None
self.assertEqual(str(expected_result), auths)
# Allow two auth IDs access to the volume.
auths = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.authorize(vp, "{guest_entity_1}", readonly=False)
vc.authorize(vp, "{guest_entity_2}", readonly=True)
auths = vc.get_authorized_ids(vp)
print auths
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity_1=guest_entity_1,
guest_entity_2=guest_entity_2,
)))
# Check the list of authorized IDs and their access levels.
expected_result = [(u'guest1', u'rw'), (u'guest2', u'r')]
self.assertItemsEqual(str(expected_result), auths)
# Disallow both the auth IDs' access to the volume.
auths = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.deauthorize(vp, "{guest_entity_1}")
vc.deauthorize(vp, "{guest_entity_2}")
auths = vc.get_authorized_ids(vp)
print auths
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity_1=guest_entity_1,
guest_entity_2=guest_entity_2,
)))
# Check the list of authorized IDs for the volume.
expected_result = None
self.assertItemsEqual(str(expected_result), auths)
def test_multitenant_volumes(self):
"""
That volume access can be restricted to a tenant.
That metadata used to enforce tenant isolation of
volumes is stored as a two-way mapping between auth
IDs and volumes that they're authorized to access.
"""
volumeclient_mount = self.mounts[1]
volumeclient_mount.umount_wait()
# Configure volumeclient_mount as the handle for driving volumeclient.
self._configure_vc_auth(volumeclient_mount, "manila")
group_id = "groupid"
volume_id = "volumeid"
# Guest clients belonging to different tenants, but using the same
# auth ID.
auth_id = "guest"
guestclient_1 = {
"auth_id": auth_id,
"tenant_id": "tenant1",
}
guestclient_2 = {
"auth_id": auth_id,
"tenant_id": "tenant2",
}
# Create a volume.
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.create_volume(vp, 1024*1024*10)
""".format(
group_id=group_id,
volume_id=volume_id,
)))
# Check that volume metadata file is created on volume creation.
vol_metadata_filename = "_{0}:{1}.meta".format(group_id, volume_id)
self.assertIn(vol_metadata_filename, self.mounts[0].ls("volumes"))
# Authorize 'guestclient_1', using auth ID 'guest' and belonging to
# 'tenant1', with 'rw' access to the volume.
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
""".format(
group_id=group_id,
volume_id=volume_id,
auth_id=guestclient_1["auth_id"],
tenant_id=guestclient_1["tenant_id"]
)))
# Check that auth metadata file for auth ID 'guest', is
# created on authorizing 'guest' access to the volume.
auth_metadata_filename = "${0}.meta".format(guestclient_1["auth_id"])
self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
# Verify that the auth metadata file stores the tenant ID that the
# auth ID belongs to, the auth ID's authorized access levels
# for different volumes, versioning details, etc.
expected_auth_metadata = {
u"version": 1,
u"compat_version": 1,
u"dirty": False,
u"tenant_id": u"tenant1",
u"volumes": {
u"groupid/volumeid": {
u"dirty": False,
u"access_level": u"rw",
}
}
}
auth_metadata = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
auth_metadata = vc._auth_metadata_get("{auth_id}")
print auth_metadata
""".format(
group_id=group_id,
volume_id=volume_id,
auth_id=guestclient_1["auth_id"],
)))
self.assertItemsEqual(str(expected_auth_metadata), auth_metadata)
# Verify that the volume metadata file stores info about auth IDs
# and their access levels to the volume, versioning details, etc.
expected_vol_metadata = {
u"version": 1,
u"compat_version": 1,
u"auths": {
u"guest": {
u"dirty": False,
u"access_level": u"rw"
}
}
}
vol_metadata = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
volume_metadata = vc._volume_metadata_get(vp)
print volume_metadata
""".format(
group_id=group_id,
volume_id=volume_id,
)))
self.assertItemsEqual(str(expected_vol_metadata), vol_metadata)
# Cannot authorize 'guestclient_2' to access the volume.
# It uses auth ID 'guest', which has already been used by a
# 'guestclient_1' belonging to an another tenant for accessing
# the volume.
with self.assertRaises(CommandFailedError):
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
""".format(
group_id=group_id,
volume_id=volume_id,
auth_id=guestclient_2["auth_id"],
tenant_id=guestclient_2["tenant_id"]
)))
# Check that auth metadata file is cleaned up on removing
# auth ID's only access to a volume.
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.deauthorize(vp, "{guest_entity}")
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity=guestclient_1["auth_id"]
)))
self.assertNotIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
# Check that volume metadata file is cleaned up on volume deletion.
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.delete_volume(vp)
""".format(
group_id=group_id,
volume_id=volume_id,
)))
self.assertNotIn(vol_metadata_filename, self.mounts[0].ls("volumes"))