Merge pull request #57716 from rhcs-dashboard/add-rgw-multisite-wizard-api

mgr/dashboard: add api for rgw multisite replication wizard

Reviewed-by: Nizamudeen A <nia@redhat.com>
This commit is contained in:
Aashish Sharma 2024-07-04 17:35:00 +05:30 committed by GitHub
commit 12c77525ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 392 additions and 95 deletions

View File

@ -5,6 +5,7 @@ import json
import re
import tempfile
import time
from typing import Any, Dict
from urllib.parse import urlparse
import requests
@ -230,7 +231,7 @@ class MultiCluster(RESTController):
@Endpoint('PUT')
@UpdatePermission
def set_config(self, config: object):
def set_config(self, config: Dict[str, Any]):
multicluster_config = self.load_multi_cluster_config()
multicluster_config.update({'current_url': config['url']})
multicluster_config.update({'current_user': config['user']})

View File

@ -15,6 +15,7 @@ from ..security import Permission, Scope
from ..services.auth import AuthManager, JwtManager
from ..services.ceph_service import CephService
from ..services.rgw_client import _SYNC_GROUP_ID, NoRgwDaemonsException, RgwClient, RgwMultisite
from ..services.service import RgwServiceManager
from ..tools import json_str_to_object, str_to_bool
from . import APIDoc, APIRouter, BaseController, CreatePermission, \
CRUDCollectionMethod, CRUDEndpoint, DeletePermission, Endpoint, \
@ -112,6 +113,27 @@ class RgwMultisiteStatus(RESTController):
secret_key)
return result
@RESTController.Collection(method='POST', path='/multisite-replications')
@allow_empty_body
# pylint: disable=W0102,W0613
def setup_multisite_replication(self, daemon_name=None, realm_name=None, zonegroup_name=None,
zonegroup_endpoints=None, zone_name=None, zone_endpoints=None,
username=None, cluster_fsid=None):
multisite_instance = RgwMultisite()
result = multisite_instance.setup_multisite_replication(realm_name, zonegroup_name,
zonegroup_endpoints, zone_name,
zone_endpoints, username,
cluster_fsid)
return result
@RESTController.Collection(method='PUT', path='/setup-rgw-credentials')
@allow_empty_body
# pylint: disable=W0102,W0613
def restart_rgw_daemons_and_set_credentials(self):
rgw_service_manager_instance = RgwServiceManager()
result = rgw_service_manager_instance.restart_rgw_daemons_and_set_credentials()
return result
@APIRouter('rgw/multisite', Scope.RGW)
@APIDoc("RGW Multisite Management API", "RgwMultisite")
@ -1075,11 +1097,9 @@ class RgwRealm(RESTController):
@UpdatePermission
@allow_empty_body
# pylint: disable=W0613
def import_realm_token(self, realm_token, zone_name, port, placement_spec):
def import_realm_token(self, realm_token, zone_name, port, placement_spec=None):
try:
multisite_instance = RgwMultisite()
result = CephService.import_realm_token(realm_token, zone_name, port, placement_spec)
multisite_instance.update_period()
return result
except NoRgwDaemonsException as e:
raise DashboardException(e, http_status_code=404, component='rgw')

View File

@ -32,7 +32,7 @@ from .grafana import push_local_dashboards
from .services import nvmeof_cli # noqa # pylint: disable=unused-import
from .services.auth import AuthManager, AuthManagerTool, JwtManager
from .services.exception import dashboard_exception_handler
from .services.rgw_client import configure_rgw_credentials
from .services.service import RgwServiceManager
from .services.sso import SSO_COMMANDS, handle_sso_command
from .settings import handle_option_command, options_command_list, options_schema_list
from .tools import NotificationQueue, RequestLoggingTool, TaskManager, \
@ -417,7 +417,8 @@ class Module(MgrModule, CherryPyConfig):
@CLIWriteCommand("dashboard set-rgw-credentials")
def set_rgw_credentials(self):
try:
configure_rgw_credentials()
rgw_service_manager = RgwServiceManager()
rgw_service_manager.configure_rgw_credentials()
except Exception as error:
return -errno.EINVAL, '', str(error)

View File

@ -11669,7 +11669,6 @@ paths:
- realm_token
- zone_name
- port
- placement_spec
type: object
responses:
'201':

View File

@ -130,11 +130,9 @@ class ServiceManager(ResourceManager):
service_ids = [service_ids]
completion_list = [
self.api.service_action('reload', service_type, service_name,
service_id)
for service_name, service_id in service_ids
self.api.service_action('restart', f'{service_type}.{service_id}')
for service_id in service_ids
]
self.api.orchestrator_wait(completion_list)
for c in completion_list:
raise_if_exception(c)

