Merge pull request #43775 from liewegas/wip-mgr-rook-osd-creation

mgr/rook: persist drive groups

Reviewed-by: Sebastian Wagner <sewagner@redhat.com>
Authored by Sebastian Wagner on 2021-11-10 17:34:07 +01:00, committed by GitHub
commit 7c7eb25130
2 changed files with 110 additions and 37 deletions
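
The change below persists applied drive groups in the mgr key/value store (set_store/get_store) as JSON and reloads them when the module starts, instead of keeping them only in memory inside RookCluster. A minimal standalone sketch of that round-trip, assuming the ceph python-common package is importable for DriveGroupSpec and using a plain dict as a stand-in for the MgrModule store; save_drive_groups/load_drive_groups are illustrative names, not the module's actual methods:

import json
from typing import Dict

from ceph.deployment.drive_group import DriveGroupSpec

# Plain-dict stand-in for the MgrModule key/value store behind get_store()/set_store().
_kv_store: Dict[str, str] = {}

def save_drive_groups(drive_group_map: Dict[str, DriveGroupSpec]) -> None:
    # Serialize each spec to its JSON form and persist the whole map under
    # one key, mirroring _save_drive_groups() in the diff below.
    _kv_store['drive_group_map'] = json.dumps(
        {name: dg.to_json() for name, dg in drive_group_map.items()}
    )

def load_drive_groups() -> Dict[str, DriveGroupSpec]:
    # Reload and re-hydrate the specs, skipping entries that fail to parse,
    # mirroring _load_drive_groups() in the diff below.
    stored = _kv_store.get('drive_group_map')
    result: Dict[str, DriveGroupSpec] = {}
    if stored:
        for name, dg in json.loads(stored).items():
            try:
                result[name] = DriveGroupSpec.from_json(dg)
            except ValueError:
                pass  # a real module would log the failure instead
    return result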


@@ -9,7 +9,7 @@ from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now
from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
try:
from ceph.deployment.drive_group import DriveGroupSpec
@@ -81,6 +81,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
default='local',
desc='storage class name for LSO-discovered PVs',
),
Option(
'drive_group_interval',
type='float',
default=300.0,
desc='interval in seconds between re-application of applied drive_groups',
),
]
@staticmethod
@@ -115,8 +121,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self._rook_cluster: Optional[RookCluster] = None
self._rook_env = RookEnv()
self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None
self.storage_class = self.get_module_option('storage_class')
self.config_notify()
if TYPE_CHECKING:
self.storage_class = 'foo'
self.drive_group_interval = 10.0
self._load_drive_groups()
self._shutdown = threading.Event()
def config_notify(self) -> None:
@@ -132,7 +143,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
assert isinstance(self.storage_class, str)
self.rook_cluster.storage_class = self.storage_class
assert isinstance(self.drive_group_interval, float)
if self._rook_cluster:
self._rook_cluster.storage_class = self.storage_class
def shutdown(self) -> None:
self._shutdown.set()
@@ -194,9 +208,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.storage_class)
self._initialized.set()
self.config_notify()
while not self._shutdown.is_set():
self._shutdown.wait(5)
self._apply_drivegroups(list(self._drive_group_map.values()))
self._shutdown.wait(self.drive_group_interval)
@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
@@ -263,7 +279,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
container_image_name=image_name,
last_refresh=now,
)
if not cl['spec'].get('crashCollector', {}).get('disable', False):
if (
service_type == 'crash' or service_type is None
and not cl['spec'].get('crashCollector', {}).get('disable', False)
):
spec['crash'] = orchestrator.ServiceDescription(
spec=ServiceSpec(
'crash',
@@ -344,17 +364,29 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
)
if service_type == 'osd' or service_type is None:
# OSDs
# FIXME: map running OSDs back to their respective services...
# the catch-all unmanaged
all_osds = self.rook_cluster.get_osds()
svc = 'osd'
spec[svc] = orchestrator.ServiceDescription(
spec=DriveGroupSpec(
unmanaged=True,
service_type='osd',
placement=PlacementSpec(count=len(all_osds), hosts=[osd.metadata.labels['topology-location-host'] for osd in all_osds]),
),
size=len(all_osds),
last_refresh=now,
running= sum(osd.status.phase == 'Running' for osd in all_osds)
running=sum(osd.status.phase == 'Running' for osd in all_osds)
)
# drivegroups
for name, dg in self._drive_group_map.items():
spec[f'osd.{name}'] = orchestrator.ServiceDescription(
spec=dg,
last_refresh=now,
size=0,
running=0,
)
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
@@ -466,15 +498,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
def remove_service(self, service_name: str, force: bool = False) -> str:
if service_name == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
service_type, service_name = service_name.split('.', 1)
service_type, service_id = service_name.split('.', 1)
if service_type == 'mds':
return self.rook_cluster.rm_service('cephfilesystems', service_name)
return self.rook_cluster.rm_service('cephfilesystems', service_id)
elif service_type == 'rgw':
return self.rook_cluster.rm_service('cephobjectstores', service_name)
return self.rook_cluster.rm_service('cephobjectstores', service_id)
elif service_type == 'nfs':
return self.rook_cluster.rm_service('cephnfses', service_name)
return self.rook_cluster.rm_service('cephnfses', service_id)
elif service_type == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', service_name)
return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
elif service_type == 'osd':
if service_id in self._drive_group_map:
del self._drive_group_map[service_id]
self._save_drive_groups()
return f'Removed {service_name}'
else:
raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
@@ -523,10 +560,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return self.rook_cluster.remove_pods(names)
def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
result_list = []
all_hosts = raise_if_exception(self.get_hosts())
for drive_group in specs:
matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
self._drive_group_map[str(drive_group.service_id)] = drive_group
self._save_drive_groups()
return OrchResult(self._apply_drivegroups(specs))
def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
all_hosts = raise_if_exception(self.get_hosts())
result_list: List[str] = []
for drive_group in ls:
matching_hosts = drive_group.placement.filter_matching_hosts(
lambda label=None, as_hostspec=None: all_hosts
)
if not self.rook_cluster.node_exists(matching_hosts[0]):
raise RuntimeError("Node '{0}' is not in the Kubernetes "
@@ -538,7 +583,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
raise RuntimeError("Rook cluster configuration does not "
"support OSD creation.")
result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
return OrchResult(result_list)
return result_list
def _load_drive_groups(self) -> None:
stored_drive_group = self.get_store("drive_group_map")
self._drive_group_map: Dict[str, DriveGroupSpec] = {}
if stored_drive_group:
for name, dg in json.loads(stored_drive_group).items():
try:
self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
except ValueError as e:
self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
def _save_drive_groups(self) -> None:
json_drive_group_map = {
name: dg.to_json() for name, dg in self._drive_group_map.items()
}
self.set_store("drive_group_map", json.dumps(json_drive_group_map))
def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
assert self._rook_cluster is not None
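
With the specs persisted, the serve() loop shown above re-applies them every drive_group_interval seconds instead of relying on the drive_group_loop thread that the second file removes from RookCluster. A minimal sketch of that timer pattern, assuming only the standard library; reapply_loop and apply_fn are illustrative names, and the 300.0 default mirrors the new drive_group_interval option:

import threading
from typing import Callable, List

def reapply_loop(apply_fn: Callable[[], List[str]],
                 shutdown: threading.Event,
                 interval: float = 300.0) -> None:
    # Re-run apply_fn until shutdown is set. Event.wait() doubles as an
    # interruptible sleep, so a caller only needs shutdown.set() to stop
    # the loop promptly, as RookOrchestrator.shutdown() does above.
    while not shutdown.is_set():
        apply_fn()
        shutdown.wait(interval)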


@@ -415,9 +415,14 @@ class DefaultCreator():
if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
existing_scds = [
scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
]
for device in to_create:
new_scds = self.device_to_device_set(drive_group, device)
new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
if new_scds.name not in existing_scds:
new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
return new_cluster
return _add_osds
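
Because drive groups are now re-applied on a timer, building the cluster spec has to be idempotent; the existing_scds check above only appends storageClassDeviceSets whose names are not already present in the CephCluster spec. A small sketch of that name-based dedupe, with DeviceSet as a hypothetical stand-in for the Rook client object model:

from dataclasses import dataclass
from typing import List

@dataclass
class DeviceSet:
    # Hypothetical stand-in for the storageClassDeviceSet items in the diff.
    name: str
    count: int = 1

def merge_device_sets(existing: List[DeviceSet], new: List[DeviceSet]) -> List[DeviceSet]:
    # Append only device sets whose name is not already known, so that
    # re-applying the same drive group leaves the cluster spec unchanged.
    known = {ds.name for ds in existing}
    return existing + [ds for ds in new if ds.name not in known]
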
@@ -636,7 +641,16 @@ class DefaultRemover():
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', customObjects_api: 'client.CustomObjectsApi', storageV1_api: 'client.StorageV1Api', appsV1_api: 'client.AppsV1Api', rook_env: 'RookEnv', storage_class: 'str'):
def __init__(
self,
coreV1_api: 'client.CoreV1Api',
batchV1_api: 'client.BatchV1Api',
customObjects_api: 'client.CustomObjectsApi',
storageV1_api: 'client.StorageV1Api',
appsV1_api: 'client.AppsV1Api',
rook_env: 'RookEnv',
storage_class: 'str'
):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
@@ -653,8 +667,6 @@ class RookCluster(object):
label_selector="rook_cluster={0}".format(
self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
self.drive_group_map: Dict[str, Any] = {}
self.drive_group_lock = threading.Lock()
def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -790,7 +802,11 @@ class RookCluster(object):
image_name = c['image']
break
image_id = d['status']['container_statuses'][0]['image_id']
ls = d['status'].get('container_statuses')
if not ls:
# ignore pods with no containers
continue
image_id = ls[0]['image_id']
image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
s = {
@@ -1086,24 +1102,20 @@ class RookCluster(object):
assert drive_group.service_id
storage_class = self.get_storage_class()
inventory = self.get_discovered_devices()
self.creator: Optional[DefaultCreator] = None
if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
self.creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
creator: Optional[DefaultCreator] = None
if (
storage_class.metadata.labels
and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
):
creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
else:
self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
_add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
with self.drive_group_lock:
self.drive_group_map[drive_group.service_id] = _add_osds
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
@threaded
def drive_group_loop(self) -> None:
ten_minutes = 10 * 60
while True:
sleep(ten_minutes)
with self.drive_group_lock:
for _, add_osd in self.drive_group_map.items():
self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
return self._patch(
ccl.CephCluster,
'cephclusters',
self.rook_env.cluster_name,
creator.add_osds(self.rook_pods, drive_group, matching_hosts)
)
def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
inventory = self.get_discovered_devices()