Merge pull request #28468 from jmolmo/ansible_orchestrator

mgr/ansible: RGW service

Reviewed-by: Ernesto Puerta <epuertat@redhat.com>
Reviewed-by: Sebastian Wagner <sebastian.wagner@suse.com>
This commit is contained in:
Sebastian Wagner 2019-07-17 17:09:25 +02:00 committed by GitHub
commit 36d4408e85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 437 additions and 21 deletions

View File

@ -5,6 +5,7 @@ Client module to interact with the Ansible Runner Service
import json
import re
from functools import wraps
import collections
import requests
@ -13,6 +14,8 @@ API_URL = "api"
PLAYBOOK_EXEC_URL = "api/v1/playbooks"
PLAYBOOK_EVENTS = "api/v1/jobs/%s/events"
EVENT_DATA_URL = "api/v1/jobs/%s/events/%s"
URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
class AnsibleRunnerServiceError(Exception):
"""Generic Ansible Runner Service Exception"""
@ -308,3 +311,202 @@ class Client(object):
"""
# TODO
raise NotImplementedError("TODO")
def add_hosts_to_group(self, hosts, group):
""" Add one or more hosts to an Ansible inventory group
:host : host to add
:group: Ansible inventory group where the hosts will be included
:return : Nothing
:raises : AnsibleRunnerServiceError if not possible to complete
the operation
"""
url_group = URL_MANAGE_GROUP.format(group_name=group)
# Get/Create the group
response = self.http_get(url_group)
if response.status_code == 404:
# Create the new group
response = self.http_post(url_group, "", {})
if response.status_code != 200:
raise AnsibleRunnerServiceError("Error when trying to "\
"create group:{}".format(group))
hosts_in_group = []
else:
hosts_in_group = json.loads(response.text)["data"]["members"]
# Here we have the group in the inventory. Add the hosts
for host in hosts:
if host not in hosts_in_group:
add_url = URL_ADD_RM_HOSTS.format(host_name=host,
inventory_group=group)
response = self.http_post(add_url, "", {})
if response.status_code != 200:
raise AnsibleRunnerServiceError("Error when trying to "\
"include host '{}' in group"\
" '{}'".format(host, group))
def remove_hosts_from_group(self, group, hosts):
""" Remove all the hosts from group, it also removes the group itself if
it is empty
: group : Group name (str)
: hosts : List of hosts to remove
"""
url_group = URL_MANAGE_GROUP.format(group_name=group)
response = self.http_get(url_group)
# Group not found is OK!
if response.status_code == 404:
return
# Once we have the group, we remove the hosts required
if response.status_code == 200:
hosts_in_group = json.loads(response.text)["data"]["members"]
# Delete the hosts (it does not matter if the host does not exist)
for host in hosts:
if host in hosts_in_group:
url_host = URL_ADD_RM_HOSTS.format(host_name=host,
inventory_group=group)
response = self.http_delete(url_host)
hosts_in_group.remove(host)
# Delete the group if no hosts in it
if not hosts_in_group:
response = self.http_delete(url_group)
def get_hosts_in_group(self, group):
""" Return the list of hosts in and inventory group
: group : Group name (str)
"""
url_group = URL_MANAGE_GROUP.format(group_name=group)
response = self.http_get(url_group)
if response.status_code == 404:
raise AnsibleRunnerServiceError("Group {} not found in Ansible"\
" inventory".format(group))
return json.loads(response.text)["data"]["members"]
class InventoryGroup(collections.MutableSet):
""" Manages an Ansible Inventory Group
"""
def __init__(self, group_name, ars_client):
"""Init the group_name attribute and
Create the inventory group if it does not exist
: group_name : Name of the group in the Ansible Inventory
: returns : Nothing
"""
self.elements = set()
self.group_name = group_name
self.url_group = URL_MANAGE_GROUP.format(group_name=self.group_name)
self.created = False
self.ars_client = ars_client
# Get/Create the group
response = self.ars_client.http_get(self.url_group)
if response.status_code == 404:
return
# get members if the group exists previously
self.created = True
self.elements.update(json.loads(response.text)["data"]["members"])
def __contains__(self, host):
""" Check if the host is in the group
: host: Check if hosts is in Ansible Inventory Group
"""
return host in self.elements
def __iter__(self):
return iter(self.elements)
def __len__(self):
return len(self.elements)
def add(self, value):
""" Add a new host to the group
Create the Ansible Inventory group if it does not exist
: value : The host(string) to add
"""
if not self.created:
self.__create_group__()
add_url = URL_ADD_RM_HOSTS.format(host_name=value,
inventory_group=self.group_name)
response = self.ars_client.http_post(add_url, "", {})
if response.status_code != 200:
raise AnsibleRunnerServiceError("Error when trying to "\
"include host '{}' in group"\
" '{}'".format(value,
self.group_name))
# Refresh members
response = self.ars_client.http_get(self.url_group)
self.elements.update(json.loads(response.text)["data"]["members"])
def discard(self, value):
"""Remove a host from the group.
Remove the group from the Ansible inventory if it is empty
: value : The host(string) to remove
"""
url_host = URL_ADD_RM_HOSTS.format(host_name=value,
inventory_group=self.group_name)
response = self.ars_client.http_delete(url_host)
# Refresh members
response = self.ars_client.http_get(self.url_group)
self.elements.update(json.loads(response.text)["data"]["members"])
# Delete the group if no members
if not self.elements:
response = self.ars_client.http_delete(self.url_group)
def update(self, iterable=None):
""" Update the hosts in the group with the iterable items
:iterable : And iterable object with hosts names
"""
for item in iterable:
self.add(item)
def clean(self, iterable=None):
""" Remove from the group the hosts included in iterable
If not provided an iterable, all the hosts are removed from the group
:iterable : And iterable object with hosts names
"""
if not iterable:
iterable = self.elements
for item in iterable:
self.discard(item)
def __create_group__(self):
""" Create the Ansible inventory group
"""
response = self.ars_client.http_post(self.url_group, "", {})
if response.status_code != 200:
raise AnsibleRunnerServiceError("Error when trying to "\
"create group:{}".format(
self.group_name))
self.created = True
self.elements = {}

View File

@ -17,7 +17,8 @@ from mgr_module import MgrModule, Option, CLIWriteCommand
import orchestrator
from .ansible_runner_svc import Client, PlayBookExecution, ExecutionStatusCode,\
AnsibleRunnerServiceError
AnsibleRunnerServiceError, InventoryGroup,\
URL_MANAGE_GROUP, URL_ADD_RM_HOSTS
from .output_wizards import ProcessInventory, ProcessPlaybookResult, \
ProcessHostsList
@ -38,17 +39,22 @@ ADD_OSD_PLAYBOOK = "add-osd.yml"
# Used in the remove_osds method
REMOVE_OSD_PLAYBOOK = "shrink-osd.yml"
# General multi purpose cluster playbook
SITE_PLAYBOOK = "site.yml"
# General multi-purpose playbook for removing daemons
PURGE_PLAYBOOK = "purge-cluster.yml"
# Default name for the inventory group for hosts managed by the Orchestrator
ORCHESTRATOR_GROUP = "orchestrator"
# URLs for Ansible Runner Operations
# Add or remove host in one group
URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
# Retrieve the groups where the host is included in.
URL_GET_HOST_GROUPS = "api/v1/hosts/{host_name}"
# Manage groups
URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
# URLs for Ansible Runner Operations
URL_GET_HOSTS = "api/v1/hosts"
@ -190,6 +196,10 @@ class PlaybookOperation(AnsibleReadOperation):
# An aditional filter of result events based in the event
self.event_filter_list = [""]
# A dict with groups and hosts to remove from inventory if operation is
# succesful. Ex: {"group1": ["host1"], "group2": ["host3", "host4"]}
self.clean_hosts_on_success = {}
# Playbook execution object
self.pb_execution = PlayBookExecution(client,
playbook,
@ -219,6 +229,10 @@ class PlaybookOperation(AnsibleReadOperation):
if self._is_complete:
self.update_result()
# Clean hosts if operation is succesful
if self._status == ExecutionStatusCode.SUCCESS:
self.clean_inventory()
return self._status
def execute_playbook(self):
@ -242,23 +256,31 @@ class PlaybookOperation(AnsibleReadOperation):
processed_result = []
if self._is_errored:
processed_result = self.pb_execution.get_result(["runner_on_failed",
raw_result = self.pb_execution.get_result(["runner_on_failed",
"runner_on_unreachable",
"runner_on_no_hosts",
"runner_on_async_failed",
"runner_item_on_failed"])
elif self._is_complete:
raw_result = self.pb_execution.get_result(self.event_filter_list)
if self.output_wizard:
processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
raw_result)
else:
processed_result = raw_result
if self.output_wizard:
processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
raw_result)
else:
processed_result = raw_result
self._result = processed_result
def clean_inventory(self):
""" Remove hosts from inventory groups
"""
for group, hosts in self.clean_hosts_on_success.items():
InventoryGroup(group, self.ar_client).clean(hosts)
del self.clean_hosts_on_success[group]
class AnsibleChangeOperation(orchestrator.WriteCompletion):
"""Operations that changes the "cluster" state
@ -673,6 +695,115 @@ class Module(MgrModule, orchestrator.Orchestrator):
return ARSChangeOperation(self.ar_client, self.log, operations)
def add_stateless_service(self, service_type, spec):
""" Add a stateless service in the cluster
: service_type: Kind of service (nfs, rgw, mds)
: spec : an Orchestrator.StatelessServiceSpec object
: returns : Completion object
"""
# Check service_type is supported
if service_type not in ["rgw"]:
raise orchestrator.OrchestratorError(
"{} service not supported".format(service_type))
# Add the hosts to the inventory in the right group
hosts = spec.service_spec.hosts
if not hosts:
raise orchestrator.OrchestratorError("No hosts provided."\
"At least one destination host is needed to install the RGW "\
"service")
InventoryGroup("{}s".format(service_type), self.ar_client).update(hosts)
# Limit playbook execution to certain hosts
limited = ",".join(hosts)
# Add the settings for this service
extravars = vars(spec.service_spec)
# Group hosts by resource (used in rm ops)
if service_type == "rgw":
resource_group = "rgw_zone_{}".format(spec.service_spec.rgw_zone)
InventoryGroup(resource_group, self.ar_client).update(hosts)
# Execute the playbook to create the service
playbook_operation = PlaybookOperation(client=self.ar_client,
playbook=SITE_PLAYBOOK,
logger=self.log,
result_pattern="",
params=extravars,
querystr_dict={"limit": limited})
# Filter to get the result
playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
self.log)
playbook_operation.event_filter_list = ["playbook_on_stats"]
# Execute the playbook
self._launch_operation(playbook_operation)
return playbook_operation
def remove_stateless_service(self, service_type, id_resource):
""" Remove a stateles services providing <sv_id> resources
:svc_type : Kind of service (nfs, rgw, mds)
:id_resource : Id of the resource provided
<zone name> if service is RGW
...
: returns : Completion object
"""
# Check service_type is supported
if service_type not in ["rgw"]:
raise orchestrator.OrchestratorError(
"{} service not supported".format(service_type))
# Ansible Inventory group for the kind of service
group = "{}s".format(service_type)
# get the list of hosts where to remove the service
# (hosts in resource group)
if service_type == "rgw":
group_prefix = "rgw_zone_{}"
resource_group = group_prefix.format(id_resource)
hosts_list = list(InventoryGroup(resource_group, self.ar_client))
limited = ",".join(hosts_list)
# Avoid manual confirmation
extravars = {"ireallymeanit": "yes"}
# Execute the playbook to remove the service
playbook_operation = PlaybookOperation(client=self.ar_client,
playbook=PURGE_PLAYBOOK,
logger=self.log,
result_pattern="",
params=extravars,
querystr_dict={"limit": limited})
# Filter to get the result
playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
self.log)
playbook_operation.event_filter_list = ["playbook_on_stats"]
# Cleaning of inventory after a sucessful operation
clean_inventory = {}
clean_inventory[resource_group] = hosts_list
clean_inventory[group] = hosts_list
playbook_operation.clean_hosts_on_success = clean_inventory
# Execute the playbook
self.log.info("Removing service %s for resource %s", service_type,
id_resource)
self._launch_operation(playbook_operation)
return playbook_operation
def _launch_operation(self, ansible_operation):
"""Launch the operation and add the operation to the completion objects
ongoing

View File

@ -97,7 +97,6 @@ class ProcessPlaybookResult(OutputWizard):
"""
# Just making more readable the method
inventory_events = raw_result
result = ""
# Loop over the result events and request the data
@ -106,7 +105,6 @@ class ProcessPlaybookResult(OutputWizard):
(operation_id, event_key))
result += event_response.text
return result