View File

@ -2,25 +2,31 @@
# pylint: disable=C0302
# pylint: disable=too-many-branches
# pylint: disable=too-many-lines
import ast
import ipaddress
import json
import logging
import os
import re
import time
import xml.etree.ElementTree as ET # noqa: N814
from enum import Enum
from subprocess import SubprocessError
from urllib.parse import urlparse
import requests
from mgr_util import build_url, name_to_config_section
from .. import mgr
from ..awsauth import S3Auth
from ..controllers.multi_cluster import MultiCluster
from ..exceptions import DashboardException
from ..rest_client import RequestException, RestClient
from ..settings import Settings
from ..tools import dict_contains_path, dict_get, json_str_to_object, str_to_bool
from .ceph_service import CephService
from .orchestrator import OrchClient
from .service import RgwServiceManager
try:
from typing import Any, Dict, List, Optional, Tuple, Union
@ -39,14 +45,6 @@ class NoRgwDaemonsException(Exception):
super().__init__('No RGW service is running.')
class NoCredentialsException(Exception):
def __init__(self):
super(NoCredentialsException, self).__init__(
'No RGW credentials found, '
'please consult the documentation on how to enable RGW for '
'the dashboard.')
class RgwAdminException(Exception):
pass
@ -216,78 +214,6 @@ def _parse_frontend_config(config) -> Tuple[int, bool]:
raise LookupError('Failed to determine RGW port from "{}"'.format(config))
def _parse_secrets(user: str, data: dict) -> Tuple[str, str]:
for key in data.get('keys', []):
if key.get('user') == user and data.get('system') in ['true', True]:
access_key = key.get('access_key')
secret_key = key.get('secret_key')
return access_key, secret_key
return '', ''
def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]:
access_key = ''
secret_key = ''
rgw_user_info_cmd = ['user', 'info', '--uid', user]
cmd_realm_option = ['--rgw-realm', realm] if realm else []
if realm:
rgw_user_info_cmd += cmd_realm_option
try:
_, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd)
if out:
access_key, secret_key = _parse_secrets(user, out)
if not access_key:
rgw_create_user_cmd = [
'user', 'create',
'--uid', user,
'--display-name', 'Ceph Dashboard',
'--system',
] + cmd_realm_option
_, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd)
if out:
access_key, secret_key = _parse_secrets(user, out)
if not access_key:
logger.error('Unable to create rgw user "%s": %s', user, err)
except SubprocessError as error:
logger.exception(error)
return access_key, secret_key
def configure_rgw_credentials():
logger.info('Configuring dashboard RGW credentials')
user = 'dashboard'
realms = []
access_key = ''
secret_key = ''
try:
_, out, err = mgr.send_rgwadmin_command(['realm', 'list'])
if out:
realms = out.get('realms', [])
if err:
logger.error('Unable to list RGW realms: %s', err)
if realms:
realm_access_keys = {}
realm_secret_keys = {}
for realm in realms:
realm_access_key, realm_secret_key = _get_user_keys(user, realm)
if realm_access_key:
realm_access_keys[realm] = realm_access_key
realm_secret_keys[realm] = realm_secret_key
if realm_access_keys:
access_key = json.dumps(realm_access_keys)
secret_key = json.dumps(realm_secret_keys)
else:
access_key, secret_key = _get_user_keys(user)
assert access_key and secret_key
Settings.RGW_API_ACCESS_KEY = access_key
Settings.RGW_API_SECRET_KEY = secret_key
except (AssertionError, SubprocessError) as error:
logger.exception(error)
raise NoCredentialsException
# pylint: disable=R0904
class RgwClient(RestClient):
_host = None
@ -348,7 +274,8 @@ class RgwClient(RestClient):
# The API access key and secret key are mandatory for a minimal configuration.
if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY):
configure_rgw_credentials()
rgw_service_manager = RgwServiceManager()
rgw_service_manager.configure_rgw_credentials()
daemon_keys = RgwClient._daemons.keys()
if not daemon_name:
@ -1219,6 +1146,177 @@ class RgwMultisite:
except SubprocessError as error:
raise DashboardException(error, http_status_code=500, component='rgw')
def replace_hostname(self, endpoint, hostname_to_ip):
# Replace the hostname in the endpoint URL with its corresponding IP address.
parsed_url = urlparse(endpoint)
hostname = parsed_url.hostname
if hostname in hostname_to_ip:
return endpoint.replace(hostname, hostname_to_ip[hostname])
return endpoint
def setup_multisite_replication(self, realm_name: str, zonegroup_name: str,
zonegroup_endpoints: str, zone_name: str,
zone_endpoints: str, username: str,
cluster_fsid: Optional[str] = None):
# Set up multisite replication for Ceph RGW.
logger.info("Starting multisite replication setup")
orch = OrchClient.instance()
def get_updated_endpoints(endpoints):
# Update endpoint URLs by replacing hostnames with IP addresses.
logger.debug("Updating endpoints: %s", endpoints)
try:
hostname_to_ip = {host['hostname']: host['addr'] for host in (h.to_json() for h in orch.hosts.list())} # noqa E501 # pylint: disable=line-too-long
updated_endpoints = [self.replace_hostname(endpoint, hostname_to_ip) for endpoint in endpoints.split(',')] # noqa E501 # pylint: disable=line-too-long
logger.debug("Updated endpoints: %s", updated_endpoints)
return updated_endpoints
except Exception as e:
logger.error("Failed to update endpoints: %s", e)
raise
zonegroup_ip_url = ','.join(get_updated_endpoints(zonegroup_endpoints))
zone_ip_url = ','.join(get_updated_endpoints(zone_endpoints))
try:
# Create the realm and zonegroup
logger.info("Creating realm: %s", realm_name)
self.create_realm(realm_name=realm_name, default=True)
logger.info("Creating zonegroup: %s", zonegroup_name)
self.create_zonegroup(realm_name=realm_name, zonegroup_name=zonegroup_name,
default=True, master=True, endpoints=zonegroup_ip_url)
except Exception as e:
logger.error("Failed to create realm or zonegroup: %s", e)
raise
try:
# Create the zone and system user, then modify the zone with user credentials
logger.info("Creating zone: %s", zone_name)
if self.create_zone(zone_name=zone_name, zonegroup_name=zonegroup_name,
default=True, master=True, endpoints=zone_ip_url,
access_key=None, secret_key=None):
logger.info("Creating system user: %s", username)
user_details = self.create_system_user(username, zone_name)
if user_details:
keys = user_details['keys'][0]
logger.info("Modifying zone with user credentials: %s", username)
self.modify_zone(zone_name=zone_name, zonegroup_name=zonegroup_name,
default='true', master='true', endpoints=zone_ip_url,
access_key=keys['access_key'],
secret_key=keys['secret_key'])
except Exception as e:
logger.error("Failed to create zone or system user: %s", e)
raise
try:
# Restart RGW daemons and set credentials
logger.info("Restarting RGW daemons and setting credentials")
rgw_service_manager = RgwServiceManager()
rgw_service_manager.restart_rgw_daemons_and_set_credentials()
except Exception as e:
logger.error("Failed to restart RGW daemons: %s", e)
raise
try:
# Get realm tokens and import to another cluster if specified
logger.info("Getting realm tokens")
realm_token_info = CephService.get_realm_tokens()
if cluster_fsid and realm_token_info:
logger.info("Importing realm token to cluster: %s", cluster_fsid)
self.import_realm_token_to_cluster(cluster_fsid, realm_name,
realm_token_info, username)
except Exception as e:
logger.error("Failed to get realm tokens or import to cluster: %s", e)
raise
logger.info("Multisite replication setup completed")
return realm_token_info
def import_realm_token_to_cluster(self, cluster_fsid, realm_name, realm_token_info, username):
logger.info("Importing realm token to cluster: %s", cluster_fsid)
try:
for realm_token in realm_token_info:
if realm_token['realm'] == realm_name:
realm_export_token = realm_token['token']
break
else:
raise ValueError(f"Realm {realm_name} not found in realm tokens")
multi_cluster_config_str = str(mgr.get_module_option_ex('dashboard', 'MULTICLUSTER_CONFIG')) # noqa E501 # pylint: disable=line-too-long
multi_cluster_config = ast.literal_eval(multi_cluster_config_str)
for fsid, clusters in multi_cluster_config['config'].items():
if fsid == cluster_fsid:
for cluster_info in clusters:
cluster_token = cluster_info.get('token')
cluster_url = cluster_info.get('url')
break
else:
raise ValueError(f"No cluster token found for fsid: {cluster_fsid}")
break
else:
raise ValueError(f"Cluster fsid {cluster_fsid} not found in multi-cluster config")
if cluster_token:
placement_spec: Dict[str, Dict] = {"placement": {}}
payload = {
'realm_token': realm_export_token,
'zone_name': 'new_replicated_zone',
'port': 81,
'placement_spec': placement_spec
}
if not cluster_url.endswith('/'):
cluster_url += '/'
path = 'api/rgw/realm/import_realm_token'
try:
multi_cluster_instance = MultiCluster()
# pylint: disable=protected-access
response = multi_cluster_instance._proxy(method='POST', base_url=cluster_url,
path=path, payload=payload,
token=cluster_token)
logger.info("Successfully imported realm token to cluster: %s", cluster_fsid)
self.check_user_in_second_cluster(cluster_url, cluster_token, username)
return response
except requests.RequestException as e:
logger.error("Could not reach %s: %s", cluster_url, e)
raise DashboardException(f"Could not reach {cluster_url}: {e}",
http_status_code=404, component='dashboard')
except json.JSONDecodeError as e:
logger.error("Error parsing Dashboard API response: %s", e.msg)
raise DashboardException(f"Error parsing Dashboard API response: {e.msg}",
component='dashboard')
except Exception as e:
logger.error("Failed to import realm token to cluster: %s", e)
raise
def check_user_in_second_cluster(self, cluster_url, cluster_token, username):
logger.info("Checking for user %s in the second cluster", username)
path = 'api/rgw/zone/get_user_list?zoneName=new_replicated_zone'
user_found = False
start_time = time.time()
while not user_found:
if time.time() - start_time > 120: # Timeout after 2 minutes
logger.error("Timeout reached while waiting for user %s to appear \
in the second cluster", username)
raise DashboardException(code='user_replication_timeout',
msg="Timeout reached while waiting for \
user %s to appear in the second cluster." % username)
try:
multi_cluster_instance = MultiCluster()
# pylint: disable=protected-access
user_content = multi_cluster_instance._proxy(method='GET', base_url=cluster_url,
path=path, token=cluster_token)
logger.info("User content in the second cluster: %s", user_content)
for user in user_content:
if user['user_id'] == username:
user_found = True
logger.info("User %s found in the second cluster", username)
# pylint: disable=protected-access
restart_daemons_content = multi_cluster_instance._proxy(method='PUT', base_url=cluster_url, # noqa E501 # pylint: disable=line-too-long
path='ui-api/rgw/multisite/setup-rgw-credentials', # noqa E501 # pylint: disable=line-too-long
token=cluster_token) # noqa E501 # pylint: disable=line-too-long
logger.info("Restarted RGW daemons in the second cluster: %s", restart_daemons_content) # noqa E501 # pylint: disable=line-too-long
break
except requests.RequestException as e:
logger.error("Error checking user in the second cluster: %s", e)
logger.info("User %s not found yet, retrying in 5 seconds", username)
time.sleep(5)
def create_realm(self, realm_name: str, default: bool):
rgw_realm_create_cmd = ['realm', 'create']
cmd_create_realm_options = ['--rgw-realm', realm_name]

