Merge pull request #38786 from kotreshhr/evict-clients-of-deauthorized-authids

mgr/volumes: Evict clients based on auth-IDs and subvolume path

Reviewed-by: Victoria Martinez de la Cruz <victoria@redhat.com>
Reviewed-by: Ramana Raja <rraja@redhat.com>
Ramana Raja 2021-01-27 19:44:17 +05:30 committed by GitHub
commit 23f0c5b2dd
9 changed files with 321 additions and 3 deletions


@ -187,6 +187,10 @@ List cephx auth IDs authorized to access fs subvolume::
    $ ceph fs subvolume authorized_list <vol_name> <sub_name> [--group_name=<group_name>]

Evict fs clients based on auth ID and subvolume mounted::

    $ ceph fs subvolume evict <vol_name> <sub_name> <auth_id> [--group_name=<group_name>]

Fetch the absolute path of a subvolume using::

    $ ceph fs subvolume getpath <vol_name> <subvol_name> [--group_name <subvol_group_name>]
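As a concrete illustration (the volume, group, subvolume, and auth ID names below are invented for the example), evicting the client that mounted subvolume "subvol1" of group "grp1" using auth ID "guest" would be::

    $ ceph fs subvolume evict vol1 subvol1 guest --group_name=grp1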


@ -1850,6 +1850,75 @@ class TestSubvolumes(TestVolumesHelper):
self._fs_cmd("subvolume", "rm", self.volname, subvolume2, "--group_name", group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
def test_subvolume_evict_client(self):
"""
That a subvolume client can be evicted based on the auth ID
"""
subvolumes = self._generate_random_subvolume_name(2)
group = self._generate_random_group_name()
# create group
self._fs_cmd("subvolumegroup", "create", self.volname, group)
# mounts[0] and mounts[1] would be used as guests to mount the volumes/shares.
for i in range(0, 2):
self.mounts[i].umount_wait()
guest_mounts = (self.mounts[0], self.mounts[1])
auth_id = "guest"
guestclient_1 = {
"auth_id": auth_id,
"tenant_id": "tenant1",
}
# Create two subvolumes. Authorize 'guest' auth ID to mount the two
# subvolumes. Mount the two subvolumes. Write data to the volumes.
for i in range(2):
# Create subvolume.
self._fs_cmd("subvolume", "create", self.volname, subvolumes[i], "--group_name", group)
# authorize guest authID read-write access to subvolume
key = self._fs_cmd("subvolume", "authorize", self.volname, subvolumes[i], guestclient_1["auth_id"],
"--group_name", group, "--tenant_id", guestclient_1["tenant_id"])
mount_path = self._fs_cmd("subvolume", "getpath", self.volname, subvolumes[i],
"--group_name", group).rstrip()
# configure credentials for guest client
self._configure_guest_auth(guest_mounts[i], auth_id, key)
# mount the subvolume, and write to it
guest_mounts[i].mount(cephfs_mntpt=mount_path)
guest_mounts[i].write_n_mb("data.bin", 1)
# Evict client, guest_mounts[0], using auth ID 'guest' and has mounted
# one volume.
self._fs_cmd("subvolume", "evict", self.volname, subvolumes[0], auth_id, "--group_name", group)
# Evicted guest client, guest_mounts[0], should not be able to do
# anymore metadata ops. It should start failing all operations
# when it sees that its own address is in the blocklist.
try:
guest_mounts[0].write_n_mb("rogue.bin", 1)
except CommandFailedError:
pass
else:
raise RuntimeError("post-eviction write should have failed!")
# The blocklisted guest client should now be unmountable
guest_mounts[0].umount_wait()
# Guest client, guest_mounts[1], using the same auth ID 'guest', but
# has mounted the other volume, should be able to use its volume
# unaffected.
guest_mounts[1].write_n_mb("data.bin.1", 1)
# Cleanup.
guest_mounts[1].umount_wait()
for i in range(2):
self._fs_cmd("subvolume", "deauthorize", self.volname, subvolumes[i], auth_id, "--group_name", group)
self._fs_cmd("subvolume", "rm", self.volname, subvolumes[i], "--group_name", group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
def test_subvolume_pin_random(self):
self.fs.set_max_mds(2)
self.fs.wait_for_daemons()


@ -35,3 +35,29 @@ class OpSmException(Exception):
class NotImplementedException(Exception):
    pass

class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass

class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)

class EvictionError(Exception):
    pass
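As a quick illustration of how ClusterError renders its message (a hedged sketch; the action string and error values are made up, and the ClusterError class above is assumed to be in scope):

    import errno

    # Purely illustrative values for a failed eviction command.
    err = ClusterError("sending evict to mds.4242", -errno.EPERM, "access denied")
    print(err)  # Error -1 ("access denied") while sending evict to mds.4242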


@ -0,0 +1,114 @@
import errno
import json
import logging
import threading
import time

from .volume import get_mds_map
from ..exception import ClusterTimeout, ClusterError

log = logging.getLogger(__name__)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict";
            pass ["id=123"] to evict a single client with session id 123.
        """
        self.volname = volname
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._fs = fs
        self._ready_timeout = ready_timeout
        self._ready_waited = 0
        self.mgr = mgr

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for this MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD
            self._mds_map = get_mds_map(self.mgr, self.volname)

    def _evict(self):
        """
        Run the eviction procedure. Return True on success; raise ClusterTimeout
        or ClusterError otherwise.
        """
        # Wait until the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = -errno.ETIMEDOUT
        while ret == -errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._fs.mds_command(
                "%s" % self.gid,
                json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                }), "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == -errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = get_mds_map(self.mgr, self.volname)
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True
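A minimal sketch of how these evicter threads are meant to be driven, mirroring the SubvolumeV1.evict() hunk further down; the mgr module handle, open CephFS handle, volume name, auth ID, and subvolume path are all placeholders:

    # Assumes this runs inside the volumes plugin, where RankEvicter,
    # get_mds_map() and EvictionError are importable.
    client_spec = ["auth_name=guest", "client_metadata.root=/volumes/grp1/subvol1"]
    mds_map = get_mds_map(mgr, "vol1")

    threads = []
    for name, gid in mds_map['up'].items():        # keys look like "mds_<rank>"
        rank = int(name[len("mds_"):])
        t = RankEvicter(mgr, fs, client_spec, "vol1", rank, gid, mds_map, ready_timeout=30)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()
        if not t.success:
            raise EvictionError("evict failed on mds rank {0}: {1}".format(t.rank, t.exception))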


@ -58,6 +58,8 @@ class SubvolumeOpType(Enum):
    CLONE_INTERNAL = 'clone_internal'
    ALLOW_ACCESS = 'allow-access'
    DENY_ACCESS = 'deny-access'
    AUTH_LIST = 'auth-list'
    EVICT = 'evict'

class SubvolumeTemplate(object):
    VERSION = None  # type: int


@ -17,10 +17,12 @@ from .subvolume_base import SubvolumeBase
from ..template import SubvolumeTemplate
from ..snapshot_util import mksnap, rmsnap
from ..access import allow_access, deny_access
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
from ...fs_util import listsnaps, is_inherited_snap
from ..template import SubvolumeOpType
from ..group import Group
from ..rankevicter import RankEvicter
from ..volume import get_mds_map
from ..clone_index import open_clone_index, create_clone_index
@ -583,6 +585,53 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
        return auths

    def evict(self, volname, auth_id, timeout=30):
        """
        Evict all clients based on the authorization ID and the subvolume path mounted.
        Assumes that the authorization key has been revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """
        client_spec = ["auth_name={0}".format(auth_id), ]
        client_spec.append("client_metadata.root={0}".
                           format(self.path.decode('utf-8')))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = get_mds_map(self.mgr, volname)
        if not mds_map:
            raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)

    def _get_clone_source(self):
        try:
            clone_source = {

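To make the filter format concrete, evict() above ends up asking each active MDS rank to run a "session evict" with a payload along these lines (the auth ID and subvolume path are invented for illustration):

    import json

    # What RankEvicter._evict() serializes and passes to mds_command().
    payload = json.dumps({
        "prefix": "session evict",
        "filters": [
            "auth_name=guest",
            "client_metadata.root=/volumes/grp1/subvol1",
        ],
    })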

@ -23,6 +23,17 @@ def gen_pool_names(volname):
"""
return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
def get_mds_map(mgr, volname):
"""
return mdsmap for a volname
"""
mds_map = None
fs_map = mgr.get("fs_map")
for f in fs_map['filesystems']:
if volname == f['mdsmap']['fs_name']:
return f['mdsmap']
return mds_map
def get_pool_names(mgr, volname):
"""
return metadata and data pools (list) names of volume as a tuple

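For orientation, a rough sketch of the slice of the MDSMap that the eviction path relies on; only the keys used above and in the RankEvicter hunk matter, and the values are made up:

    mds_map = {
        'fs_name': 'vol1',
        'up': {'mds_0': 4242},                         # rank -> gid, keys look like "mds_<rank>"
        'info': {'gid_4242': {'state': 'up:active'}},  # per-daemon info, keyed by "gid_<gid>"
    }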

@ -16,7 +16,7 @@ from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
    create_clone
from .vol_spec import VolSpec
from .exception import VolumeException
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
@ -254,13 +254,37 @@ class VolumeClient(CephfsClient["Module"]):
        try:
            with open_volume(self, volname) as fs_handle:
                with open_group(fs_handle, self.volspec, groupname) as group:
                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
                        auths = subvolume.authorized_list()
                        ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    def evict(self, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        authid = kwargs['auth_id']
        groupname = kwargs['group_name']

        try:
            with open_volume(self, volname) as fs_handle:
                with open_group(fs_handle, self.volspec, groupname) as group:
                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
                        key = subvolume.evict(volname, authid)
                        ret = 0, "", ""
        except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
            if isinstance(e, VolumeException):
                ret = self.volume_exception_to_retval(e)
            elif isinstance(e, ClusterTimeout):
                ret = -errno.ETIMEDOUT, "", "Timed out trying to talk to the Ceph cluster"
            elif isinstance(e, ClusterError):
                ret = e._result_code, "", e._result_str
            elif isinstance(e, EvictionError):
                ret = -errno.EINVAL, "", str(e)
        return ret

    def resize_subvolume(self, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']


@ -148,6 +148,15 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
'desc': "List auth IDs that have access to a subvolume",
'perm': 'r'
},
{
'cmd': 'fs subvolume evict '
'name=vol_name,type=CephString '
'name=sub_name,type=CephString '
'name=auth_id,type=CephString '
'name=group_name,type=CephString,req=false ',
'desc': "Evict clients based on auth IDs and subvolume mounted",
'perm': 'rw'
},
{
'cmd': 'fs subvolumegroup getpath '
'name=vol_name,type=CephString '
@ -555,6 +564,16 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                                       sub_name=cmd['sub_name'],
                                       group_name=cmd.get('group_name', None))

    @mgr_cmd_wrap
    def _cmd_fs_subvolume_evict(self, inbuf, cmd):
        """
        :return: a 3-tuple of return code (int), empty string (str), error message (str)
        """
        return self.vc.evict(vol_name=cmd['vol_name'],
                             sub_name=cmd['sub_name'],
                             auth_id=cmd['auth_id'],
                             group_name=cmd.get('group_name', None))

    @mgr_cmd_wrap
    def _cmd_fs_subvolume_ls(self, inbuf, cmd):
        return self.vc.list_subvolumes(vol_name=cmd['vol_name'],
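Putting the pieces together, a hedged sketch of the dispatch path: invoking the new command (all names below are invented) makes the mgr hand _cmd_fs_subvolume_evict() a cmd dict roughly like this, which it forwards to VolumeClient.evict():

    # Hypothetical parse of: ceph fs subvolume evict vol1 subvol1 guest --group_name=grp1
    cmd = {
        'prefix': 'fs subvolume evict',
        'vol_name': 'vol1',
        'sub_name': 'subvol1',
        'auth_id': 'guest',
        'group_name': 'grp1',
    }
    # The handler returns the (retcode, stdout, stderr) 3-tuple produced by
    # self.vc.evict(vol_name='vol1', sub_name='subvol1', auth_id='guest', group_name='grp1').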