diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 1fe17a60ab4..959a75952a4 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -94,7 +94,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
       for the corresponding change to appear in the
       Ceph cluster (slow)
 
-    Right now, wre calling the k8s API synchronously.
+    Right now, we are calling the k8s API synchronously.
     """
 
     MODULE_OPTIONS = [
@@ -135,7 +135,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         super(RookOrchestrator, self).__init__(*args, **kwargs)
 
         self._initialized = threading.Event()
-        self._k8s = None
+        self._k8s_CoreV1_api = None
+        self._k8s_BatchV1_api = None
         self._rook_cluster = None
         self._rook_env = RookEnv()
 
@@ -150,8 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     def k8s(self):
         # type: () -> client.CoreV1Api
         self._initialized.wait()
-        assert self._k8s is not None
-        return self._k8s
+        assert self._k8s_CoreV1_api is not None
+        return self._k8s_CoreV1_api
 
     @property
     def rook_cluster(self):
@@ -178,20 +179,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             from kubernetes.client import configuration
             configuration.verify_ssl = False
 
-        self._k8s = client.CoreV1Api()
+        self._k8s_CoreV1_api = client.CoreV1Api()
+        self._k8s_BatchV1_api = client.BatchV1Api()
 
         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command
             # fails with SSLError('bad handshake').  Suspect some kind of
             # thread context setup in SSL lib?
-            self._k8s.list_namespaced_pod(cluster_name)
+            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
         except ApiException:
             # Ignore here to make self.available() fail with a proper error message
             pass
 
         self._rook_cluster = RookCluster(
-            self._k8s,
+            self._k8s_CoreV1_api,
+            self._k8s_BatchV1_api,
             self._rook_env)
 
         self._initialized.set()
@@ -550,3 +553,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         c = self.get_hosts().then(execute)
         return c
+
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
+        return write_completion(
+            on_complete=lambda: self.rook_cluster.blink_light(
+                ident_fault, on, locs),
+            message="Switching <{}> identification light in {}".format(
+                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
+            mgr=self
+        )
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index f177c1668e8..cd4b5899003 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -11,6 +11,7 @@ import threading
 import logging
 import json
 from contextlib import contextmanager
+from time import sleep
 
 import jsonpatch
 from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
@@ -29,9 +30,8 @@ except ImportError:
     pass  # just for type annotations
 
 try:
+    from kubernetes import client, watch
     from kubernetes.client.rest import ApiException
-    from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
-    from kubernetes import watch
 except ImportError:
     class ApiException(Exception):  # type: ignore
         status = 0
@@ -108,8 +108,8 @@ class KubernetesResource(object):
     def _fetch(self):
         """ Execute the requested api method as a one-off fetch"""
         response = self.api_func(**self.kwargs)
-        # metadata is a V1ListMeta object type
-        metadata = response.metadata  # type: V1ListMeta
+        # metadata is a client.V1ListMeta object type
+        metadata = response.metadata  # type: client.V1ListMeta
         self._items = {item.metadata.name: item for item in response.items}
         log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
         return metadata.resource_version
@@ -183,21 +183,22 @@ class KubernetesResource(object):
 
 
 class RookCluster(object):
-    def __init__(self, k8s, rook_env):
+    def __init__(self, coreV1_api, batchV1_api, rook_env):
         self.rook_env = rook_env  # type: RookEnv
-        self.k8s = k8s  # type: CoreV1Api
+        self.coreV1_api = coreV1_api  # client.CoreV1Api
+        self.batchV1_api = batchV1_api
 
         #  TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
-        self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")
 
-        self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                             namespace=self.rook_env.namespace,
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.cluster_name))
-        self.nodes = KubernetesResource(self.k8s.list_node)
+        self.nodes = KubernetesResource(self.coreV1_api.list_node)
 
     def rook_url(self, path):
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -208,7 +209,7 @@ class RookCluster(object):
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))
 
-        return self.k8s.api_client.call_api(
+        return self.coreV1_api.api_client.call_api(
             full_path,
             verb,
             auth_settings=['BearerToken'],
@@ -240,7 +241,7 @@ class RookCluster(object):
 
         try:
             result = [i for i in self.inventory_maps.items if predicate(i)]
-        except ApiException as e:
+        except ApiException as dummy_e:
             log.exception("Failed to fetch device metadata")
             raise
 
@@ -287,7 +288,7 @@ class RookCluster(object):
           rook_file_system=
         """
        def predicate(item):
-            # type: (V1Pod) -> bool
+            # type: (client.V1Pod) -> bool
             metadata = item.metadata
             if service_type is not None:
                 if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
@@ -361,10 +362,10 @@ class RookCluster(object):
                 daemon_id = d['metadata']['labels']['ceph_daemon_id']
                 name = daemon_type + '.' + daemon_id
                 if name in names:
-                    self.k8s.delete_namespaced_pod(
+                    self.coreV1_api.delete_namespaced_pod(
                         d['metadata']['name'],
                         self.rook_env.namespace,
-                        body=V1DeleteOptions()
+                        body=client.V1DeleteOptions()
                     )
                     num += 1
         return "Removed %d pods" % num
@@ -652,3 +653,120 @@ class RookCluster(object):
             self.rook_api_post("{}/".format(crd_name), body=new.to_json())
             return "Created"
+
+    def get_ceph_image(self) -> str:
+        try:
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="app=rook-ceph-mon",
+                                                               timeout_seconds=10)
+            if api_response.items:
+                return api_response.items[-1].spec.containers[0].image
+            else:
+                raise orchestrator.OrchestratorError(
+                        "Error getting ceph image: cluster has no monitors")
+        except ApiException as e:
+            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
+        operation_id = str(hash(loc))
+        message = ""
+
+        # job definition
+        job_metadata = client.V1ObjectMeta(name=operation_id,
+                                           namespace=self.rook_env.namespace,
+                                           labels={"ident": operation_id})
+        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
+        pod_container = client.V1Container(name="ceph-lsmcli-command",
+                                           security_context=client.V1SecurityContext(privileged=True),
+                                           image=self.get_ceph_image(),
+                                           command=["lsmcli"],
+                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
+                                                 '--path', loc.path or loc.dev],
+                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+        pod_spec = client.V1PodSpec(containers=[pod_container],
+                                    active_deadline_seconds=30,  # Max time to terminate pod
+                                    restart_policy="Never",
+                                    node_selector={"kubernetes.io/hostname": loc.host},
+                                    volumes=[client.V1Volume(name="devices",
+                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
+                                             client.V1Volume(name="run-udev",
+                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+                                                spec=pod_spec)
+        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
+                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
+                                    backoff_limit=0,
+                                    template=pod_template)
+        job = client.V1Job(api_version="batch/v1",
+                           kind="Job",
+                           metadata=job_metadata,
+                           spec=job_spec)
+
+        # delete previous job if it exists
+        try:
+            try:
+                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                      self.rook_env.namespace,
+                                                                      propagation_policy="Background")
+            except ApiException as e:
+                if e.status != 404:  # No problem if the job does not exist
+                    raise
+
+            # wait until the job is not present
+            deleted = False
+            retries = 0
+            while not deleted and retries < 10:
+                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+                                                                    label_selector="ident=%s" % operation_id,
+                                                                    timeout_seconds=10)
+                deleted = not api_response.items
+                if retries > 5:
+                    sleep(0.1)
+                retries += 1
+            if retries == 10 and not deleted:
+                raise orchestrator.OrchestratorError(
+                    "Light <{}> in <{}:{}> cannot be switched: cannot delete previous job <{}>".format(
+                        on, loc.host, loc.path or loc.dev, operation_id))
+
+            # create the job
+            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+            # get the result
+            finished = False
+            while not finished:
+                api_response = self.batchV1_api.read_namespaced_job(operation_id,
+                                                                    self.rook_env.namespace)
+                finished = api_response.status.succeeded or api_response.status.failed
+                if finished:
+                    message = api_response.status.conditions[-1].message
+
+            # get the result of the lsmcli command
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="ident=%s" % operation_id,
+                                                               timeout_seconds=10)
+            if api_response.items:
+                pod_name = api_response.items[-1].metadata.name
+                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+                                                                  self.rook_env.namespace)
+
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(e))
+            raise
+
+        # Finally, delete the job.
+        # The job sets ttl_seconds_after_finished, so the TTL controller deletes it automatically.
+        # That feature is still in Alpha state, though, so an extra explicit delete is issued here.
+        try:
+            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                  self.rook_env.namespace,
+                                                                  propagation_policy="Background")
+        except ApiException as e:
+            if e.status != 404:  # No problem if the job does not exist
+                raise
+
+        return message
+
+    def blink_light(self, ident_fault, on, locs):
+        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
+        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
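
Reviewer note: the Job-based pattern in `_execute_blight_job` (pin a pod to a host, run one command, read its stdout from the pod log, clean up explicitly) can be exercised outside the mgr module. Below is a minimal, hypothetical sketch of that pattern using only `kubernetes` client calls the patch already relies on; the function name `run_command_on_node`, the `default` namespace, and the `busybox` image are illustrative assumptions, not part of the patch.

```python
# Hypothetical sketch, not part of the patch: run a one-off command on a
# chosen node via a Kubernetes Job and return its output from the pod log.
from time import sleep

from kubernetes import client, config


def run_command_on_node(node_name, command, args,
                        namespace="default", image="busybox"):
    config.load_kube_config()  # or config.load_incluster_config() in a pod
    batch = client.BatchV1Api()
    core = client.CoreV1Api()
    name = "one-off-cmd"

    job = client.V1Job(
        api_version="batch/v1", kind="Job",
        metadata=client.V1ObjectMeta(name=name, labels={"ident": name}),
        spec=client.V1JobSpec(
            backoff_limit=0,             # run the command exactly once
            active_deadline_seconds=60,  # hard cap on total runtime
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"ident": name}),
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    # pin the pod to the target host, as the patch does
                    node_selector={"kubernetes.io/hostname": node_name},
                    containers=[client.V1Container(
                        name="cmd", image=image,
                        command=command, args=args)]))))

    batch.create_namespaced_job(namespace, job)
    try:
        # poll until the job reports success or failure
        while True:
            status = batch.read_namespaced_job(name, namespace).status
            if status.succeeded or status.failed:
                break
            sleep(1)
        # the command's stdout only survives in the pod log
        pod = core.list_namespaced_pod(
            namespace, label_selector="ident=%s" % name).items[-1]
        return core.read_namespaced_pod_log(pod.metadata.name, namespace)
    finally:
        # ttl_seconds_after_finished is alpha, so delete explicitly
        batch.delete_namespaced_job(name, namespace,
                                    propagation_policy="Background")


print(run_command_on_node("worker-1", ["uname"], ["-a"]))
```

The combination of `backoff_limit=0` and `restart_policy="Never"` ensures the command runs at most once per invocation, which is why the patch uses the same settings for the LED command; likewise, the pod log is the only place the command's stdout is preserved, hence `read_namespaced_pod_log` as the result channel both here and in `_execute_blight_job`.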