mirror of
https://github.com/ceph/ceph
synced 2025-01-01 08:32:24 +00:00
pybind/mgr: Unified bits of volumes and orchestrator
* Created a common class `OrchestratorClientMixin` * `s/self._oremote("meth"...),/self.meth(...)/g` Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
parent
fa24a0312f
commit
12b903d6ff
@ -861,7 +861,7 @@ class MgrModule(ceph_module.BaseMgrModule):
|
||||
Set the value of a persistent configuration setting
|
||||
|
||||
:param str key:
|
||||
:param str val:
|
||||
:type val: str | None
|
||||
"""
|
||||
self._validate_module_option(key)
|
||||
return self._set_module_option(key, val)
|
||||
|
@ -4,6 +4,7 @@ ceph-mgr orchestrator interface
|
||||
|
||||
Please see the ceph-mgr module developer's guide for more information.
|
||||
"""
|
||||
import time
|
||||
|
||||
|
||||
class _Completion(object):
|
||||
@ -158,6 +159,8 @@ class Orchestrator(object):
|
||||
|
||||
For fast operations (e.g. reading from a database), implementations
|
||||
may choose to do blocking IO in this call.
|
||||
|
||||
:rtype: bool
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@ -506,3 +509,46 @@ class InventoryNode(object):
|
||||
|
||||
def to_json(self):
|
||||
return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
|
||||
|
||||
|
||||
def _mk_orch_methods(cls):
    """
    Class decorator: attach a forwarding stub to ``cls`` for every public
    ``Orchestrator`` method, delegating each call through ``self._oremote``.
    """
    def _forwarder(name):
        # Bind ``name`` via the factory argument.  Defining the inner
        # function directly inside the loop would late-bind ``name`` to
        # the last key iterated.
        def _call(self, *args, **kwargs):
            return self._oremote(name, args, kwargs)
        return _call

    for attr in Orchestrator.__dict__:
        if attr.startswith('_'):
            continue
        if attr in ['is_orchestrator_module', 'available']:
            continue
        setattr(cls, attr, _forwarder(attr))
    return cls
