Merge pull request #33366 from jmolmo/rook_blinking_lights

mgr/rook: Blinking lights

Reviewed-by: Sebastian Wagner <swagner@suse.com>
This commit is contained in:
Kefu Chai 2020-03-26 16:19:52 +08:00 committed by GitHub
commit 633a8319fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 151 additions and 21 deletions

View File

@ -94,7 +94,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
for the corresponding change to appear in the
Ceph cluster (slow)
Right now, wre calling the k8s API synchronously.
Right now, we are calling the k8s API synchronously.
"""
MODULE_OPTIONS = [
@ -135,7 +135,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
super(RookOrchestrator, self).__init__(*args, **kwargs)
self._initialized = threading.Event()
self._k8s = None
self._k8s_CoreV1_api = None
self._k8s_BatchV1_api = None
self._rook_cluster = None
self._rook_env = RookEnv()
@ -150,8 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
def k8s(self):
# type: () -> client.CoreV1Api
self._initialized.wait()
assert self._k8s is not None
return self._k8s
assert self._k8s_CoreV1_api is not None
return self._k8s_CoreV1_api
@property
def rook_cluster(self):
@ -178,20 +179,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
from kubernetes.client import configuration
configuration.verify_ssl = False
self._k8s = client.CoreV1Api()
self._k8s_CoreV1_api = client.CoreV1Api()
self._k8s_BatchV1_api = client.BatchV1Api()
try:
# XXX mystery hack -- I need to do an API call from
# this context, or subsequent API usage from handle_command
# fails with SSLError('bad handshake'). Suspect some kind of
# thread context setup in SSL lib?
self._k8s.list_namespaced_pod(cluster_name)
self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
except ApiException:
# Ignore here to make self.available() fail with a proper error message
pass
self._rook_cluster = RookCluster(
self._k8s,
self._k8s_CoreV1_api,
self._k8s_BatchV1_api,
self._rook_env)
self._initialized.set()
@ -550,3 +553,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
c = self.get_hosts().then(execute)
return c
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
    """Build a completion that switches the lights of the given devices.

    The actual work is deferred: the returned completion calls
    ``self.rook_cluster.blink_light(ident_fault, on, locs)`` when it is
    executed.

    :param ident_fault: kind of light, forwarded unchanged to the Rook cluster
    :param on: requested light state, forwarded unchanged to the Rook cluster
    :param locs: device locations; ``host`` and ``dev`` of each one are used
        to build the progress message
    :return: the completion object produced by ``write_completion``
    """
    # "host:dev" pairs, only used for the human readable message.
    targets = ",".join("{}:{}".format(loc.host, loc.dev) for loc in locs)

    def switch_lights():
        return self.rook_cluster.blink_light(ident_fault, on, locs)

    return write_completion(
        on_complete=switch_lights,
        message="Switching <{}> identification light in {}".format(on, targets),
        mgr=self
    )

View File

@ -11,6 +11,7 @@ import threading
import logging
import json
from contextlib import contextmanager
from time import sleep
import jsonpatch
from six.moves.urllib.parse import urljoin # pylint: disable=import-error
@ -29,9 +30,8 @@ except ImportError:
pass # just for type annotations
try:
from kubernetes import client, watch
from kubernetes.client.rest import ApiException
from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
from kubernetes import watch
except ImportError:
class ApiException(Exception): # type: ignore
status = 0
@ -108,8 +108,8 @@ class KubernetesResource(object):
def _fetch(self):
""" Execute the requested api method as a one-off fetch"""
response = self.api_func(**self.kwargs)
# metadata is a V1ListMeta object type
metadata = response.metadata # type: V1ListMeta
# metadata is a client.V1ListMeta object type
metadata = response.metadata # type: client.V1ListMeta
self._items = {item.metadata.name: item for item in response.items}
log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
return metadata.resource_version
@ -183,21 +183,22 @@ class KubernetesResource(object):
class RookCluster(object):
def __init__(self, k8s, rook_env):
def __init__(self, coreV1_api, batchV1_api, rook_env):
self.rook_env = rook_env # type: RookEnv
self.k8s = k8s # type: CoreV1Api
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented
self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
namespace=self.rook_env.operator_namespace,
label_selector="app=rook-discover")
self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
label_selector="rook_cluster={0}".format(
self.rook_env.cluster_name))
self.nodes = KubernetesResource(self.k8s.list_node)
self.nodes = KubernetesResource(self.coreV1_api.list_node)
def rook_url(self, path):
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@ -208,7 +209,7 @@ class RookCluster(object):
full_path = self.rook_url(path)
log.debug("[%s] %s" % (verb, full_path))
return self.k8s.api_client.call_api(
return self.coreV1_api.api_client.call_api(
full_path,
verb,
auth_settings=['BearerToken'],
@ -240,7 +241,7 @@ class RookCluster(object):
try:
result = [i for i in self.inventory_maps.items if predicate(i)]
except ApiException as e:
except ApiException as dummy_e:
log.exception("Failed to fetch device metadata")
raise
@ -287,7 +288,7 @@ class RookCluster(object):
rook_file_system=<self.fs_name>
"""
def predicate(item):
# type: (V1Pod) -> bool
# type: (client.V1Pod) -> bool
metadata = item.metadata
if service_type is not None:
if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
@ -361,10 +362,10 @@ class RookCluster(object):
daemon_id = d['metadata']['labels']['ceph_daemon_id']
name = daemon_type + '.' + daemon_id
if name in names:
self.k8s.delete_namespaced_pod(
self.coreV1_api.delete_namespaced_pod(
d['metadata']['name'],
self.rook_env.namespace,
body=V1DeleteOptions()
body=client.V1DeleteOptions()
)
num += 1
return "Removed %d pods" % num
@ -652,3 +653,120 @@ class RookCluster(object):
self.rook_api_post("{}/".format(crd_name),
body=new.to_json())
return "Created"
def get_ceph_image(self) -> str:
    """Return the container image used by the Ceph monitor pods.

    Lists the pods labelled ``app=rook-ceph-mon`` in the Rook namespace and
    returns the image of the first container of the last pod in the list.

    :return: the container image reference
    :raises orchestrator.OrchestratorError: if the Kubernetes API call fails
        (the original ``ApiException`` is chained as the cause) or if no
        monitor pod is found
    """
    # Keep the try body down to the only call that can raise ApiException.
    try:
        api_response = self.coreV1_api.list_namespaced_pod(
            self.rook_env.namespace,
            label_selector="app=rook-ceph-mon",
            timeout_seconds=10)
    except ApiException as e:
        # Chain the cause explicitly so the API failure stays in the traceback.
        raise orchestrator.OrchestratorError(
            "Error getting ceph image: {}".format(e)) from e

    if not api_response.items:
        raise orchestrator.OrchestratorError(
            "Error getting ceph image. Cluster without monitors")

    return api_response.items[-1].spec.containers[0].image
def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
    """Switch one device light by running ``lsmcli`` in a one-shot Kubernetes Job.

    A privileged pod using the Ceph image is pinned to ``loc.host`` (via the
    ``kubernetes.io/hostname`` node selector) with ``/dev`` and ``/run/udev``
    mounted from the host, and runs
    ``lsmcli local-disk-<ident_fault>-led-<on|off> --path <device>``.

    Steps: delete any previous job for the same device, wait for it to
    disappear, create the new job, wait until it succeeds or fails, collect
    the pod log, and finally delete the job again.

    :param ident_fault: light kind, interpolated into the lsmcli sub-command
    :param on: ``True`` to switch the light on, ``False`` to switch it off
    :param loc: device location; ``host``, ``dev`` and ``path`` are read
    :return: the log of the job's pod if one is found, otherwise the message
        of the job's last status condition (or ``""`` if there is none)
    :raises orchestrator.OrchestratorError: if a previous job for the same
        device cannot be removed in time
    :raises ApiException: if a Kubernetes API call fails (a 404 while
        deleting the job is tolerated)
    """
    import hashlib  # local import: only needed to derive the job name

    namespace = self.rook_env.namespace
    device = loc.path or loc.dev

    # The id is used both as the Job name and as a label value, so it must
    # start with an alphanumeric character and be stable for a given device.
    # str(hash(loc)) met neither requirement: it can be negative (leading
    # '-') and changes between processes because str hashing is randomised.
    # 32 hex characters stay well inside the 63 character label limit.
    operation_id = hashlib.sha256(
        "{}:{}".format(loc.host, device).encode("utf-8")).hexdigest()[:32]
    ident_selector = "ident=%s" % operation_id
    message = ""

    # job definition
    job_metadata = client.V1ObjectMeta(name=operation_id,
                                       namespace=namespace,
                                       labels={"ident": operation_id})
    pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
    pod_container = client.V1Container(
        name="ceph-lsmcli-command",
        security_context=client.V1SecurityContext(privileged=True),
        image=self.get_ceph_image(),
        command=["lsmcli"],
        args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
              '--path', device],
        volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                       client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
    pod_spec = client.V1PodSpec(
        containers=[pod_container],
        active_deadline_seconds=30,  # Max time to terminate pod
        restart_policy="Never",
        node_selector={"kubernetes.io/hostname": loc.host},
        volumes=[client.V1Volume(name="devices",
                                 host_path=client.V1HostPathVolumeSource(path="/dev")),
                 client.V1Volume(name="run-udev",
                                 host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
    pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                            spec=pod_spec)
    job_spec = client.V1JobSpec(
        active_deadline_seconds=60,  # Max time to terminate job
        ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
        backoff_limit=0,
        template=pod_template)
    job = client.V1Job(api_version="batch/v1",
                       kind="Job",
                       metadata=job_metadata,
                       spec=job_spec)

    try:
        # delete previous job if it exists
        try:
            self.batchV1_api.delete_namespaced_job(operation_id,
                                                   namespace,
                                                   propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        # wait until the job is not present (at most 10 polls; the first
        # ones back-to-back, the later ones with a short pause)
        deleted = False
        retries = 0
        while not deleted and retries < 10:
            api_response = self.batchV1_api.list_namespaced_job(namespace,
                                                                label_selector=ident_selector,
                                                                timeout_seconds=10)
            deleted = not api_response.items
            if not deleted and retries > 5:
                sleep(0.1)
            # "++retries" is just two unary pluses in Python and never
            # advanced the counter, which made this loop unbounded.
            retries += 1
        if not deleted:
            raise orchestrator.OrchestratorError(
                "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                    on, loc.host, device, operation_id))

        # create the job
        self.batchV1_api.create_namespaced_job(namespace, job)

        # get the result; termination relies on the job's
        # active_deadline_seconds, which makes it fail if it does not complete
        finished = False
        while not finished:
            api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                namespace)
            status = api_response.status
            finished = bool(status.succeeded or status.failed)
            if not finished:
                # do not hammer the API server while the pod is running
                sleep(0.1)
            elif status.conditions:
                # conditions may still be unset right after the counters change
                message = status.conditions[-1].message

        # get the result of the lsmcli command
        api_response = self.coreV1_api.list_namespaced_pod(namespace,
                                                           label_selector=ident_selector,
                                                           timeout_seconds=10)
        if api_response.items:
            pod_name = api_response.items[-1].metadata.name
            message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                              namespace)
    except ApiException as e:
        log.exception('K8s API failed. {}'.format(e))
        raise

    # Finally, delete the job.
    # The job sets <ttl_seconds_after_finished>, so the TTL controller should
    # remove it automatically. That feature is still Alpha, hence this extra
    # explicit delete as a safety net.
    try:
        self.batchV1_api.delete_namespaced_job(operation_id,
                                               namespace,
                                               propagation_policy="Background")
    except ApiException as e:
        if e.status != 404:  # No problem if the job does not exist
            raise

    return message
def blink_light(self, ident_fault, on, locs):
    # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
    """Switch the light of every device in ``locs``, one after another.

    Each location is handed to ``self._execute_blight_job`` together with
    ``ident_fault`` and ``on``; the results are returned in the same order
    as ``locs``.
    """
    outputs = []
    for location in locs:
        outputs.append(self._execute_blight_job(ident_fault, on, location))
    return outputs