diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 93021d0c2ef..86ecef06382 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -9,7 +9,7 @@ from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
 from ceph.utils import datetime_now
 
-from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
 
 try:
     from ceph.deployment.drive_group import DriveGroupSpec
@@ -81,6 +81,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             default='local',
             desc='storage class name for LSO-discovered PVs',
         ),
+        Option(
+            'drive_group_interval',
+            type='float',
+            default=300.0,
+            desc='interval in seconds between re-application of applied drive_groups',
+        ),
     ]
 
     @staticmethod
@@ -115,8 +121,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._rook_cluster: Optional[RookCluster] = None
         self._rook_env = RookEnv()
         self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None
-        self.storage_class = self.get_module_option('storage_class')
+        self.config_notify()
+        if TYPE_CHECKING:
+            self.storage_class = 'foo'
+            self.drive_group_interval = 10.0
+
+        self._load_drive_groups()
 
         self._shutdown = threading.Event()
 
     def config_notify(self) -> None:
@@ -132,7 +143,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self.log.debug(' mgr option %s = %s',
                            opt['name'], getattr(self, opt['name']))  # type: ignore
         assert isinstance(self.storage_class, str)
-        self.rook_cluster.storage_class = self.storage_class
+        assert isinstance(self.drive_group_interval, float)
+
+        if self._rook_cluster:
+            self._rook_cluster.storage_class = self.storage_class
 
     def shutdown(self) -> None:
         self._shutdown.set()
@@ -194,9 +208,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self.storage_class)
 
         self._initialized.set()
+        self.config_notify()
 
         while not self._shutdown.is_set():
-            self._shutdown.wait(5)
+            self._apply_drivegroups(list(self._drive_group_map.values()))
+            self._shutdown.wait(self.drive_group_interval)
 
     @handle_orch_error
     def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
@@ -263,7 +279,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 container_image_name=image_name,
                 last_refresh=now,
             )
-        if not cl['spec'].get('crashCollector', {}).get('disable', False):
+
+        if (
+            service_type == 'crash' or service_type is None
+            and not cl['spec'].get('crashCollector', {}).get('disable', False)
+        ):
             spec['crash'] = orchestrator.ServiceDescription(
                 spec=ServiceSpec(
                     'crash',
@@ -344,17 +364,29 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             )
         if service_type == 'osd' or service_type is None:
             # OSDs
+            # FIXME: map running OSDs back to their respective services...
+
+            # the catch-all unmanaged
             all_osds = self.rook_cluster.get_osds()
             svc = 'osd'
             spec[svc] = orchestrator.ServiceDescription(
                 spec=DriveGroupSpec(
+                    unmanaged=True,
                     service_type='osd',
-                    placement=PlacementSpec(count=len(all_osds), hosts=[osd.metadata.labels['topology-location-host'] for osd in all_osds]),
                 ),
                 size=len(all_osds),
                 last_refresh=now,
-                running= sum(osd.status.phase == 'Running' for osd in all_osds)
+                running=sum(osd.status.phase == 'Running' for osd in all_osds)
             )
+
+            # drivegroups
+            for name, dg in self._drive_group_map.items():
+                spec[f'osd.{name}'] = orchestrator.ServiceDescription(
+                    spec=dg,
+                    last_refresh=now,
+                    size=0,
+                    running=0,
+                )
 
         if service_type == 'rbd-mirror' or service_type is None:
             # rbd-mirrors
@@ -466,15 +498,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     def remove_service(self, service_name: str, force: bool = False) -> str:
         if service_name == 'rbd-mirror':
             return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
-        service_type, service_name = service_name.split('.', 1)
+        service_type, service_id = service_name.split('.', 1)
         if service_type == 'mds':
-            return self.rook_cluster.rm_service('cephfilesystems', service_name)
+            return self.rook_cluster.rm_service('cephfilesystems', service_id)
         elif service_type == 'rgw':
-            return self.rook_cluster.rm_service('cephobjectstores', service_name)
+            return self.rook_cluster.rm_service('cephobjectstores', service_id)
         elif service_type == 'nfs':
-            return self.rook_cluster.rm_service('cephnfses', service_name)
+            return self.rook_cluster.rm_service('cephnfses', service_id)
         elif service_type == 'rbd-mirror':
-            return self.rook_cluster.rm_service('cephrbdmirrors', service_name)
+            return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
+        elif service_type == 'osd':
+            if service_id in self._drive_group_map:
+                del self._drive_group_map[service_id]
+                self._save_drive_groups()
+            return f'Removed {service_name}'
         else:
             raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
 
@@ -523,10 +560,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self.rook_cluster.remove_pods(names)
 
     def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
-        result_list = []
-        all_hosts = raise_if_exception(self.get_hosts())
         for drive_group in specs:
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+            self._drive_group_map[str(drive_group.service_id)] = drive_group
+        self._save_drive_groups()
+        return OrchResult(self._apply_drivegroups(specs))
+
+    def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
+        all_hosts = raise_if_exception(self.get_hosts())
+        result_list: List[str] = []
+        for drive_group in ls:
+            matching_hosts = drive_group.placement.filter_matching_hosts(
+                lambda label=None, as_hostspec=None: all_hosts
+            )
 
             if not self.rook_cluster.node_exists(matching_hosts[0]):
                 raise RuntimeError("Node '{0}' is not in the Kubernetes "
@@ -538,7 +583,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 raise RuntimeError("Rook cluster configuration does not "
                                    "support OSD creation.")
             result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
-        return OrchResult(result_list)
+        return result_list
+
+    def _load_drive_groups(self) -> None:
+        stored_drive_group = self.get_store("drive_group_map")
+        self._drive_group_map: Dict[str, DriveGroupSpec] = {}
+        if stored_drive_group:
+            for name, dg in json.loads(stored_drive_group).items():
+                try:
+                    self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
+                except ValueError as e:
+                    self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
+
+    def _save_drive_groups(self) -> None:
+        json_drive_group_map = {
+            name: dg.to_json() for name, dg in self._drive_group_map.items()
+        }
+        self.set_store("drive_group_map", json.dumps(json_drive_group_map))
 
     def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
         assert self._rook_cluster is not None
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index a4bc0917ef6..bc29b309868 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -415,9 +415,14 @@ class DefaultCreator():
 
             if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                 new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+            existing_scds = [
+                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
+            ]
             for device in to_create:
                 new_scds = self.device_to_device_set(drive_group, device)
-                new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+                if new_scds.name not in existing_scds:
+                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
             return new_cluster
         return _add_osds
 
@@ -636,7 +641,16 @@ class DefaultRemover():
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
-    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', customObjects_api: 'client.CustomObjectsApi', storageV1_api: 'client.StorageV1Api', appsV1_api: 'client.AppsV1Api', rook_env: 'RookEnv', storage_class: 'str'):
+    def __init__(
+            self,
+            coreV1_api: 'client.CoreV1Api',
+            batchV1_api: 'client.BatchV1Api',
+            customObjects_api: 'client.CustomObjectsApi',
+            storageV1_api: 'client.StorageV1Api',
+            appsV1_api: 'client.AppsV1Api',
+            rook_env: 'RookEnv',
+            storage_class: 'str'
+    ):
         self.rook_env = rook_env  # type: RookEnv
         self.coreV1_api = coreV1_api  # client.CoreV1Api
         self.batchV1_api = batchV1_api
@@ -653,8 +667,6 @@ class RookCluster(object):
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.namespace))
         self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-        self.drive_group_map: Dict[str, Any] = {}
-        self.drive_group_lock = threading.Lock()
 
     def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -790,7 +802,11 @@ class RookCluster(object):
                     image_name = c['image']
                     break
 
-            image_id = d['status']['container_statuses'][0]['image_id']
+            ls = d['status'].get('container_statuses')
+            if not ls:
+                # ignore pods with no containers
+                continue
+            image_id = ls[0]['image_id']
             image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
 
             s = {
@@ -1086,24 +1102,20 @@
         assert drive_group.service_id
         storage_class = self.get_storage_class()
         inventory = self.get_discovered_devices()
-        self.creator: Optional[DefaultCreator] = None
-        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
-            self.creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
+        creator: Optional[DefaultCreator] = None
+        if (
+            storage_class.metadata.labels
+            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
+        ):
+            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
         else:
-            self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
-        _add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        with self.drive_group_lock:
-            self.drive_group_map[drive_group.service_id] = _add_osds
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-
-    @threaded
-    def drive_group_loop(self) -> None:
-        ten_minutes = 10 * 60
-        while True:
-            sleep(ten_minutes)
-            with self.drive_group_lock:
-                for _, add_osd in self.drive_group_map.items():
-                    self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
+            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+        return self._patch(
+            ccl.CephCluster,
+            'cephclusters',
+            self.rook_env.cluster_name,
+            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+        )
 
     def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
         inventory = self.get_discovered_devices()
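
For context on the persistence mechanism this patch introduces: apply_drivegroups() now records every DriveGroupSpec in the mgr key/value store under "drive_group_map" (keyed by service_id, serialized with to_json()), _load_drive_groups() rebuilds the map with DriveGroupSpec.from_json() when the module starts, and serve() re-applies the stored specs every drive_group_interval seconds in place of the removed drive_group_loop() thread. The snippet below is a minimal standalone sketch of that round-trip, not part of the patch: it assumes the ceph.deployment packages are importable, and the plain dict "store", the 'all_available' service id, and the chosen placement/device selection are illustrative stand-ins for the real mgr store and user-supplied specs.

# Standalone sketch: round-trip a DriveGroupSpec the way _save_drive_groups()
# and _load_drive_groups() do, using a plain dict instead of MgrModule.set_store().
import json

from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment.service_spec import PlacementSpec

store = {}  # stand-in for the mgr key/value store

dg = DriveGroupSpec(
    service_id='all_available',                 # illustrative service id
    placement=PlacementSpec(host_pattern='*'),  # illustrative placement
    data_devices=DeviceSelection(all=True),
)

# save: one JSON blob keyed by service_id, as in _save_drive_groups()
store['drive_group_map'] = json.dumps({str(dg.service_id): dg.to_json()})

# load: rebuild the specs, skipping entries that no longer parse,
# as in _load_drive_groups()
loaded = {}
for name, raw in json.loads(store['drive_group_map']).items():
    try:
        loaded[name] = DriveGroupSpec.from_json(raw)
    except ValueError as e:
        print(f'Failed to load drive group {name}: {e}')

assert loaded['all_available'].service_id == 'all_available'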