Merge PR #25633 into master

* refs/pull/25633/head:
	mgr/rook: allow service describe and service ls to display nfs services
	mgr/rook: add support for adding NFS gateways
	mgr/orchestrator_cli: add support for adding NFS gateways
	mgr/orchestrator_cli: convert service add/rm commands to be type-specific
	mgr/rook: add decorator for add_stateless_service
	mgr/rook: add a context manager instead of open-coding exception handling

Reviewed-by: Ricardo Dias <rdias@suse.com>
Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
Reviewed-by: Sebastian Wagner <swagner@suse.com>
This commit is contained in:
Patrick Donnelly 2019-01-14 11:32:01 -08:00
commit a2b05b8334
No known key found for this signature in database
GPG Key ID: 3A2A7E25BEA8AADB
4 changed files with 177 additions and 84 deletions

View File

@ -209,10 +209,12 @@ Sizing: the ``size`` parameter gives the number of daemons in the cluster
Creating/growing/shrinking services::
ceph orchestrator service update <type> <name> <size> [host…]
ceph orchestrator service add <type> <what>
ceph orchestrator {mds,rgw} update <name> <size> [host…]
ceph orchestrator {mds,rgw} add <name>
ceph orchestrator nfs update <name> <size> [host…]
ceph orchestrator nfs add <name> <pool> [--namespace=<namespace>]
e.g., ``ceph orchestrator service update mds myfs 3 host1 host2 host3``
e.g., ``ceph orchestrator mds update myfs 3 host1 host2 host3``
Start/stop/reload::
@ -223,5 +225,5 @@ Start/stop/reload::
Removing services::
ceph orchestrator service rm <type> <name>
ceph orchestrator {mds,rgw} rm <name>

View File