|
||||
|
||||
|
||||
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled
        """
        try:
            backend = self._select_orchestrator()
        except AttributeError:
            # This module is not orchestrator_cli itself: ask it which
            # backend module is currently active.
            backend = self.remote('orchestrator_cli', '_select_orchestrator')
        return self.remote(backend, meth, *args, **kwargs)

    def _orchestrator_wait(self, completions):
        """
        Helper to wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.
        """
        finished = self.wait(completions)
        while not finished:
            if not any(c.should_wait for c in completions):
                # Nothing left that wants waiting on; give up polling.
                break
            time.sleep(5)
            finished = self.wait(completions)

        # If every completion carries a truthy ``error``, surface them all.
        if all(hasattr(c, 'error') and getattr(c, 'error') for c in completions):
            raise Exception([getattr(c, 'error') for c in completions])
|
||||
|
@ -1,6 +1,5 @@
|
||||
import errno
|
||||
import json
|
||||
import time
|
||||
from mgr_module import MgrModule, HandleCommandResult
|
||||
|
||||
import orchestrator
|
||||
@ -10,7 +9,7 @@ class NoOrchestrator(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class OrchestratorCli(MgrModule):
|
||||
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
MODULE_OPTIONS = [
|
||||
{'name': 'orchestrator'}
|
||||
]
|
||||
@ -74,30 +73,6 @@ class OrchestratorCli(MgrModule):
|
||||
|
||||
return o
|
||||
|
||||
def _oremote(self, *args, **kwargs):
|
||||
"""
|
||||
Helper for invoking `remote` on whichever orchestrator is enabled
|
||||
"""
|
||||
return self.remote(self._select_orchestrator(),
|
||||
*args, **kwargs)
|
||||
|
||||
def _wait(self, completions):
|
||||
"""
|
||||
Helper to wait for completions to complete (reads) or
|
||||
become persistent (writes).
|
||||
|
||||
Waits for writes to be *persistent* but not *effective*.
|
||||
"""
|
||||
|
||||
while not self._oremote("wait", completions):
|
||||
|
||||
if any(c.should_wait for c in completions):
|
||||
time.sleep(5)
|
||||
else:
|
||||
break
|
||||
|
||||
if all(hasattr(c, 'error') and getattr(c, 'error')for c in completions):
|
||||
raise Exception([getattr(c, 'error') for c in completions])
|
||||
|
||||
def _list_devices(self, cmd):
|
||||
"""
|
||||
@ -131,9 +106,9 @@ class OrchestratorCli(MgrModule):
|
||||
else:
|
||||
nf = None
|
||||
|
||||
completion = self._oremote("get_inventory", node_filter=nf)
|
||||
completion = self.get_inventory(node_filter=nf)
|
||||
|
||||
self._wait([completion])
|
||||
self._orchestrator_wait([completion])
|
||||
|
||||
if cmd.get('format', 'plain') == 'json':
|
||||
data = [n.to_json() for n in completion.result]
|
||||
@ -157,10 +132,8 @@ class OrchestratorCli(MgrModule):
|
||||
# XXX this is kind of confusing for people because in the orchestrator
|
||||
# context the service ID for MDS is the filesystem ID, not the daemon ID
|
||||
|
||||
completion = self._oremote("describe_service", svc_type, svc_id, hostname)
|
||||
|
||||
self._wait([completion])
|
||||
|
||||
completion = self.describe_service(svc_type, svc_id, hostname)
|
||||
self._orchestrator_wait([completion])
|
||||
services = completion.result
|
||||
|
||||
# Sort the list for display
|
||||
@ -199,8 +172,8 @@ class OrchestratorCli(MgrModule):
|
||||
spec.format = "bluestore"
|
||||
spec.drive_group = orchestrator.DriveGroupSpec([block_device])
|
||||
|
||||
completion = self._oremote("create_osds", spec)
|
||||
self._wait([completion])
|
||||
completion = self.create_osds(spec)
|
||||
self._orchestrator_wait([completion])
|
||||
|
||||
return HandleCommandResult()
|
||||
|
||||
@ -210,12 +183,8 @@ class OrchestratorCli(MgrModule):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = fs_name
|
||||
|
||||
completion = self._oremote(
|
||||
"add_stateless_service",
|
||||
svc_type,
|
||||
spec
|
||||
)
|
||||
self._wait([completion])
|
||||
completion = self.add_stateless_service(svc_type, spec)
|
||||
self._orchestrator_wait([completion])
|
||||
|
||||
return HandleCommandResult()
|
||||
elif svc_type == "rgw":
|
||||
@ -224,12 +193,8 @@ class OrchestratorCli(MgrModule):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = store_name
|
||||
|
||||
completion = self._oremote(
|
||||
"add_stateless_service",
|
||||
svc_type,
|
||||
spec
|
||||
)
|
||||
self._wait([completion])
|
||||
completion = self.add_stateless_service(svc_type, spec)
|
||||
self._orchestrator_wait([completion])
|
||||
|
||||
return HandleCommandResult()
|
||||
else:
|
||||
@ -239,8 +204,8 @@ class OrchestratorCli(MgrModule):
|
||||
svc_type = cmd['svc_type']
|
||||
svc_id = cmd['svc_id']
|
||||
|
||||
completion = self._oremote("remove_stateless_service", svc_type, svc_id)
|
||||
self._wait([completion])
|
||||
completion = self.remove_stateless_service(svc_type, svc_id)
|
||||
self._orchestrator_wait([completion])
|
||||
return HandleCommandResult()
|
||||
|
||||
def _set_backend(self, cmd):
|
||||
@ -292,7 +257,7 @@ class OrchestratorCli(MgrModule):
|
||||
|
||||
def _status(self):
|
||||
try:
|
||||
avail, why = self._oremote("available")
|
||||
avail, why = self.available()
|
||||
except NoOrchestrator:
|
||||
return HandleCommandResult(stderr="No orchestrator configured (try "
|
||||
"`ceph orchestrator set backend`)")
|
||||
|
@ -23,7 +23,7 @@ class PurgeJob(object):
|
||||
self.subvolume_path = subvolume_path
|
||||
|
||||
|
||||
class Module(MgrModule):
|
||||
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
COMMANDS = [
|
||||
{
|
||||
'cmd': 'fs volume ls',
|
||||
@ -82,13 +82,6 @@ class Module(MgrModule):
|
||||
|
||||
self._background_jobs = Queue.Queue()
|
||||
|
||||
def _oremote(self, *args, **kwargs):
|
||||
"""
|
||||
Helper for invoking `remote` on whichever orchestrator is enabled
|
||||
"""
|
||||
return self.remote("orchestrator_cli", "_oremote",
|
||||
*args, **kwargs)
|
||||
|
||||
def serve(self):
|
||||
# TODO: discover any subvolumes pending purge, and enqueue
|
||||
# them in background_jobs at startup
|
||||
@ -114,35 +107,6 @@ class Module(MgrModule):
|
||||
|
||||
return handler(inbuf, cmd)
|
||||
|
||||
def _orchestrator_wait(self, completions):
|
||||
"""
|
||||
Helper to wait for completions to complete (reads) or
|
||||
become persistent (writes).
|
||||
|
||||
Waits for writes to be *persistent* but not *effective*.
|
||||
"""
|
||||
done = False
|
||||
|
||||
while done is False:
|
||||
done = self._oremote("wait", completions)
|
||||
|
||||
if not done:
|
||||
any_nonpersistent = False
|
||||
for c in completions:
|
||||
if c.is_read:
|
||||
if not c.is_complete:
|
||||
any_nonpersistent = True
|
||||
break
|
||||
else:
|
||||
if not c.is_persistent:
|
||||
any_nonpersistent = True
|
||||
break
|
||||
|
||||
if any_nonpersistent:
|
||||
time.sleep(5)
|
||||
else:
|
||||
done = True
|
||||
|
||||
def _pool_base_name(self, volume_name):
|
||||
"""
|
||||
Convention for naming pools for volumes
|
||||
@ -202,11 +166,7 @@ class Module(MgrModule):
|
||||
spec = orchestrator.StatelessServiceSpec()
|
||||
spec.name = vol_id
|
||||
try:
|
||||
completion = self._oremote(
|
||||
"add_stateless_service",
|
||||
"mds",
|
||||
spec
|
||||
)
|
||||
completion = self.add_stateless_service("mds", spec)
|
||||
self._orchestrator_wait([completion])
|
||||
except ImportError:
|
||||
return 0, "", "Volume created successfully (no MDS daemons created)"
|
||||
@ -265,7 +225,7 @@ class Module(MgrModule):
|
||||
|
||||
fs = self._volume_get_fs(vol_name)
|
||||
if fs is None:
|
||||
return 0, "", "Volume '{0}' already deleted".forma(vol_name)
|
||||
return 0, "", "Volume '{0}' already deleted".format(vol_name)
|
||||
|
||||
vol_fscid = fs['id']
|
||||
|
||||
@ -287,11 +247,7 @@ class Module(MgrModule):
|
||||
# Tear down MDS daemons
|
||||
# =====================
|
||||
try:
|
||||
completion = self._oremote(
|
||||
"remove_stateless_service",
|
||||
"mds",
|
||||
vol_name
|
||||
)
|
||||
completion = self.remove_stateless_service("mds", vol_name)
|
||||
self._orchestrator_wait([completion])
|
||||
except ImportError:
|
||||
self.log.warning("No orchestrator, not tearing down MDS daemons")
|
||||
|
Loading…
Reference in New Issue
Block a user