mgr/rook: Blinking lights

Blinking lights implementation

Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>

parent 867bc60644
commit a35c1b2593
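
The commit adds a blink_device_light() entry point to RookOrchestrator and, on RookCluster, the Kubernetes Job machinery that runs lsmcli on the target host. As a rough sketch of how the new entry point is meant to be called -- assuming an already initialized RookOrchestrator instance named rook, and assuming orchestrator.DeviceLightLoc carries host, dev and path fields, as the code below suggests:

# Hypothetical caller, not part of this commit; names and values are illustrative.
from typing import List
import orchestrator

locs: List[orchestrator.DeviceLightLoc] = [
    orchestrator.DeviceLightLoc(host="node1", dev="sdb", path="/dev/sdb"),  # assumed field names
]

# 'ident' (or 'fault') selects which LED to drive; True switches it on.
# The returned completion resolves once RookCluster.blink_light() has run
# one Kubernetes Job per location.
completion = rook.blink_device_light('ident', True, locs)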
@@ -94,7 +94,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     for the corresponding change to appear in the
     Ceph cluster (slow)

-    Right now, wre calling the k8s API synchronously.
+    Right now, we are calling the k8s API synchronously.
     """

     MODULE_OPTIONS = [
@@ -135,7 +135,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         super(RookOrchestrator, self).__init__(*args, **kwargs)

         self._initialized = threading.Event()
-        self._k8s = None
+        self._k8s_CoreV1_api = None
+        self._k8s_BatchV1_api = None
         self._rook_cluster = None
         self._rook_env = RookEnv()

@@ -150,8 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     def k8s(self):
         # type: () -> client.CoreV1Api
         self._initialized.wait()
-        assert self._k8s is not None
-        return self._k8s
+        assert self._k8s_CoreV1_api is not None
+        return self._k8s_CoreV1_api

     @property
     def rook_cluster(self):
@@ -178,20 +179,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             from kubernetes.client import configuration
             configuration.verify_ssl = False

-        self._k8s = client.CoreV1Api()
+        self._k8s_CoreV1_api = client.CoreV1Api()
+        self._k8s_BatchV1_api = client.BatchV1Api()

         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command
             # fails with SSLError('bad handshake'). Suspect some kind of
             # thread context setup in SSL lib?
-            self._k8s.list_namespaced_pod(cluster_name)
+            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
         except ApiException:
             # Ignore here to make self.available() fail with a proper error message
             pass

         self._rook_cluster = RookCluster(
-            self._k8s,
+            self._k8s_CoreV1_api,
+            self._k8s_BatchV1_api,
             self._rook_env)

         self._initialized.set()
@@ -550,3 +553,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):

         c = self.get_hosts().then(execute)
         return c
+
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
+        return write_completion(
+            on_complete=lambda: self.rook_cluster.blink_light(
+                ident_fault, on, locs),
+            message="Switching <{}> identification light in {}".format(
+                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
+            mgr=self
+        )
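
For reference, the message attached to that completion is assembled from the device locations; a small worked example with made-up host and device names:

# Illustrative only: the message string blink_device_light() attaches to the
# completion for two hypothetical locations on host "mynode".
locs = [("mynode", "sdb"), ("mynode", "sdc")]
message = "Switching <{}> identification light in {}".format(
    True, ",".join("{}:{}".format(host, dev) for host, dev in locs))
# message == "Switching <True> identification light in mynode:sdb,mynode:sdc"

The hunks that follow are against RookCluster, which gains the Job-based implementation behind blink_light().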
@@ -11,6 +11,7 @@ import threading
 import logging
 import json
 from contextlib import contextmanager
+from time import sleep

 import jsonpatch
 from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
@@ -29,9 +30,8 @@ except ImportError:
     pass # just for type annotations

 try:
+    from kubernetes import client, watch
     from kubernetes.client.rest import ApiException
-    from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
-    from kubernetes import watch
 except ImportError:
     class ApiException(Exception):  # type: ignore
         status = 0
@@ -108,8 +108,8 @@ class KubernetesResource(object):
     def _fetch(self):
         """ Execute the requested api method as a one-off fetch"""
         response = self.api_func(**self.kwargs)
-        # metadata is a V1ListMeta object type
-        metadata = response.metadata  # type: V1ListMeta
+        # metadata is a client.V1ListMeta object type
+        metadata = response.metadata  # type: client.V1ListMeta
         self._items = {item.metadata.name: item for item in response.items}
         log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
         return metadata.resource_version
@@ -183,21 +183,22 @@ class KubernetesResource(object):


 class RookCluster(object):
-    def __init__(self, k8s, rook_env):
+    def __init__(self, coreV1_api, batchV1_api, rook_env):
         self.rook_env = rook_env  # type: RookEnv
-        self.k8s = k8s  # type: CoreV1Api
+        self.coreV1_api = coreV1_api  # client.CoreV1Api
+        self.batchV1_api = batchV1_api

         # TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
-        self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")

-        self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                             namespace=self.rook_env.namespace,
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.cluster_name))
-        self.nodes = KubernetesResource(self.k8s.list_node)
+        self.nodes = KubernetesResource(self.coreV1_api.list_node)

     def rook_url(self, path):
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -208,7 +209,7 @@ class RookCluster(object):
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))

-        return self.k8s.api_client.call_api(
+        return self.coreV1_api.api_client.call_api(
             full_path,
             verb,
             auth_settings=['BearerToken'],
@@ -240,7 +241,7 @@ class RookCluster(object):

         try:
             result = [i for i in self.inventory_maps.items if predicate(i)]
-        except ApiException as e:
+        except ApiException as dummy_e:
             log.exception("Failed to fetch device metadata")
             raise

@@ -287,7 +288,7 @@ class RookCluster(object):
             rook_file_system=<self.fs_name>
         """
         def predicate(item):
-            # type: (V1Pod) -> bool
+            # type: (client.V1Pod) -> bool
             metadata = item.metadata
             if service_type is not None:
                 if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
@@ -361,10 +362,10 @@ class RookCluster(object):
             daemon_id = d['metadata']['labels']['ceph_daemon_id']
             name = daemon_type + '.' + daemon_id
             if name in names:
-                self.k8s.delete_namespaced_pod(
+                self.coreV1_api.delete_namespaced_pod(
                     d['metadata']['name'],
                     self.rook_env.namespace,
-                    body=V1DeleteOptions()
+                    body=client.V1DeleteOptions()
                 )
                 num += 1
         return "Removed %d pods" % num
@@ -652,3 +653,120 @@ class RookCluster(object):
         self.rook_api_post("{}/".format(crd_name),
                            body=new.to_json())
         return "Created"
+    def get_ceph_image(self) -> str:
+        try:
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="app=rook-ceph-mon",
+                                                               timeout_seconds=10)
+            if api_response.items:
+                return api_response.items[-1].spec.containers[0].image
+            else:
+                raise orchestrator.OrchestratorError(
+                    "Error getting ceph image. Cluster without monitors")
+        except ApiException as e:
+            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
+        operation_id = str(hash(loc))
+        message = ""
+
+        # job definition
+        job_metadata = client.V1ObjectMeta(name=operation_id,
+                                           namespace=self.rook_env.namespace,
+                                           labels={"ident": operation_id})
+        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
+        pod_container = client.V1Container(name="ceph-lsmcli-command",
+                                           security_context=client.V1SecurityContext(privileged=True),
+                                           image=self.get_ceph_image(),
+                                           command=["lsmcli"],
+                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
+                                                 '--path', loc.path or loc.dev],
+                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+        pod_spec = client.V1PodSpec(containers=[pod_container],
+                                    active_deadline_seconds=30,  # Max time to terminate pod
+                                    restart_policy="Never",
+                                    node_selector={"kubernetes.io/hostname": loc.host},
+                                    volumes=[client.V1Volume(name="devices",
+                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
+                                             client.V1Volume(name="run-udev",
+                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+                                                spec=pod_spec)
+        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
+                                    ttl_seconds_after_finished=10,  # Alpha feature: lifetime after finishing (either Complete or Failed)
+                                    backoff_limit=0,
+                                    template=pod_template)
+        job = client.V1Job(api_version="batch/v1",
+                           kind="Job",
+                           metadata=job_metadata,
+                           spec=job_spec)
+
+        # delete previous job if it exists
+        try:
+            try:
+                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                      self.rook_env.namespace,
+                                                                      propagation_policy="Background")
+            except ApiException as e:
+                if e.status != 404:  # No problem if the job does not exist
+                    raise
+
+            # wait until the job is not present
+            deleted = False
+            retries = 0
+            while not deleted and retries < 10:
+                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+                                                                    label_selector="ident=%s" % operation_id,
+                                                                    timeout_seconds=10)
+                deleted = not api_response.items
+                if retries > 5:
+                    sleep(0.1)
+                retries += 1
+            if retries == 10 and not deleted:
+                raise orchestrator.OrchestratorError(
+                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
+                        on, loc.host, loc.path or loc.dev, operation_id))
+
+            # create the job
+            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+            # get the result
+            finished = False
+            while not finished:
+                api_response = self.batchV1_api.read_namespaced_job(operation_id,
+                                                                    self.rook_env.namespace)
+                finished = api_response.status.succeeded or api_response.status.failed
+                if finished:
+                    message = api_response.status.conditions[-1].message
+
+            # get the result of the lsmcli command
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="ident=%s" % operation_id,
+                                                               timeout_seconds=10)
+            if api_response.items:
+                pod_name = api_response.items[-1].metadata.name
+                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+                                                                  self.rook_env.namespace)
+
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(e))
+            raise
+
+        # Finally, delete the job.
+        # The job sets <ttl_seconds_after_finished>, so the TTL controller deletes it automatically.
+        # That feature is still in Alpha, so an explicit delete is issued here as well.
+        try:
+            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                  self.rook_env.namespace,
+                                                                  propagation_policy="Background")
+        except ApiException as e:
+            if e.status != 404:  # No problem if the job does not exist
+                raise
+
+        return message
+
+    def blink_light(self, ident_fault, on, locs):
+        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
+        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
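
The Job created by _execute_blight_job() ultimately runs lsmcli inside a privileged pod pinned to loc.host. A minimal sketch of the argv it assembles, mirroring the command/args fields above (the device paths are illustrative, not from the commit):

# Reconstruction of the container argv built in _execute_blight_job(); illustrative only.
from typing import List

def lsmcli_argv(ident_fault: str, on: bool, path: str) -> List[str]:
    # e.g. ident_fault="ident", on=True, path="/dev/sdb"
    # -> ["lsmcli", "local-disk-ident-led-on", "--path", "/dev/sdb"]
    return ["lsmcli",
            "local-disk-%s-led-%s" % (ident_fault, "on" if on else "off"),
            "--path", path]

assert lsmcli_argv("fault", False, "/dev/sdc") == \
    ["lsmcli", "local-disk-fault-led-off", "--path", "/dev/sdc"]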