@ -40,17 +40,53 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
"perm": "r"
},
{
'cmd': "orchestrator service add "
"name=svc_type,type=CephString "
'cmd': "orchestrator osd add "
"name=svc_arg,type=CephString ",
"desc": "Create a service of any type",
"desc": "Create an OSD service",
"perm": "rw"
},
{
'cmd': "orchestrator service rm "
"name=svc_type,type=CephString "
'cmd': "orchestrator osd rm "
"name=svc_id,type=CephString ",
"desc": "Remove a service",
"desc": "Remove an OSD service",
"perm": "rw"
},
{
'cmd': "orchestrator mds add "
"name=svc_arg,type=CephString ",
"desc": "Create an MDS service",
"perm": "rw"
},
{
'cmd': "orchestrator mds rm "
"name=svc_id,type=CephString ",
"desc": "Remove an MDS service",
"perm": "rw"
},
{
'cmd': "orchestrator rgw add "
"name=svc_arg,type=CephString ",
"desc": "Create an RGW service",
"perm": "rw"
},
{
'cmd': "orchestrator rgw rm "
"name=svc_id,type=CephString ",
"desc": "Remove an RGW service",
"perm": "rw"
},
{
'cmd': "orchestrator nfs add "
"name=svc_arg,type=CephString "
"name=pool,type=CephString "
"name=namespace,type=CephString,req=false ",
"desc": "Create an NFS service",
"perm": "rw"
},
{
'cmd': "orchestrator nfs rm "
"name=svc_id,type=CephString ",
"desc": "Remove an NFS service",
"perm": "rw"
},
{
@ -157,57 +193,68 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return HandleCommandResult(stdout="\n".join(lines))
def _service_add(self, cmd):
svc_type = cmd['svc_type']
if svc_type == "osd":
device_spec = cmd['svc_arg']
try:
node_name, block_device = device_spec.split(":")
except TypeError:
return HandleCommandResult(-errno.EINVAL,
stderr="Invalid device spec, should be <node>:<device>")
def _osd_add(self, cmd):
device_spec = cmd['svc_arg']
try:
node_name, block_device = device_spec.split(":")
except TypeError:
return HandleCommandResult(-errno.EINVAL,
stderr="Invalid device spec, should be <node>:<device>")
spec = orchestrator.OsdCreationSpec()
spec.node = node_name
spec.format = "bluestore"
spec.drive_group = orchestrator.DriveGroupSpec([block_device])
spec = orchestrator.OsdCreationSpec()
spec.node = node_name
spec.format = "bluestore"
spec.drive_group = orchestrator.DriveGroupSpec([block_device])
completion = self.create_osds(spec)
self._orchestrator_wait([completion])
completion = self.create_osds(spec)
self._orchestrator_wait([completion])
return HandleCommandResult()
return HandleCommandResult()
elif svc_type == "mds":
fs_name = cmd['svc_arg']
def _add_stateless_svc(self, svc_type, spec):
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
return HandleCommandResult()
spec = orchestrator.StatelessServiceSpec()
spec.name = fs_name
def _mds_add(self, cmd):
spec = orchestrator.StatelessServiceSpec()
spec.name = cmd['svc_arg']
return self._add_stateless_svc("mds", spec)
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
def _rgw_add(self, cmd):
spec = orchestrator.StatelessServiceSpec()
spec.name = cmd['svc_arg']
return self._add_stateless_svc("rgw", spec)
return HandleCommandResult()
elif svc_type == "rgw":
store_name = cmd['svc_arg']
def _nfs_add(self, cmd):
cluster_name = cmd['svc_arg']
pool = cmd['pool']
ns = cmd.get('namespace', None)
spec = orchestrator.StatelessServiceSpec()
spec.name = store_name
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
return HandleCommandResult()
else:
raise NotImplementedError(svc_type)
def _service_rm(self, cmd):
svc_type = cmd['svc_type']
svc_id = cmd['svc_id']
spec = orchestrator.StatelessServiceSpec()
spec.name = cluster_name
spec.extended = { "pool":pool }
if ns != None:
spec.extended["namespace"] = ns
return self._add_stateless_svc("nfs", spec)
def _rm_stateless_svc(self, svc_type, svc_id):
completion = self.remove_stateless_service(svc_type, svc_id)
self._orchestrator_wait([completion])
return HandleCommandResult()
def _osd_rm(self, cmd):
return self._rm_stateless_svc("osd", cmd['svc_id'])
def _mds_rm(self, cmd):
return self._rm_stateless_svc("mds", cmd['svc_id'])
def _rgw_rm(self, cmd):
return self._rm_stateless_svc("rgw", cmd['svc_id'])
def _nfs_rm(self, cmd):
return self._rm_stateless_svc("nfs", cmd['svc_id'])
def _set_backend(self, cmd):
"""
We implement a setter command instead of just having the user
@ -289,10 +336,22 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return self._list_services(cmd)
elif cmd['prefix'] == "orchestrator service status":
return self._list_services(cmd) # TODO: create more detailed output
elif cmd['prefix'] == "orchestrator service add":
return self._service_add(cmd)
elif cmd['prefix'] == "orchestrator service rm":
return self._service_rm(cmd)
elif cmd['prefix'] == "orchestrator osd add":
return self._osd_add(cmd)
elif cmd['prefix'] == "orchestrator osd rm":
return self._osd_rm(cmd)
elif cmd['prefix'] == "orchestrator mds add":
return self._mds_add(cmd)
elif cmd['prefix'] == "orchestrator mds rm":
return self._mds_rm(cmd)
elif cmd['prefix'] == "orchestrator rgw add":
return self._rgw_add(cmd)
elif cmd['prefix'] == "orchestrator rgw rm":
return self._rgw_rm(cmd)
elif cmd['prefix'] == "orchestrator nfs add":
return self._nfs_add(cmd)
elif cmd['prefix'] == "orchestrator nfs rm":
return self._nfs_rm(cmd)
elif cmd['prefix'] == "orchestrator set backend":
return self._set_backend(cmd)
elif cmd['prefix'] == "orchestrator status":

View File

@ -334,7 +334,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
@deferred_read
def describe_service(self, service_type, service_id, nodename):
assert service_type in ("mds", "osd", "mgr", "mon", None), service_type + " unsupported"
assert service_type in ("mds", "osd", "mgr", "mon", "nfs", None), service_type + " unsupported"
pods = self.rook_cluster.describe_pods(service_type, service_id, nodename)
@ -353,6 +353,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
sd.daemon_name = p['labels']["mon"]
elif sd.service_type == "mgr":
sd.daemon_name = p['labels']["mgr"]
elif sd.service_type == "nfs":
sd.daemon_name = p['labels']["ceph_nfs"]
else:
# Unknown type -- skip it
continue
@ -361,19 +363,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return result
def _service_add_decorate(self, typename, spec, func):
return RookWriteCompletion(lambda: func(spec), None,
"Creating {0} services for {1}".format(typename, spec.name))
def add_stateless_service(self, service_type, spec):
# assert isinstance(spec, orchestrator.StatelessServiceSpec)
if service_type == "mds":
return RookWriteCompletion(
lambda: self.rook_cluster.add_filesystem(spec), None,
"Creating Filesystem services for {0}".format(spec.name))
return self._service_add_decorate("Filesystem", spec,
self.rook_cluster.add_filesystem)
elif service_type == "rgw" :
return RookWriteCompletion(
lambda: self.rook_cluster.add_objectstore(spec), None,
"Creating RGW services for {0}".format(spec.name))
return self._service_add_decorate("RGW", spec,
self.rook_cluster.add_objectstore)
elif service_type == "nfs" :
return self._service_add_decorate("NFS", spec,
self.rook_cluster.add_nfsgw)
else:
# TODO: RGW, NFS
raise NotImplementedError(service_type)
def remove_stateless_service(self, service_type, service_id):

View File

@ -10,6 +10,7 @@ This module is runnable outside of ceph-mgr, useful for testing.
from six.moves.urllib.parse import urljoin # pylint: disable=import-error
import logging
import json
from contextlib import contextmanager
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
@ -160,6 +161,8 @@ class RookCluster(object):
label_filter += ",mon={0}".format(service_id)
elif service_type == "mgr":
label_filter += ",mgr={0}".format(service_id)
elif service_type == "nfs":
label_filter += ",ceph_nfs={0}".format(service_id)
elif service_type == "rgw":
# TODO: rgw
pass
@ -191,6 +194,17 @@ class RookCluster(object):
return pods_summary
@contextmanager
def ignore_409(self, what):
try:
yield
except ApiException as e:
if e.status == 409:
# Idempotent, succeed.
log.info("{} already exists".format(what))
else:
raise
def add_filesystem(self, spec):
# TODO use spec.placement
# TODO use spec.min_size (and use max_size meaningfully)
@ -214,17 +228,37 @@ class RookCluster(object):
}
}
try:
self.rook_api_post(
"cephfilesystems/",
body=rook_fs
)
except ApiException as e:
if e.status == 409:
log.info("CephFilesystem '{0}' already exists".format(spec.name))
# Idempotent, succeed.
else:
raise
with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
self.rook_api_post("cephfilesystems/", body=rook_fs)
def add_nfsgw(self, spec):
# TODO use spec.placement
# TODO use spec.min_size (and use max_size meaningfully)
# TODO warn if spec.extended has entries we don't kow how
# to action.
rook_nfsgw = {
"apiVersion": ROOK_API_NAME,
"kind": "CephNFS",
"metadata": {
"name": spec.name,
"namespace": self.rook_namespace
},
"spec": {
"RADOS": {
"pool": spec.extended["pool"]
},
"server": {
"active": spec.max_size,
}
}
}
if "namespace" in spec.extended:
rook_nfsgw["spec"]["RADOS"]["namespace"] = spec.extended["namespace"]
with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
self.rook_api_post("cephnfses/", body=rook_nfsgw)
def add_objectstore(self, spec):
@ -257,25 +291,18 @@ class RookCluster(object):
}
}
try:
self.rook_api_post(
"cephobjectstores/",
body=rook_os
)
except ApiException as e:
if e.status == 409:
log.info("CephObjectStore '{0}' already exists".format(spec.name))
# Idempotent, succeed.
else:
raise
with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
self.rook_api_post("cephobjectstores/", body=rook_os)
def rm_service(self, service_type, service_id):
assert service_type in ("mds", "rgw")
assert service_type in ("mds", "rgw", "nfs")
if service_type == "mds":
rooktype = "cephfilesystems"
elif service_type == "rgw":
rooktype = "cephobjectstores"
elif service_type == "nfs":
rooktype = "cephnfses"
objpath = "{0}/{1}".format(rooktype, service_id)