Merge PR #43101 into master

* refs/pull/43101/head:
	mgr/rook: implement apply rbd-mirror

Reviewed-by: Juan Miguel Olmo <jolmomar@redhat.com>
This commit is contained in:
Sage Weil 2021-11-01 15:27:13 -04:00
commit 38b6a8e8d0
4 changed files with 74 additions and 1 deletions

View File

@ -8,3 +8,4 @@ tasks:
- ceph orch device ls
- ceph orch apply rgw foo
- ceph orch apply mds foo
- ceph orch apply rbd-mirror

View File

@ -651,7 +651,7 @@ def task(ctx, config):
if ret.exitstatus == 0:
r = json.loads(ret.stdout.getvalue().decode('utf-8'))
for service in r:
if service['service_type'] in ['rgw', 'mds', 'nfs']:
if service['service_type'] in ['rgw', 'mds', 'nfs', 'rbd-mirror']:
_shell(ctx, config, ['ceph', 'orch', 'rm', service['service_name']])
to_remove.append(service['service_name'])
with safe_while(sleep=10, tries=90, action="waiting for service removal") as proceed:

View File

@ -355,6 +355,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
last_refresh=now,
running= sum(osd.status.phase == 'Running' for osd in all_osds)
)
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
for mirror in all_mirrors:
logging.warn(mirror)
mirror_name = mirror['metadata']['name']
svc = 'rbd-mirror.' + mirror_name
if svc in spec:
continue
spec[svc] = orchestrator.ServiceDescription(
spec=ServiceSpec(
service_id=mirror_name,
service_type="rbd-mirror",
placement=PlacementSpec(count=1),
),
size=1,
last_refresh=now,
)
for dd in self._list_daemons():
if dd.service_name() not in spec:
continue
@ -444,6 +464,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
@handle_orch_error
def remove_service(self, service_name: str) -> str:
if service_name == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
service_type, service_name = service_name.split('.', 1)
if service_type == 'mds':
return self.rook_cluster.rm_service('cephfilesystems', service_name)
@ -451,6 +473,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return self.rook_cluster.rm_service('cephobjectstores', service_name)
elif service_type == 'nfs':
return self.rook_cluster.rm_service('cephnfses', service_name)
elif service_type == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', service_name)
else:
raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
@ -470,6 +494,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return self.rook_cluster.update_mon_count(spec.placement.count)
def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
try:
self.rook_cluster.rbd_mirror(spec)
return OrchResult("Success")
except Exception as e:
return OrchResult(None, e)
@handle_orch_error
def apply_mds(self, spec):
# type: (ServiceSpec) -> str

View File

@ -43,6 +43,7 @@ from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass
import orchestrator
@ -1212,6 +1213,46 @@ class RookCluster(object):
)
self.batchV1_api.create_namespaced_job('rook-ceph', body)
def rbd_mirror(self, spec: ServiceSpec) -> None:
service_id = spec.service_id or "default-rbd-mirror"
all_hosts = self.get_hosts()
def _create_rbd_mirror() -> crbdm.CephRBDMirror:
return crbdm.CephRBDMirror(
apiVersion=self.rook_env.api_name,
metadata=dict(
name=service_id,
namespace=self.rook_env.namespace,
),
spec=crbdm.Spec(
count=spec.placement.count or 1,
placement=crbdm.Placement(
nodeAffinity=crbdm.NodeAffinity(
requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
nodeSelectorTerms=crbdm.NodeSelectorTermsList(
[
placement_spec_to_node_selector(spec.placement, all_hosts)
]
)
)
)
)
)
)
def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
new.spec.count = spec.placement.count or 1
new.spec.placement = crbdm.Placement(
nodeAffinity=crbdm.NodeAffinity(
requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
nodeSelectorTerms=crbdm.NodeSelectorTermsList(
[
placement_spec_to_node_selector(spec.placement, all_hosts)
]
)
)
)
)
return new
self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)