View File

@ -0,0 +1,180 @@
import json
import logging
import time
from subprocess import SubprocessError
try:
from typing import Optional, Tuple
except ImportError:
pass # For typing only
from .. import mgr
from ..exceptions import DashboardException
from ..settings import Settings
from .orchestrator import OrchClient
logger = logging.getLogger('service')
class NoCredentialsException(Exception):
def __init__(self):
super(NoCredentialsException, self).__init__(
'No RGW credentials found, '
'please consult the documentation on how to enable RGW for '
'the dashboard.')
def verify_service_restart(service_type: str, service_id: str):
orch = OrchClient.instance()
service_name = f'{service_type}.{service_id}'
logger.info("Getting initial service info for: %s", service_name)
info = orch.services.get(service_name)[0].to_dict()
last_refreshed = info['status']['last_refresh']
logger.info("Reloading service: %s", service_name)
orch.services.reload(service_type, service_id)
logger.info("Waiting for service refresh: %s", service_name)
wait_for_refresh(orch, service_name, last_refreshed)
logger.info("Checking daemon status for: %s", service_name)
daemon_status = wait_for_daemon_to_start(orch, service_name)
return daemon_status
def wait_for_refresh(orch, service_name, last_refreshed):
orch = OrchClient.instance()
logger.info("Waiting for service %s to refresh", service_name)
while True:
updated_info = orch.services.get(service_name)[0].to_dict()
if updated_info['status']['last_refresh'] != last_refreshed:
logger.info("Service %s refreshed", service_name)
break
def wait_for_daemon_to_start(orch, service_name):
orch = OrchClient.instance()
start_time = time.time()
logger.info("Waiting for daemon %s to start", service_name)
while True:
daemons = [d.to_dict() for d in orch.services.list_daemons(service_name=service_name)]
all_running = True
for daemon in daemons:
daemon_state = daemon['status_desc']
logger.debug("Daemon %s state: %s", daemon['daemon_id'], daemon_state)
if daemon_state in ('unknown', 'error', 'stopped'):
logger.error("Failed to restart daemon %s for service %s. State is %s", daemon['daemon_id'], service_name, daemon_state) # noqa E501 # pylint: disable=line-too-long
raise DashboardException(
code='daemon_restart_failed',
msg="Failed to restart the daemon %s. Daemon state is %s." % (service_name, daemon_state) # noqa E501 # pylint: disable=line-too-long
)
if daemon_state != 'running':
all_running = False
if all_running:
logger.info("All daemons for service %s are running", service_name)
return True
if time.time() - start_time > 10:
logger.error("Timeout reached while waiting for daemon %s to start", service_name)
raise DashboardException(
code='daemon_restart_timeout',
msg="Timeout reached while waiting for daemon %s to start." % service_name
)
return False
class RgwServiceManager:
def restart_rgw_daemons_and_set_credentials(self):
# Restart RGW daemons and set credentials.
logger.info("Restarting RGW daemons and setting credentials")
orch = OrchClient.instance()
services, _ = orch.services.list(service_type='rgw', offset=0)
all_daemons_up = True
for service in services:
logger.info("Verifying service restart for: %s", service['service_id'])
daemons_up = verify_service_restart('rgw', service['service_id'])
if not daemons_up:
logger.error("Service %s restart verification failed", service['service_id'])
all_daemons_up = False
if all_daemons_up:
logger.info("All daemons are up, configuring RGW credentials")
self.configure_rgw_credentials()
else:
logger.error("Not all daemons are up, skipping RGW credentials configuration")
def _parse_secrets(self, user: str, data: dict) -> Tuple[str, str]:
for key in data.get('keys', []):
if key.get('user') == user and data.get('system') in ['true', True]:
access_key = key.get('access_key')
secret_key = key.get('secret_key')
return access_key, secret_key
return '', ''
def _get_user_keys(self, user: str, realm: Optional[str] = None) -> Tuple[str, str]:
access_key = ''
secret_key = ''
rgw_user_info_cmd = ['user', 'info', '--uid', user]
cmd_realm_option = ['--rgw-realm', realm] if realm else []
if realm:
rgw_user_info_cmd += cmd_realm_option
try:
_, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd)
if out:
access_key, secret_key = self._parse_secrets(user, out)
if not access_key:
rgw_create_user_cmd = [
'user', 'create',
'--uid', user,
'--display-name', 'Ceph Dashboard',
'--system',
] + cmd_realm_option
_, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd)
if out:
access_key, secret_key = self._parse_secrets(user, out)
if not access_key:
logger.error('Unable to create rgw user "%s": %s', user, err)
except SubprocessError as error:
logger.exception(error)
return access_key, secret_key
def configure_rgw_credentials(self):
logger.info('Configuring dashboard RGW credentials')
user = 'dashboard'
realms = []
access_key = ''
secret_key = ''
try:
_, out, err = mgr.send_rgwadmin_command(['realm', 'list'])
if out:
realms = out.get('realms', [])
if err:
logger.error('Unable to list RGW realms: %s', err)
if realms:
realm_access_keys = {}
realm_secret_keys = {}
for realm in realms:
realm_access_key, realm_secret_key = self._get_user_keys(user, realm)
if realm_access_key:
realm_access_keys[realm] = realm_access_key
realm_secret_keys[realm] = realm_secret_key
if realm_access_keys:
access_key = json.dumps(realm_access_keys)
secret_key = json.dumps(realm_secret_keys)
else:
access_key, secret_key = self._get_user_keys(user)
assert access_key and secret_key
Settings.RGW_API_ACCESS_KEY = access_key
Settings.RGW_API_SECRET_KEY = secret_key
except (AssertionError, SubprocessError) as error:
logger.exception(error)
raise NoCredentialsException

View File

@ -6,8 +6,8 @@ from unittest.mock import Mock, patch
from .. import mgr
from ..exceptions import DashboardException
from ..services.rgw_client import NoCredentialsException, \
NoRgwDaemonsException, RgwClient, _parse_frontend_config
from ..services.rgw_client import NoRgwDaemonsException, RgwClient, _parse_frontend_config
from ..services.service import NoCredentialsException
from ..settings import Settings
from ..tests import CLICommandTestMixin, RgwStub