Merge PR #28082 into master

* refs/pull/28082/head:
	mgr / volumes: improve error handling
	py / cephfs: invoke base class Error::__init__() from OSError::__init__()
	mgr / volumes: carve out subvolume operations as a separate class
	mgr / volumes: introduce subvolume specification class
	mgr / volumes: carve out volume as a separate class
This commit is contained in:
Patrick Donnelly 2019-06-14 14:51:07 -07:00
commit 44b6e4b60e
6 changed files with 704 additions and 603 deletions

View File

@@ -158,6 +158,7 @@ class Error(Exception):
class OSError(Error):
def __init__(self, errno, strerror):
super(OSError, self).__init__(errno, strerror)
self.errno = errno
self.strerror = strerror
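
The hunk above makes the cephfs binding's OSError forward its arguments to the base Error initializer, so Exception.args is populated for generic handlers. A minimal standalone sketch of the effect, using stand-in classes rather than the real cephfs bindings (the local OSError below mirrors the binding's class name and shadows the builtin for illustration only):

class Error(Exception):
    pass

class OSError(Error):
    def __init__(self, errno, strerror):
        # Forward to the base class so Exception.args carries both values.
        super(OSError, self).__init__(errno, strerror)
        self.errno = errno
        self.strerror = strerror

try:
    raise OSError(2, "No such file or directory")
except Error as e:
    # e.args is (2, 'No such file or directory'); callers such as
    # "raise VolumeException(e.args[0], e.args[1])" below rely on this.
    print(e.args)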

View File

@@ -0,0 +1,10 @@
class VolumeException(Exception):
def __init__(self, error_code, error_message):
self.errno = error_code
self.error_str = error_message
def to_tuple(self):
return self.errno, "", self.error_str
def __str__(self):
return "{0} ({1})".format(self.errno, self.error_str)

View File

@@ -0,0 +1,101 @@
import os
class SubvolumeSpec(object):
"""
Specification of a subvolume, identified by (subvolume-id, group-id) tuple. Add fields as
required...
"""
# where shall we (by default) create subvolumes
DEFAULT_SUBVOL_PREFIX = "/volumes"
# and the default namespace
DEFAULT_NS_PREFIX = "fsvolumens_"
# Reserved subvolume group name which we use in paths for subvolumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"
def __init__(self, subvolumeid, groupid, subvolume_prefix=None, pool_ns_prefix=None):
assert groupid != SubvolumeSpec.NO_GROUP_NAME
self.subvolumeid = subvolumeid
self.groupid = groupid if groupid is not None else SubvolumeSpec.NO_GROUP_NAME
self.subvolume_prefix = subvolume_prefix if subvolume_prefix else SubvolumeSpec.DEFAULT_SUBVOL_PREFIX
self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else SubvolumeSpec.DEFAULT_NS_PREFIX
def is_default_group(self):
"""
Is the group the default group?
"""
return self.groupid == SubvolumeSpec.NO_GROUP_NAME
@property
def subvolume_id(self):
"""
Return the subvolume-id from the subvolume specification
"""
return self.subvolumeid
@property
def group_id(self):
"""
Return the group-id from the subvolume specification
"""
return self.groupid
@property
def subvolume_path(self):
"""
return the subvolume path from subvolume specification
"""
return os.path.join(self.subvolume_prefix, self.groupid, self.subvolumeid)
@property
def group_path(self):
"""
return the group path from subvolume specification
"""
return os.path.join(self.subvolume_prefix, self.groupid)
@property
def trash_path(self):
"""
return the trash path from subvolume specification
"""
return os.path.join(self.subvolume_prefix, "_deleting", self.subvolumeid)
@property
def fs_namespace(self):
"""
return a filesystem namespace by concatenating the pool namespace prefix and subvolume-id
"""
return "{0}{1}".format(self.pool_ns_prefix, self.subvolumeid)
@property
def group_dir(self):
"""
return the group directory path
"""
return self.subvolume_prefix
@property
def trash_dir(self):
"""
return the trash directory path
"""
return os.path.join(self.subvolume_prefix, "_deleting")
def make_subvol_snap_path(self, snapdir, snapname):
"""
return the subvolume snapshot path for a given snapshot name
"""
return os.path.join(self.subvolume_path, snapdir, snapname)
def make_group_snap_path(self, snapdir, snapname):
"""
return the group snapshot path for a given snapshot name
"""
return os.path.join(self.group_path, snapdir, snapname)
def __str__(self):
return "{0}/{1}".format(self.groupid, self.subvolumeid)

View File

@@ -4,38 +4,18 @@ Copyright (C) 2019 Red Hat, Inc.
LGPL2.1. See file COPYING.
"""
import errno
import logging
import os
import errno
import cephfs
import rados
from .subvolspec import SubvolumeSpec
from .exception import VolumeException
log = logging.getLogger(__name__)
# Reserved subvolume group name which we use in paths for subvolumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"
class SubvolumePath(object):
"""
Identify a subvolume's path as group->subvolume
The Subvolume ID is a unique identifier, but this is a much more
helpful thing to pass around.
"""
def __init__(self, group_id, subvolume_id):
self.group_id = group_id
self.subvolume_id = subvolume_id
assert self.group_id != NO_GROUP_NAME
assert self.subvolume_id != "" and self.subvolume_id is not None
def __str__(self):
return "{0}/{1}".format(self.group_id, self.subvolume_id)
class SubvolumeClient(object):
class SubVolume(object):
"""
Combine libcephfs and librados interfaces to implement a
'Subvolume' concept implemented as a cephfs directory.
@@ -52,42 +32,228 @@ class SubvolumeClient(object):
or cephfs.Error exceptions in unexpected situations.
"""
# Where shall we create our subvolumes?
DEFAULT_SUBVOL_PREFIX = "/volumes"
DEFAULT_NS_PREFIX = "fsvolumens_"
def __init__(self, mgr, subvolume_prefix=None, pool_ns_prefix=None, fs_name=None):
def __init__(self, mgr, fs_name=None):
self.fs = None
self.fs_name = fs_name
self.connected = False
self.rados = mgr.rados
self.subvolume_prefix = subvolume_prefix if subvolume_prefix else self.DEFAULT_SUBVOL_PREFIX
self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
def _mkdir_p(self, path, mode=0o755):
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
pass
else:
return
def _subvolume_path(self, subvolume_path):
"""
Determine the path within CephFS where this subvolume will live
:return: absolute path (string)
"""
return os.path.join(
self.subvolume_prefix,
subvolume_path.group_id if subvolume_path.group_id is not None else NO_GROUP_NAME,
subvolume_path.subvolume_id)
parts = path.split(os.path.sep)
def _group_path(self, group_id):
"""
Determine the path within CephFS where this subvolume group will live
:return: absolute path (string)
"""
if group_id is None:
raise ValueError("group_id may not be None")
for i in range(1, len(parts) + 1):
subpath = os.path.join(*parts[0:i])
try:
self.fs.stat(subpath)
except cephfs.ObjectNotFound:
self.fs.mkdir(subpath, mode)
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
return os.path.join(
self.subvolume_prefix,
group_id
)
### basic subvolume operations
def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755):
"""
Set up metadata, pools and auth for a subvolume.
This function is idempotent. It is safe to call this again
for an already-created subvolume, even if it is in use.
:param spec: subvolume path specification
:param size: In bytes, or None for no size limit
:param namespace_isolated: If true, use separate RADOS namespace for this subvolume
:return: None
"""
subvolpath = spec.subvolume_path
log.info("creating subvolume with path: {0}".format(subvolpath))
self._mkdir_p(subvolpath, mode)
if size is not None:
self.fs.setxattr(subvolpath, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
xattr_key = xattr_val = None
if namespace_isolated:
# enforce security isolation, use separate namespace for this subvolume
xattr_key = 'ceph.dir.layout.pool_namespace'
xattr_val = spec.fs_namespace
else:
# If subvolume's namespace layout is not set, then the subvolume's pool
# layout remains unset and will undesirably change with ancestor's
# pool layout changes.
xattr_key = 'ceph.dir.layout.pool'
xattr_val = self._get_ancestor_xattr(subvolpath, "ceph.dir.layout.pool")
# TODO: handle error...
self.fs.setxattr(subvolpath, xattr_key, xattr_val.encode('utf-8'), 0)
def remove_subvolume(self, spec, force):
"""
Make a subvolume inaccessible to guests. This function is idempotent.
This is the fast part of tearing down a subvolume: you must also later
call purge_subvolume, which is the slow part.
:param spec: subvolume path specification
:param force: flag to ignore non-existent path (never raise exception)
:return: None
"""
subvolpath = spec.subvolume_path
log.info("deleting subvolume with path: {0}".format(subvolpath))
# Create the trash directory if it doesn't already exist
trashdir = spec.trash_dir
self._mkdir_p(trashdir)
trashpath = spec.trash_path
try:
self.fs.rename(subvolpath, trashpath)
except cephfs.ObjectNotFound:
if not force:
raise VolumeException(
-errno.ENOENT, "Subvolume '{0}' not found, cannot remove it".format(spec.subvolume_id))
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
def purge_subvolume(self, spec):
"""
Finish clearing up a subvolume that was previously passed to delete_subvolume. This
function is idempotent.
"""
def rmtree(root_path):
log.debug("rmtree {0}".format(root_path))
try:
dir_handle = self.fs.opendir(root_path)
except cephfs.ObjectNotFound:
return
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
d = self.fs.readdir(dir_handle)
while d:
d_name = d.d_name.decode('utf-8')
if d_name not in [".", ".."]:
# Do not use os.path.join because it is sensitive
# to string encoding, we just pass through dnames
# as byte arrays
d_full = "{0}/{1}".format(root_path, d_name)
if d.is_dir():
rmtree(d_full)
else:
self.fs.unlink(d_full)
d = self.fs.readdir(dir_handle)
self.fs.closedir(dir_handle)
self.fs.rmdir(root_path)
trashpath = spec.trash_path
rmtree(trashpath)
def get_subvolume_path(self, spec):
path = spec.subvolume_path
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
return None
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
return path
### group operations
def create_group(self, spec, mode=0o755):
path = spec.group_path
self._mkdir_p(path, mode)
def remove_group(self, spec, force):
path = spec.group_path
try:
self.fs.rmdir(path)
except cephfs.ObjectNotFound:
if not force:
raise VolumeException(-errno.ENOENT, "Subvolume group '{0}' not found".format(spec.group_id))
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
def get_group_path(self, spec):
path = spec.group_path
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
return None
return path
def _get_ancestor_xattr(self, path, attr):
"""
Helper for reading layout information: if this xattr is missing
on the requested path, keep checking parents until we find it.
"""
try:
result = self.fs.getxattr(path, attr).decode('utf-8')
if result == "":
# Annoying! cephfs gives us empty instead of an error when attr not found
raise cephfs.NoData()
else:
return result
except cephfs.NoData:
if path == "/":
raise
else:
return self._get_ancestor_xattr(os.path.split(path)[0], attr)
### snapshot operations
def _snapshot_create(self, snappath, mode=0o755):
"""
Create a snapshot, or do nothing if it already exists.
"""
try:
self.fs.stat(snappath)
except cephfs.ObjectNotFound:
self.fs.mkdir(snappath, mode)
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
else:
log.warn("Snapshot '{0}' already exists".format(snappath))
def _snapshot_delete(self, snappath, force):
"""
Remove a snapshot, or do nothing if it doesn't exist.
"""
try:
self.fs.stat(snappath)
self.fs.rmdir(snappath)
except cephfs.ObjectNotFound:
if not force:
raise VolumeException(-errno.ENOENT, "Snapshot '{0}' not found, cannot remove it".format(snappath))
except cephfs.Error as e:
raise VolumeException(e.args[0], e.args[1])
def create_subvolume_snapshot(self, spec, snapname, mode=0o755):
snappath = spec.make_subvol_snap_path(self.rados.conf_get('client_snapdir'), snapname)
self._snapshot_create(snappath, mode)
def remove_subvolume_snapshot(self, spec, snapname, force):
snappath = spec.make_subvol_snap_path(self.rados.conf_get('client_snapdir'), snapname)
self._snapshot_delete(snappath, force)
def create_group_snapshot(self, spec, snapname, mode=0o755):
snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname)
self._snapshot_create(snappath, mode)
def remove_group_snapshot(self, spec, snapname, force):
snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname)
return self._snapshot_delete(snappath, force)
### context manager routines
def connect(self):
log.debug("Connecting to cephfs...")
@@ -115,193 +281,3 @@ class SubvolumeClient(object):
def __del__(self):
self.disconnect()
def _mkdir_p(self, path, mode=0o755):
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
pass
else:
return
parts = path.split(os.path.sep)
for i in range(1, len(parts) + 1):
subpath = os.path.join(*parts[0:i])
try:
self.fs.stat(subpath)
except cephfs.ObjectNotFound:
self.fs.mkdir(subpath, mode)
def create_group(self, group_id, mode=0o755):
path = self._group_path(group_id)
self._mkdir_p(path, mode)
def delete_group(self, group_id):
path = self._group_path(group_id)
self.fs.rmdir(path)
def create_subvolume(self, subvolume_path, size=None, namespace_isolated=True, mode=0o755):
"""
Set up metadata, pools and auth for a subvolume.
This function is idempotent. It is safe to call this again
for an already-created subvolume, even if it is in use.
:param subvolume_path: SubvolumePath instance
:param size: In bytes, or None for no size limit
:param namespace_isolated: If true, use separate RADOS namespace for this subvolume
:return: None
"""
path = self._subvolume_path(subvolume_path)
log.info("creating subvolume with path: {0}".format(path))
self._mkdir_p(path, mode)
if size is not None:
self.fs.setxattr(path, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
# enforce security isolation, use separate namespace for this subvolume
if namespace_isolated:
namespace = "{0}{1}".format(self.pool_ns_prefix, subvolume_path.subvolume_id)
log.info("creating subvolume with path: {0}, using rados namespace {1} to isolate data.".format(subvolume_path, namespace))
self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
namespace.encode('utf-8'), 0)
else:
# If subvolume's namespace layout is not set, then the subvolume's pool
# layout remains unset and will undesirably change with ancestor's
# pool layout changes.
pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
self.fs.setxattr(path, 'ceph.dir.layout.pool',
pool_name.encode('utf-8'), 0)
def delete_subvolume(self, subvolume_path):
"""
Make a subvolume inaccessible to guests. This function is idempotent.
This is the fast part of tearing down a subvolume: you must also later
call purge_subvolume, which is the slow part.
:param subvolume_path: Same identifier used in create_subvolume
:return: None
"""
path = self._subvolume_path(subvolume_path)
log.info("deleting subvolume with path: {0}".format(path))
# Create the trash folder if it doesn't already exist
trash = os.path.join(self.subvolume_prefix, "_deleting")
self._mkdir_p(trash)
# We'll move it to here
trashed_subvolume = os.path.join(trash, subvolume_path.subvolume_id)
# Move the subvolume to the trash folder
self.fs.rename(path, trashed_subvolume)
def purge_subvolume(self, subvolume_path):
"""
Finish clearing up a subvolume that was previously passed to delete_subvolume. This
function is idempotent.
"""
trash = os.path.join(self.subvolume_prefix, "_deleting")
trashed_subvolume = os.path.join(trash, subvolume_path.subvolume_id)
def rmtree(root_path):
log.debug("rmtree {0}".format(root_path))
try:
dir_handle = self.fs.opendir(root_path)
except cephfs.ObjectNotFound:
return
d = self.fs.readdir(dir_handle)
while d:
d_name = d.d_name.decode('utf-8')
if d_name not in [".", ".."]:
# Do not use os.path.join because it is sensitive
# to string encoding, we just pass through dnames
# as byte arrays
d_full = "{0}/{1}".format(root_path, d_name)
if d.is_dir():
rmtree(d_full)
else:
self.fs.unlink(d_full)
d = self.fs.readdir(dir_handle)
self.fs.closedir(dir_handle)
self.fs.rmdir(root_path)
rmtree(trashed_subvolume)
def _get_ancestor_xattr(self, path, attr):
"""
Helper for reading layout information: if this xattr is missing
on the requested path, keep checking parents until we find it.
"""
try:
result = self.fs.getxattr(path, attr).decode('utf-8')
if result == "":
# Annoying! cephfs gives us empty instead of an error when attr not found
raise cephfs.NoData()
else:
return result
except cephfs.NoData:
if path == "/":
raise
else:
return self._get_ancestor_xattr(os.path.split(path)[0], attr)
def get_group_path(self, group_id):
path = self._group_path(group_id)
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
return None
return path
def get_subvolume_path(self, subvolume_path):
path = self._subvolume_path(subvolume_path)
try:
self.fs.stat(path)
except cephfs.ObjectNotFound:
return None
return path
def _snapshot_path(self, dir_path, snapshot_name):
return os.path.join(
dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
)
def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
"""
Create a snapshot, or do nothing if it already exists.
"""
snapshot_path = self._snapshot_path(dir_path, snapshot_name)
try:
self.fs.stat(snapshot_path)
except cephfs.ObjectNotFound:
self.fs.mkdir(snapshot_path, mode)
else:
log.warn("Snapshot '{0}' already exists".format(snapshot_name))
def _snapshot_delete(self, dir_path, snapshot_name):
"""
Remove a snapshot, or do nothing if it doesn't exist.
"""
snapshot_path = self._snapshot_path(dir_path, snapshot_name)
self.fs.stat(snapshot_path)
self.fs.rmdir(snapshot_path)
def create_subvolume_snapshot(self, subvolume_path, snapshot_name, mode=0o755):
return self._snapshot_create(self._subvolume_path(subvolume_path), snapshot_name, mode)
def delete_subvolume_snapshot(self, subvolume_path, snapshot_name):
return self._snapshot_delete(self._subvolume_path(subvolume_path), snapshot_name)
def create_group_snapshot(self, group_id, snapshot_name, mode=0o755):
return self._snapshot_create(self._group_path(group_id), snapshot_name, mode)
def delete_group_snapshot(self, group_id, snapshot_name):
return self._snapshot_delete(self._group_path(group_id), snapshot_name)
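
Two details of the class above are easy to miss: removal is two-phase (remove_subvolume renames the directory into the _deleting trash dir, purge_subvolume later walks and unlinks it), and _get_ancestor_xattr climbs toward the root because cephfs returns an empty string rather than an error for an unset layout xattr. A standalone sketch of that upward walk against a fake xattr table (the table and helper are illustrative only, not the real self.fs.getxattr):

import os

# Fake xattr store standing in for self.fs.getxattr(); "" means unset.
XATTRS = {"/": "cephfs_data", "/volumes": ""}

def get_ancestor_xattr(path, attr="ceph.dir.layout.pool"):
    value = XATTRS.get(path, "")
    if value:
        return value
    if path == "/":
        raise KeyError(attr)
    # Empty means unset here, so keep checking the parent directory.
    return get_ancestor_xattr(os.path.split(path)[0], attr)

print(get_ancestor_xattr("/volumes/_nogroup/sv1"))  # cephfs_data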

View File

@@ -0,0 +1,358 @@
import json
import errno
import logging
import cephfs
import orchestrator
from .subvolspec import SubvolumeSpec
from .subvolume import SubVolume
from .exception import VolumeException
log = logging.getLogger(__name__)
class VolumeClient(object):
def __init__(self, mgr):
self.mgr = mgr
def gen_pool_names(self, volname):
"""
return metadata and data pool names (derived from a filesystem/volume name) as a tuple
"""
return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
def get_fs(self, fs_name):
fs_map = self.mgr.get('fs_map')
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == fs_name:
return fs
return None
def get_mds_names(self, fs_name):
fs = self.get_fs(fs_name)
if fs is None:
return []
return [mds['name'] for mds in fs['mdsmap']['info'].values()]
def volume_exists(self, volname):
return self.get_fs(volname) is not None
def volume_exception_to_retval(self, ve):
"""
return a tuple representation from a volume exception
"""
return ve.to_tuple()
def create_pool(self, pool_name, pg_num, pg_num_min=None, pg_autoscale_factor=None):
# create the given pool
command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
if pg_num_min:
command['pg_num_min'] = pg_num_min
r, outb, outs = self.mgr.mon_command(command)
if r != 0:
return r, outb, outs
# set pg autoscale if needed
if pg_autoscale_factor:
command = {'prefix': 'osd pool set', 'pool': pool_name, 'var': 'pg_autoscale_bias',
'val': str(pg_autoscale_factor)}
r, outb, outs = self.mgr.mon_command(command)
return r, outb, outs
def remove_pool(self, pool_name):
command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
'yes_i_really_really_mean_it': True}
return self.mgr.mon_command(command)
def create_filesystem(self, fs_name, metadata_pool, data_pool):
command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
'data': data_pool}
return self.mgr.mon_command(command)
def remove_filesystem(self, fs_name):
command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
return self.mgr.mon_command(command)
def create_mds(self, fs_name):
spec = orchestrator.StatelessServiceSpec()
spec.name = fs_name
try:
completion = self.mgr.add_stateless_service("mds", spec)
self.mgr._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):
return 0, "", "Volume created successfully (no MDS daemons created)"
except Exception as e:
# Don't let detailed orchestrator exceptions (python backtraces)
# bubble out to the user
log.exception("Failed to create MDS daemons")
return -errno.EINVAL, "", str(e)
return 0, "", ""
def set_mds_down(self, fs_name):
command = {'prefix': 'fs set', 'fs_name': fs_name, 'var': 'cluster_down', 'val': 'true'}
r, outb, outs = self.mgr.mon_command(command)
if r != 0:
return r, outb, outs
for mds in self.get_mds_names(fs_name):
command = {'prefix': 'mds fail', 'role_or_gid': mds}
r, outb, outs = self.mgr.mon_command(command)
if r != 0:
return r, outb, outs
return 0, "", ""
### volume operations -- create, rm, ls
def create_volume(self, volname, size=None):
"""
create volume (pool, filesystem and mds)
"""
metadata_pool, data_pool = self.gen_pool_names(volname)
# create pools
r, outb, outs = self.create_pool(metadata_pool, 16, pg_num_min=16, pg_autoscale_factor=4.0)
if r != 0:
return r, outb, outs
r, outb, outs = self.create_pool(data_pool, 8)
if r != 0:
return r, outb, outs
# create filesystem
r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
if r != 0:
log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
return r, outb, outs
# create mds
return self.create_mds(volname)
def delete_volume(self, volname):
"""
delete the given volume (tear down mds, remove filesystem)
"""
# Tear down MDS daemons
try:
completion = self.mgr.remove_stateless_service("mds", volname)
self.mgr._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):
log.warning("OrchestratorError, not tearing down MDS daemons")
except Exception as e:
# Don't let detailed orchestrator exceptions (python backtraces)
# bubble out to the user
log.exception("Failed to tear down MDS daemons")
return -errno.EINVAL, "", str(e)
# In case orchestrator didn't tear down MDS daemons cleanly, or
# there was no orchestrator, we force the daemons down.
if self.volume_exists(volname):
r, outb, outs = self.set_mds_down(volname)
if r != 0:
return r, outb, outs
r, outb, outs = self.remove_filesystem(volname)
if r != 0:
return r, outb, outs
else:
log.warning("Filesystem already gone for volume '{0}'".format(volname))
metadata_pool, data_pool = self.gen_pool_names(volname)
r, outb, outs = self.remove_pool(metadata_pool)
if r != 0:
return r, outb, outs
return self.remove_pool(data_pool)
def list_volumes(self):
result = []
fs_map = self.mgr.get("fs_map")
for f in fs_map['filesystems']:
result.append({'name': f['mdsmap']['fs_name']})
return 0, json.dumps(result, indent=2), ""
def group_exists(self, sv, spec):
# default group need not be explicitly created (as it gets created
# at the time of subvolume, snapshot and other create operations).
return spec.is_default_group() or sv.get_group_path(spec)
### subvolume operations
def create_subvolume(self, volname, subvolname, groupname, size):
ret = 0, "", ""
try:
if not self.volume_exists(volname):
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
"volume create` before trying to create subvolumes".format(volname))
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
"`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
sv.create_subvolume(spec, size)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
def remove_subvolume(self, volname, subvolname, groupname, force):
ret = 0, "", ""
try:
fs = self.get_fs(volname)
if fs:
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if self.group_exists(sv, spec):
sv.remove_subvolume(spec, force)
sv.purge_subvolume(spec)
elif not force:
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
"subvolume '{1}'".format(groupname, subvolname))
elif not force:
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
"'{1}'".format(volname, subvolname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
def subvolume_getpath(self, volname, subvolname, groupname):
ret = None
try:
if not self.volume_exists(volname):
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found".format(volname))
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
path = sv.get_subvolume_path(spec)
if not path:
raise VolumeException(
-errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
ret = 0, path, ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
### subvolume snapshot
def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
ret = 0, "", ""
try:
if not self.volume_exists(volname):
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
"'{1}'".format(volname, snapname))
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
"snapshot '{1}'".format(groupname, snapname))
if not sv.get_subvolume_path(spec):
raise VolumeException(
-errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
"'{1}'".format(subvolname, snapname))
sv.create_subvolume_snapshot(spec, snapname)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
ret = 0, "", ""
try:
if self.volume_exists(volname):
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if self.group_exists(sv, spec):
if sv.get_subvolume_path(spec):
sv.remove_subvolume_snapshot(spec, snapname, force)
elif not force:
raise VolumeException(
-errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
"subvolume snapshot '{1}'".format(subvolname, snapname))
elif not force:
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
"remove subvolume snapshot '{1}'".format(groupname, snapname))
elif not force:
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
"snapshot '{1}'".format(volname, snapname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
### group operations
def create_subvolume_group(self, volname, groupname):
ret = 0, "", ""
try:
if not self.volume_exists(volname):
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
"volume create` before trying to create subvolume groups".format(volname))
# TODO: validate that subvol size fits in volume size
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec("", groupname)
sv.create_group(spec)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
def remove_subvolume_group(self, volname, groupname, force):
ret = 0, "", ""
try:
if self.volume_exists(volname):
with SubVolume(self.mgr, fs_name=volname) as sv:
# TODO: check whether there are no subvolumes in the group
spec = SubvolumeSpec("", groupname)
sv.remove_group(spec, force)
elif not force:
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
"group '{0}'".format(volname, groupname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
### group snapshot
def create_subvolume_group_snapshot(self, volname, groupname, snapname):
ret = 0, "", ""
try:
if not self.volume_exists(volname):
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
"'{1}'".format(volname, snapname))
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec("", groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
"snapshot '{1}'".format(groupname, snapname))
sv.create_group_snapshot(spec, snapname)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
ret = 0, "", ""
try:
if self.volume_exists(volname):
with SubVolume(self.mgr, fs_name=volname) as sv:
spec = SubvolumeSpec("", groupname)
if self.group_exists(sv, spec):
sv.remove_group_snapshot(spec, snapname, force)
elif not force:
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
"remove it".format(groupname))
elif not force:
raise VolumeException(
-errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
"snapshot '{1}'".format(volname, snapname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
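
create_volume above chains pool creation, filesystem creation and MDS deployment, returning the first non-zero (r, outb, outs) it sees. A standalone sketch of that error-propagation pattern with a fake mon_command (the fake is illustrative; the command payloads loosely mirror the ones used above):

def fake_mon_command(cmd):
    # Pretend the data pool creation fails.
    if cmd['prefix'] == 'osd pool create' and cmd['pool'].endswith('.data'):
        return -22, "", "pool creation failed"
    return 0, "", ""

def create_volume(volname):
    metadata_pool = "cephfs.{}.meta".format(volname)
    data_pool = "cephfs.{}.data".format(volname)
    for pool in (metadata_pool, data_pool):
        r, outb, outs = fake_mon_command({'prefix': 'osd pool create',
                                          'pool': pool, 'pg_num': 16})
        if r != 0:
            return r, outb, outs  # propagate the first failure unchanged
    return fake_mon_command({'prefix': 'fs new', 'fs_name': volname,
                             'metadata': metadata_pool, 'data': data_pool})

print(create_volume("vol1"))  # (-22, '', 'pool creation failed')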

View File

@@ -10,7 +10,7 @@ import cephfs
from mgr_module import MgrModule
import orchestrator
from .fs.subvolume import SubvolumePath, SubvolumeClient
from .fs.volume import VolumeClient
class PurgeJob(object):
def __init__(self, volume_fscid, subvolume_path):
@@ -23,7 +23,6 @@ class PurgeJob(object):
self.fscid = volume_fscid
self.subvolume_path = subvolume_path
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
COMMANDS = [
{
@@ -148,6 +147,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self._initialized = Event()
self.vc = VolumeClient(self)
self._background_jobs = Queue.Queue()
@@ -176,107 +176,20 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
return handler(inbuf, cmd)
def _pool_base_name(self, volume_name):
"""
Convention for naming pools for volumes
:return: string
"""
return "cephfs.{0}".format(volume_name)
def _pool_names(self, pool_base_name):
return pool_base_name + ".meta", pool_base_name + ".data"
def _cmd_fs_volume_create(self, inbuf, cmd):
vol_id = cmd['name']
# TODO: validate name against any rules for pool/fs names
# (...are there any?)
vol_id = cmd['name']
size = cmd.get('size', None)
base_name = self._pool_base_name(vol_id)
mdp_name, dp_name = self._pool_names(base_name)
return self.vc.create_volume(vol_id, size)
r, outb, outs = self.mon_command({
'prefix': 'osd pool create',
'pool': mdp_name,
'pg_num': 16,
'pg_num_min': 16,
})
if r != 0:
return r, outb, outs
def _cmd_fs_volume_rm(self, inbuf, cmd):
vol_name = cmd['vol_name']
return self.vc.delete_volume(vol_name)
# count fs metadata omap at 4x usual rate
r, outb, outs = self.mon_command({
'prefix': 'osd pool set',
'pool': mdp_name,
'var': "pg_autoscale_bias",
'val': "4.0",
})
if r != 0:
return r, outb, outs
r, outb, outs = self.mon_command({
'prefix': 'osd pool create',
'pool': dp_name,
'pg_num': 8
})
if r != 0:
return r, outb, outs
# Create a filesystem
# ====================
r, outb, outs = self.mon_command({
'prefix': 'fs new',
'fs_name': vol_id,
'metadata': mdp_name,
'data': dp_name
})
if r != 0:
self.log.error("Filesystem creation error: {0} {1} {2}".format(
r, outb, outs
))
return r, outb, outs
# TODO: apply quotas to the filesystem root
# Create an MDS cluster
# =====================
spec = orchestrator.StatelessServiceSpec()
spec.name = vol_id
try:
completion = self.add_stateless_service("mds", spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):
return 0, "", "Volume created successfully (no MDS daemons created)"
except Exception as e:
# Don't let detailed orchestrator exceptions (python backtraces)
# bubble out to the user
self.log.exception("Failed to create MDS daemons")
return -errno.EINVAL, "", str(e)
return 0, "", ""
def _volume_get_fs(self, vol_name):
fs_map = self.get('fs_map')
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == vol_name:
return fs
# Fall through
return None
def _volume_get_mds_daemon_names(self, vol_name):
fs = self._volume_get_fs(vol_name)
if fs is None:
return []
return [i['name'] for i in fs['mdsmap']['info'].values()]
def _volume_exists(self, vol_name):
return self._volume_get_fs(vol_name) is not None
def _cmd_fs_volume_ls(self, inbuf, cmd):
return self.vc.list_volumes()
def _cmd_fs_subvolumegroup_create(self, inbuf, cmd):
"""
@@ -285,17 +198,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
vol_name = cmd['vol_name']
group_name = cmd['group_name']
if not self._volume_exists(vol_name):
return -errno.ENOENT, "", \
"Volume '{0}' not found, create it with `ceph fs volume create` " \
"before trying to create subvolume groups".format(vol_name)
# TODO: validate that subvol size fits in volume size
with SubvolumeClient(self, fs_name=vol_name) as svc:
svc.create_group(group_name)
return 0, "", ""
return self.vc.create_subvolume_group(vol_name, group_name)
def _cmd_fs_subvolumegroup_rm(self, inbuf, cmd):
"""
@@ -303,29 +206,9 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
"""
vol_name = cmd['vol_name']
group_name = cmd['group_name']
force = cmd.get('force', False)
if not self._volume_exists(vol_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot remove subvolume group '{0}'".format(vol_name, group_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
# TODO: check whether there are no subvolumes in the group
try:
svc.delete_group(group_name)
except cephfs.ObjectNotFound:
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot remove it".format(group_name)
return 0, "", ""
return self.vc.remove_subvolume_group(vol_name, group_name, force)
def _cmd_fs_subvolume_create(self, inbuf, cmd):
"""
@@ -333,26 +216,10 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
"""
vol_name = cmd['vol_name']
sub_name = cmd['sub_name']
size = cmd.get('size', None)
group_name = cmd.get('group_name', None)
if not self._volume_exists(vol_name):
return -errno.ENOENT, "", \
"Volume '{0}' not found, create it with `ceph fs volume create` " \
"before trying to create subvolumes".format(vol_name)
# TODO: validate that subvol size fits in volume size
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, create it with `ceph fs subvolumegroup create` " \
"before trying to create subvolumes".format(group_name)
svp = SubvolumePath(group_name, sub_name)
svc.create_subvolume(svp, size)
return 0, "", ""
return self.vc.create_subvolume(vol_name, sub_name, group_name, size)
def _cmd_fs_subvolume_rm(self, inbuf, cmd):
"""
@@ -360,258 +227,46 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
"""
vol_name = cmd['vol_name']
sub_name = cmd['sub_name']
force = cmd.get('force', False)
group_name = cmd.get('group_name', None)
fs = self._volume_get_fs(vol_name)
if fs is None:
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot remove subvolume '{1}'".format(vol_name, sub_name)
vol_fscid = fs['id']
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot remove subvolume '{1}'".format(group_name, sub_name)
svp = SubvolumePath(group_name, sub_name)
try:
svc.delete_subvolume(svp)
except cephfs.ObjectNotFound:
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume '{0}' not found, cannot remove it".format(sub_name)
svc.purge_subvolume(svp)
# TODO: purge subvolume asynchronously
# self._background_jobs.put(PurgeJob(vol_fscid, svp))
return 0, "", ""
def _cmd_fs_volume_rm(self, inbuf, cmd):
vol_name = cmd['vol_name']
# Tear down MDS daemons
# =====================
try:
completion = self.remove_stateless_service("mds", vol_name)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):
self.log.warning("OrchestratorError, not tearing down MDS daemons")
except Exception as e:
# Don't let detailed orchestrator exceptions (python backtraces)
# bubble out to the user
self.log.exception("Failed to tear down MDS daemons")
return -errno.EINVAL, "", str(e)
if self._volume_exists(vol_name):
# In case orchestrator didn't tear down MDS daemons cleanly, or
# there was no orchestrator, we force the daemons down.
r, out, err = self.mon_command({
'prefix': 'fs set',
'fs_name': vol_name,
'var': 'cluster_down',
'val': 'true'
})
if r != 0:
return r, out, err
for mds_name in self._volume_get_mds_daemon_names(vol_name):
r, out, err = self.mon_command({
'prefix': 'mds fail',
'role_or_gid': mds_name})
if r != 0:
return r, out, err
# Delete CephFS filesystem
# =========================
r, out, err = self.mon_command({
'prefix': 'fs rm',
'fs_name': vol_name,
'yes_i_really_mean_it': True,
})
if r != 0:
return r, out, err
else:
self.log.warning("Filesystem already gone for volume '{0}'".format(
vol_name
))
# Delete pools
# ============
base_name = self._pool_base_name(vol_name)
mdp_name, dp_name = self._pool_names(base_name)
r, out, err = self.mon_command({
'prefix': 'osd pool rm',
'pool': mdp_name,
'pool2': mdp_name,
'yes_i_really_really_mean_it': True,
})
if r != 0:
return r, out, err
r, out, err = self.mon_command({
'prefix': 'osd pool rm',
'pool': dp_name,
'pool2': dp_name,
'yes_i_really_really_mean_it': True,
})
if r != 0:
return r, out, err
return 0, "", ""
def _cmd_fs_volume_ls(self, inbuf, cmd):
fs_map = self.get("fs_map")
result = []
for f in fs_map['filesystems']:
result.append({
'name': f['mdsmap']['fs_name']
})
return 0, json.dumps(result, indent=2), ""
return self.vc.remove_subvolume(vol_name, sub_name, group_name, force)
def _cmd_fs_subvolume_getpath(self, inbuf, cmd):
vol_name = cmd['vol_name']
sub_name = cmd['sub_name']
group_name = cmd.get('group_name', None)
if not self._volume_exists(vol_name):
return -errno.ENOENT, "", "Volume '{0}' not found".format(vol_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found".format(group_name)
svp = SubvolumePath(group_name, sub_name)
path = svc.get_subvolume_path(svp)
if not path:
return -errno.ENOENT, "", \
"Subvolume '{0}' not found".format(sub_name)
return 0, path, ""
return self.vc.subvolume_getpath(vol_name, sub_name, group_name)
def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd):
vol_name = cmd['vol_name']
group_name = cmd['group_name']
snap_name = cmd['snap_name']
if not self._volume_exists(vol_name):
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot create snapshot '{1}'".format(vol_name, snap_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot create snapshot '{1}'".format(group_name, snap_name)
svc.create_group_snapshot(group_name, snap_name)
return 0, "", ""
return self.vc.create_subvolume_group_snapshot(vol_name, group_name, snap_name)
def _cmd_fs_subvolumegroup_snapshot_rm(self, inbuf, cmd):
vol_name = cmd['vol_name']
group_name = cmd['group_name']
snap_name = cmd['snap_name']
force = cmd.get('force', False)
if not self._volume_exists(vol_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot remove subvolumegroup snapshot '{1}'".format(vol_name, snap_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot remove subvolumegroup snapshot '{1}'".format(group_name, snap_name)
try:
svc.delete_group_snapshot(group_name, snap_name)
except cephfs.ObjectNotFound:
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume group snapshot '{0}' not found, cannot remove it".format(snap_name)
return 0, "", ""
return self.vc.remove_subvolume_group_snapshot(vol_name, group_name, snap_name, force)
def _cmd_fs_subvolume_snapshot_create(self, inbuf, cmd):
vol_name = cmd['vol_name']
sub_name = cmd['sub_name']
snap_name = cmd['snap_name']
group_name = cmd.get('group_name', None)
if not self._volume_exists(vol_name):
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot create snapshot '{1}'".format(vol_name, snap_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot create snapshot '{1}'".format(group_name, snap_name)
svp = SubvolumePath(group_name, sub_name)
if not svc.get_subvolume_path(svp):
return -errno.ENOENT, "", \
"Subvolume '{0}' not found, cannot create snapshot '{1}'".format(sub_name, snap_name)
svc.create_subvolume_snapshot(svp, snap_name)
return 0, "", ""
return self.vc.create_subvolume_snapshot(vol_name, sub_name, snap_name, group_name)
def _cmd_fs_subvolume_snapshot_rm(self, inbuf, cmd):
vol_name = cmd['vol_name']
sub_name = cmd['sub_name']
snap_name = cmd['snap_name']
force = cmd.get('force', False)
group_name = cmd.get('group_name', None)
if not self._volume_exists(vol_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Volume '{0}' not found, cannot remove subvolume snapshot '{1}'".format(vol_name, snap_name)
with SubvolumeClient(self, fs_name=vol_name) as svc:
if group_name and not svc.get_group_path(group_name):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume group '{0}' not found, cannot remove subvolume snapshot '{1}'".format(group_name, snap_name)
svp = SubvolumePath(group_name, sub_name)
if not svc.get_subvolume_path(svp):
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume '{0}' not found, cannot remove subvolume snapshot '{1}'".format(sub_name, snap_name)
try:
svc.delete_subvolume_snapshot(svp, snap_name)
except cephfs.ObjectNotFound:
if force:
return 0, "", ""
else:
return -errno.ENOENT, "", \
"Subvolume snapshot '{0}' not found, cannot remove it".format(snap_name)
return 0, "", ""
return self.vc.remove_subvolume_snapshot(vol_name, sub_name, snap_name, group_name, force)
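
After this change the Module command handlers only unpack CLI arguments and delegate to VolumeClient, which returns the (r, outb, outs) tuple directly. The dispatch that maps a command prefix to a handler is not part of the hunks shown, so the lookup convention in this sketch is an assumption; it illustrates how a prefix such as 'fs volume ls' would reach _cmd_fs_volume_ls:

class FakeModule(object):
    def _cmd_fs_volume_ls(self, inbuf, cmd):
        # Stand-in for the real handler, which delegates to self.vc.list_volumes().
        return 0, "[]", ""

    def handle_command(self, inbuf, cmd):
        # Assumed convention: prefix words joined by underscores, "_cmd_" prepended.
        handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
        handler = getattr(self, handler_name)
        return handler(inbuf, cmd)

print(FakeModule().handle_command("", {'prefix': 'fs volume ls'}))  # (0, '[]', '')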