View File

@ -8,6 +8,8 @@ import sys
import time
import fnmatch
import uuid
import string
import random
import datetime
import six
@ -440,7 +442,7 @@ class Orchestrator(object):
"""
raise NotImplementedError()
def remove_stateless_service(self, service_type, id_):
def remove_stateless_service(self, service_type, id_resource):
# type: (str, str) -> WriteCompletion
"""
Uninstalls an existing service from the cluster.
@ -742,6 +744,68 @@ class StatelessServiceSpec(object):
# some replicaset special sauce for autoscaling?
self.extended = {}
# Object with specific settings for the service
self.service_spec = None
class RGWSpec(object):
"""
Settings to configure a multisite Ceph RGW
"""
def __init__(self, hosts=None, rgw_multisite=True, rgw_zone="Default_Zone",
rgw_zonemaster=True, rgw_zonesecondary=False,
rgw_multisite_proto="http", rgw_frontend_port="8080",
rgw_zonegroup="Main", rgw_zone_user="zone.user",
rgw_realm="RGW_Realm", system_access_key=None,
system_secret_key=None):
self.hosts = hosts
self.rgw_multisite = rgw_multisite
self.rgw_zone = rgw_zone
self.rgw_zonemaster = rgw_zonemaster
self.rgw_zonesecondary = rgw_zonesecondary
self.rgw_multisite_proto = rgw_multisite_proto
self.rgw_frontend_port = rgw_frontend_port
if hosts:
self.rgw_multisite_endpoint_addr = hosts[0]
self.rgw_multisite_endpoints_list = ",".join(
["{}://{}:{}".format(self.rgw_multisite_proto,
host,
self.rgw_frontend_port) for host in hosts])
self.rgw_zonegroup = rgw_zonegroup
self.rgw_zone_user = rgw_zone_user
self.rgw_realm = rgw_realm
if system_access_key:
self.system_access_key = system_access_key
else:
self.system_access_key = self.genkey(20)
if system_secret_key:
self.system_secret_key = system_secret_key
else:
self.system_secret_key = self.genkey(40)
def genkey(self, nchars):
""" Returns a random string of nchars
:nchars : Length of the returned string
"""
return ''.join(random.choice(string.ascii_uppercase +
string.ascii_lowercase +
string.digits) for _ in range(nchars))
@classmethod
def from_json(self, json_rgw_spec):
"""
Initialize 'RGWSpec' object geting data from a json estructure
:param json_rgw_spec: A valid json string with a the RGW settings
"""
args = {k:v for k, v in json_rgw_spec.items()}
return RGWSpec(**args)
class InventoryFilter(object):
"""

View File

@ -224,7 +224,7 @@ Usage:
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
return HandleCommandResult()
return HandleCommandResult(stdout=str(completion.result))
@_write_cli('orchestrator mds add',
"name=svc_arg,type=CephString",
@ -235,11 +235,32 @@ Usage:
return self._add_stateless_svc("mds", spec)
@_write_cli('orchestrator rgw add',
"name=svc_arg,type=CephString",
'Create an RGW service')
def _rgw_add(self, svc_arg):
'name=svc_arg,type=CephString,req=false',
'Create an RGW service. A complete <rgw_spec> can be provided'\
' using <-i> to customize completelly the RGW service')
def _rgw_add(self, svc_arg=None, inbuf=None):
"""
"""
usage = """
Usage:
ceph orchestrator rgw add -i <json_file>
ceph orchestrator rgw add <zone_name>
"""
if inbuf:
try:
rgw_spec = orchestrator.RGWSpec.from_json(json.loads(inbuf))
except ValueError as e:
msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
elif svc_arg:
rgw_spec = orchestrator.RGWSpec()
rgw_spec.zone_name = svc_arg
spec = orchestrator.StatelessServiceSpec()
spec.name = svc_arg
spec.service_spec = rgw_spec
spec.name = rgw_spec.rgw_zone
return self._add_stateless_svc("rgw", spec)
@_write_cli('orchestrator nfs add',
@ -259,7 +280,7 @@ Usage:
completion = self.remove_stateless_service(svc_type, svc_id)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
return HandleCommandResult()
return HandleCommandResult(stdout=str(completion.result))
@_write_cli('orchestrator mds rm',
"name=svc_id,type=CephString",