mgr/rook: Adapt to new orch interface

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
Sebastian Wagner 2021-02-08 01:47:42 +01:00
parent 68030bec52
commit 42d5c8991a
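This change drops the Rook module's own completion machinery (RookCompletion, deferred_read, write_completion) and converts each orchestrator method to the new convention: decorate with @handle_orch_error and return a plain value, which the decorator wraps into an OrchResult. A minimal sketch of the new pattern, assuming the ceph-mgr orchestrator package is importable; the get_hosts_demo function and host name below are illustrative, not part of this commit:

from typing import List

import orchestrator
from orchestrator import handle_orch_error, raise_if_exception


@handle_orch_error
def get_hosts_demo() -> List[orchestrator.HostSpec]:
    # The body returns a plain value; handle_orch_error wraps it (or any
    # exception raised in here) into an OrchResult for the caller.
    return [orchestrator.HostSpec('node-0')]


result = get_hosts_demo()           # an OrchResult holding the list
hosts = raise_if_exception(result)  # unwraps it, re-raising any stored exception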


@@ -33,6 +33,7 @@ except ImportError:
from mgr_module import MgrModule, Option
import orchestrator
from orchestrator import handle_orch_error, OrchResult, raise_if_exception
from .rook_cluster import RookCluster
@@ -41,42 +42,6 @@ FuncT = TypeVar('FuncT', bound=Callable)
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
class RookCompletion(orchestrator.Completion[T]):
def evaluate(self) -> None:
self.finalize(None)
def deferred_read(f):
# type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
# See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type
"""
Decorator to make RookOrchestrator methods return
a completion object that executes themselves.
"""
@functools.wraps(f)
def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]:
return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
def write_completion(on_complete, # type: Callable[[], T]
message, # type: str
mgr: 'RookOrchestrator',
calc_percent=None # type: Optional[Callable[[], RookCompletion]]
):
# type: (...) -> RookCompletion[T]
return RookCompletion.with_progress(
message=message,
mgr=mgr,
on_complete=lambda _: on_complete(),
calc_percent=calc_percent,
)
class RookEnv(object):
def __init__(self) -> None:
# POD_NAMESPACE already exists for Rook 0.9
@@ -110,13 +75,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
# TODO: configure k8s API addr instead of assuming local
]
def process(self, completions: List[RookCompletion]) -> None: # type: ignore
if completions:
self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
for p in completions:
p.evaluate()
@staticmethod
def can_run() -> Tuple[bool, str]:
if not kubernetes_imported:
@@ -149,8 +107,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self._shutdown = threading.Event()
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
def shutdown(self) -> None:
self._shutdown.set()
@@ -204,22 +160,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self._initialized.set()
while not self._shutdown.is_set():
# XXX hack (or is it?) to kick all completions periodically,
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
for p in self.all_progress_references:
p.update()
self._shutdown.wait(5)
def cancel_completions(self) -> None:
for p in self.all_progress_references:
p.fail()
self.all_progress_references.clear()
@deferred_read
@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
if host_filter and host_filter.hosts:
@@ -244,7 +187,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
sys_api = dict(
rotational = '1' if d['rotational'] else '0',
size = d['size']
),
available = False,
rejected_reasons=['device data coming from ceph-volume not provided'],
))
@@ -253,12 +196,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return result
@deferred_read
@handle_orch_error
def get_hosts(self):
# type: () -> List[orchestrator.HostSpec]
return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
@deferred_read
@handle_orch_error
def describe_service(self,
service_type: Optional[str] = None,
service_name: Optional[str] = None,
@@ -275,26 +218,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
spec = {}
if service_type == 'mon' or service_type is None:
spec['mon'] = orchestrator.ServiceDescription(
spec=ServiceSpec(
'mon',
placement=PlacementSpec(
count=cl['spec'].get('mon', {}).get('count', 1),
),
),
size=cl['spec'].get('mon', {}).get('count', 1),
container_image_name=image_name,
last_refresh=now,
)
if service_type == 'mgr' or service_type is None:
spec['mgr'] = orchestrator.ServiceDescription(
spec=ServiceSpec(
'mgr',
placement=PlacementSpec.from_string('count:1'),
),
size=1,
container_image_name=image_name,
last_refresh=now,
)
if not cl['spec'].get('crashCollector', {}).get('disable', False):
spec['crash'] = orchestrator.ServiceDescription(
spec=ServiceSpec(
@@ -309,7 +252,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
if service_type == 'mds' or service_type is None:
# CephFilesystems
all_fs = self.rook_cluster.rook_api_get(
"cephfilesystems/")
"cephfilesystems/")
self.log.debug('CephFilesystems %s' % all_fs)
for fs in all_fs.get('items', []):
svc = 'mds.' + fs['metadata']['name']
@@ -321,20 +264,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
total_mds = active * 2
spec[svc] = orchestrator.ServiceDescription(
spec=ServiceSpec(
service_type='mds',
service_id=fs['metadata']['name'],
placement=PlacementSpec(count=active),
),
size=total_mds,
container_image_name=image_name,
last_refresh=now,
)
if service_type == 'rgw' or service_type is None:
# CephObjectstores
all_zones = self.rook_cluster.rook_api_get(
"cephobjectstores/")
"cephobjectstores/")
self.log.debug('CephObjectstores %s' % all_zones)
for zone in all_zones.get('items', []):
rgw_realm = zone['metadata']['name']
@@ -350,23 +293,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
ssl = False
port = zone['spec']['gateway']['port'] or 80
spec[svc] = orchestrator.ServiceDescription(
spec=RGWSpec(
service_id=rgw_realm + '.' + rgw_zone,
rgw_realm=rgw_realm,
rgw_zone=rgw_zone,
ssl=ssl,
rgw_frontend_port=port,
placement=PlacementSpec(count=active),
),
size=active,
container_image_name=image_name,
last_refresh=now,
)
if service_type == 'nfs' or service_type is None:
# CephNFSes
all_nfs = self.rook_cluster.rook_api_get(
"cephnfses/")
"cephnfses/")
self.log.warning('CephNFS %s' % all_nfs)
for nfs in all_nfs.get('items', []):
nfs_name = nfs['metadata']['name']
@@ -375,15 +318,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
continue
active = nfs['spec'].get('server', {}).get('active')
spec[svc] = orchestrator.ServiceDescription(
spec=NFSServiceSpec(
service_id=nfs_name,
pool=nfs['spec']['rados']['pool'],
namespace=nfs['spec']['rados'].get('namespace', None),
placement=PlacementSpec(count=active),
),
size=active,
last_refresh=now,
)
for dd in self._list_daemons():
if dd.service_name() not in spec:
@@ -401,7 +344,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return [v for k, v in spec.items()]
@deferred_read
@handle_orch_error
def list_daemons(self,
service_name: Optional[str] = None,
daemon_type: Optional[str] = None,
@@ -458,73 +401,48 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return result
def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]:
return write_completion(
on_complete=lambda : func(spec),
message="Creating {} services for {}".format(typename, spec.service_id),
mgr=self
)
def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]:
return write_completion(
on_complete=lambda : func(),
message="Removing {} services for {}".format(typename, name),
mgr=self
)
def remove_service(self, service_name: str) -> RookCompletion[str]:
@handle_orch_error
def remove_service(self, service_name: str) -> str:
service_type, service_name = service_name.split('.', 1)
if service_type == 'mds':
return self._service_rm_decorate(
'MDS', service_name, lambda: self.rook_cluster.rm_service(
'cephfilesystems', service_name)
)
return self.rook_cluster.rm_service('cephfilesystems', service_name)
elif service_type == 'rgw':
return self._service_rm_decorate(
'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
)
return self.rook_cluster.rm_service('cephobjectstores', service_name)
elif service_type == 'nfs':
return self._service_rm_decorate(
'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
)
return self.rook_cluster.rm_service('cephnfses', service_name)
else:
raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
@handle_orch_error
def apply_mon(self, spec):
# type: (ServiceSpec) -> RookCompletion[str]
# type: (ServiceSpec) -> str
if spec.placement.hosts or spec.placement.label:
raise RuntimeError("Host list or label is not supported by rook.")
return write_completion(
lambda: self.rook_cluster.update_mon_count(spec.placement.count),
"Updating mon count to {0}".format(spec.placement.count),
mgr=self
)
return self.rook_cluster.update_mon_count(spec.placement.count)
@handle_orch_error
def apply_mds(self, spec):
# type: (ServiceSpec) -> RookCompletion[str]
return self._service_add_decorate('MDS', spec,
self.rook_cluster.apply_filesystem)
# type: (ServiceSpec) -> str
return self.rook_cluster.apply_filesystem(spec)
@handle_orch_error
def apply_rgw(self, spec):
# type: (RGWSpec) -> RookCompletion[str]
return self._service_add_decorate('RGW', spec,
self.rook_cluster.apply_objectstore)
# type: (RGWSpec) -> str
return self.rook_cluster.apply_objectstore(spec)
@handle_orch_error
def apply_nfs(self, spec):
# type: (NFSServiceSpec) -> RookCompletion[str]
return self._service_add_decorate("NFS", spec,
self.rook_cluster.apply_nfsgw)
# type: (NFSServiceSpec) -> str
return self.rook_cluster.apply_nfsgw(spec)
def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]:
return write_completion(
lambda: self.rook_cluster.remove_pods(names),
"Removing daemons {}".format(','.join(names)),
mgr=self
)
@handle_orch_error
def remove_daemons(self, names: List[str]) -> List[str]:
return self.rook_cluster.remove_pods(names)
@handle_orch_error
def create_osds(self, drive_group):
# type: (DriveGroupSpec) -> RookCompletion[str]
# type: (DriveGroupSpec) -> str
""" Creates OSDs from a drive group specification.
$: ceph orch osd create -i <dg.file>
@@ -538,41 +456,36 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
if drive_group.data_directories:
targets += drive_group.data_directories
def execute(all_hosts_):
# type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)
all_hosts = raise_if_exception(self.get_hosts())
assert len(matching_hosts) == 1
matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
if not self.rook_cluster.node_exists(matching_hosts[0]):
raise RuntimeError("Node '{0}' is not in the Kubernetes "
"cluster".format(matching_hosts))
# Validate whether cluster CRD can accept individual OSD
# creations (i.e. not useAllDevices)
if not self.rook_cluster.can_create_osd():
raise RuntimeError("Rook cluster configuration does not "
"support OSD creation.")
return orchestrator.Completion.with_progress(
message="Creating OSD on {0}:{1}".format(
matching_hosts,
targets),
mgr=self,
on_complete=lambda _:self.rook_cluster.add_osds(drive_group, matching_hosts),
calc_percent=lambda: has_osds(matching_hosts)
)
@deferred_read
return self.rook_cluster.add_osds(drive_group, matching_hosts)
# TODO: this was the code to update the progress reference:
"""
@handle_orch_error
def has_osds(matching_hosts: List[str]) -> bool:
# Find OSD pods on this host
pod_osd_ids = set()
pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
field_selector="spec.nodeName={0}".format(
matching_hosts[0]
)).items
label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
field_selector="spec.nodeName={0}".format(
matching_hosts[0]
)).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
@@ -594,15 +507,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
))
return found is not None
"""
c = self.get_hosts().then(execute)
return c
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
return write_completion(
on_complete=lambda: self.rook_cluster.blink_light(
ident_fault, on, locs),
message="Switching <{}> identification light in {}".format(
on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
mgr=self
)
@handle_orch_error
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
return self.rook_cluster.blink_light(ident_fault, on, locs)
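
For reference, a rough sketch of how errors now propagate under this interface: an OrchestratorError raised inside a decorated method such as remove_service() above is captured in the returned OrchResult and only re-raised when the caller unwraps it. The remove_service_demo function below is a hypothetical stand-in, assuming the ceph-mgr orchestrator package is importable:

import orchestrator
from orchestrator import handle_orch_error, raise_if_exception


@handle_orch_error
def remove_service_demo(service_name: str) -> str:
    # Hypothetical stand-in for RookOrchestrator.remove_service() above.
    service_type, name = service_name.split('.', 1)
    if service_type not in ('mds', 'rgw', 'nfs'):
        raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
    return f'Removed service {name}'


result = remove_service_demo('iscsi.gateway')  # exception is stored in the OrchResult, not raised here
try:
    raise_if_exception(result)                 # unwrapping re-raises the OrchestratorError
except orchestrator.OrchestratorError as e:
    print(e)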