
cephadm: HA for RGW endpoints

Cephadm deploying keepalived and HAproxy for providing High availability for RGW endpoints

Fixes: https://tracker.ceph.com/issues/45116

Signed-off-by: Daniel-Pivonka <dpivonka@redhat.com>
Signed-off-by: Adam King <adking@redhat.com>
Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Daniel-Pivonka 2020-10-26 14:34:39 -04:00 committed by Adam King
parent 66cafc8312
commit a398fc239f
15 changed files with 1047 additions and 101 deletions

(Diff for doc/images/HAProxy_for_RGW.svg omitted: image file, 28 KiB.)

@ -177,7 +177,7 @@ Example::
When the parameter ``all-available-devices`` or a DriveGroup specification is used, a cephadm service is created.
This service guarantees that all available devices or devices included in the DriveGroup will be used for OSDs.
Note that the effect of ``--all-available-devices`` is persistent; that is, drives which are added to the system
or become available (say, by zapping) after the command is complete will be automatically found and added to the cluster.
That is, after using::
@ -312,7 +312,7 @@ error if it doesn't know how to do this transition.
Update the number of monitor hosts::
ceph orch apply mon --placement=<placement> [--dry-run]
Where ``placement`` is a :ref:`orchestrator-cli-placement-spec`.
Each host can optionally specify a network for the monitor to listen on.
@ -320,7 +320,7 @@ Each host can optionally specify a network for the monitor to listen on.
Update the number of manager hosts::
ceph orch apply mgr --placement=<placement> [--dry-run]
Where ``placement`` is a :ref:`orchestrator-cli-placement-spec`.
..
@ -413,6 +413,174 @@ Service Commands::
ceph orch <start|stop|restart|redeploy|reconfig> <service_name>
.. _orchestrator-haproxy-service-spec:
High availability service for RGW
=================================
This service allows the user to create a high availability RGW service
providing a minimum set of configuration options.
The orchestrator will automatically deploy and configure several HAProxy and
Keepalived containers to ensure continuity of the RGW service for as long as
the Ceph cluster has at least one RGW daemon running.
The following image shows how this service works:
.. image:: ../images/HAProxy_for_RGW.svg
There are N hosts where the HA RGW service is deployed. This means that we have
an HAProxy and a keepalived daemon running on each of these hosts.
Keepalived is used to provide a "virtual IP" bound to the hosts. All RGW
clients use this "virtual IP" to connect with the RGW service.
Each keepalived daemon checks every few seconds whether the HAProxy daemon
running on the same host is responding, and also whether the "master" keepalived
daemon is running without problems.
If the "master" keepalived daemon or the active HAProxy is not responding, one
of the keepalived daemons running in backup mode will be elected as master, and
the "virtual IP" will be moved to that node.
The active HAProxy also acts as a load balancer, distributing all RGW requests
between all the available RGW daemons.
**Prerequisites:**
* At least two RGW daemons running in the Ceph cluster
* Operating system prerequisites:
In order for the Keepalived service to forward network packets properly to the
real servers, each router node must have IP forwarding turned on in the kernel.
So the following kernel parameter must be set::
net.ipv4.ip_forward = 1
Load balancing in HAProxy and Keepalived at the same time also requires the
ability to bind to an IP address that is nonlocal, meaning that it is not
assigned to a device on the local system. This allows a running load balancer
instance to bind to a non-local IP for failover.
So the following kernel parameter must also be set::
net.ipv4.ip_nonlocal_bind = 1
Be sure to set these two options properly in the file ``/etc/sysctl.conf`` so
that the values persist even if the hosts are restarted.
These configuration changes must be applied on all the hosts where the HAProxy
for RGW service is going to be deployed.
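For example, a minimal ``/etc/sysctl.conf`` fragment covering both settings
(applied immediately with ``sysctl -p``) would be::
net.ipv4.ip_forward = 1
net.ipv4.ip_nonlocal_bind = 1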
**Deploying the high availability service for RGW**
Use the command::
ceph orch apply -i <service_spec_file>
**Service specification file:**
The service specification is a YAML file with the following properties:
.. code-block:: yaml
service_type: ha-rgw
service_id: haproxy_for_rgw
placement:
hosts:
- host1
- host2
- host3
spec:
virtual_ip_interface: <string> # ex: eth0
virtual_ip_address: <string>/<string> # ex: 192.168.20.1/24
frontend_port: <integer> # ex: 8080
ha_proxy_port: <integer> # ex: 1967
ha_proxy_stats_enabled: <boolean> # ex: true
ha_proxy_stats_user: <string> # ex: admin
ha_proxy_stats_password: <string> # ex: admin
ha_proxy_enable_prometheus_exporter: <boolean> # ex: true
ha_proxy_monitor_uri: <string> # ex: /haproxy_health
keepalived_user: <string> # ex: admin
keepalived_password: <string> # ex: admin
ha_proxy_frontend_ssl_certificate: <optional string> # ex:
[
"-----BEGIN CERTIFICATE-----",
"MIIDZTCCAk2gAwIBAgIUClb9dnseOsgJWAfhPQvrZw2MP2kwDQYJKoZIhvcNAQEL",
....
"-----END CERTIFICATE-----",
"-----BEGIN PRIVATE KEY-----",
....
"sCHaZTUevxb4h6dCEk1XdPr2O2GdjV0uQ++9bKahAy357ELT3zPE8yYqw7aUCyBO",
"aW5DSCo8DgfNOgycVL/rqcrc",
"-----END PRIVATE KEY-----"
]
ha_proxy_frontend_ssl_port: <optional integer> # ex: 8090
ha_proxy_ssl_dh_param: <optional integer> # ex: 1024
ha_proxy_ssl_ciphers: <optional string> # ex: ECDH+AESGCM:!MD5
ha_proxy_ssl_options: <optional string> # ex: no-sslv3
haproxy_container_image: <optional string> # ex: haproxy:2.4-dev3-alpine
keepalived_container_image: <optional string> # ex: arcts/keepalived:1.2.2
where the properties of this service specification are:
* ``service_type``
Mandatory and set to "ha-rgw"
* ``service_id``
The name of the service.
* ``placement hosts``
The hosts where it is desired to run the HA daemons. An HAProxy and a
Keepalived container will be deployed on each of these hosts.
The RGW daemons may run on the same hosts or on different ones.
* ``virtual_ip_interface``
The physical network interface to which the virtual IP will be bound
* ``virtual_ip_address``
The virtual IP (and network) where the HA RGW service will be available.
All your RGW clients must point to this IP in order to use the HA RGW
service.
* ``frontend_port``
The port used to access the HA RGW service
* ``ha_proxy_port``
The port used by HAProxy containers
* ``ha_proxy_stats_enabled``
Whether to enable the statistics URL in the HAProxy daemons
* ``ha_proxy_stats_user``
The user needed to access the HAProxy statistics URL
* ``ha_proxy_stats_password``
The password needed to access the HAProxy statistics URL
* ``ha_proxy_enable_prometheus_exporter``
Whether to enable the Prometheus exporter in HAProxy. This allows RGW
service metrics to be consumed from Grafana.
* ``ha_proxy_monitor_uri``:
The API endpoint where the health of the HAProxy daemon is reported
* ``keepalived_user``
User needed to access keepalived daemons
* ``keepalived_password``:
The password needed to access keepalived daemons
* ``ha_proxy_frontend_ssl_certificate``:
SSL certificate. You must paste the content of your .pem file
* ``ha_proxy_frontend_ssl_port``:
The HTTPS port used by HAProxy containers
* ``ha_proxy_ssl_dh_param``:
Value used for the ``tune.ssl.default-dh-param`` setting in the HAProxy
config file.
* ``ha_proxy_ssl_ciphers``:
Value used for the ``ssl-default-bind-ciphers`` setting in the HAProxy config
file.
* ``ha_proxy_ssl_options``:
Value used for the ``ssl-default-bind-options`` setting in the HAProxy config
file.
* ``haproxy_container_image``:
HAProxy image location used to pull the image
* ``keepalived_container_image``:
Keepalived image location used to pull the image
**Useful hints for the RGW Service:**
* It is good to have at least 3 RGW daemons
* Use at least 3 hosts for the HAProxy for RGW service
* On each host an HAProxy and a Keepalived daemon will be deployed. These
daemons can be managed as systemd services (see the example below)
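For example, assuming cephadm's usual systemd unit naming scheme
(``ceph-<fsid>@<daemon_type>.<daemon_id>``), the daemons on a host could be
inspected with::
systemctl status ceph-<fsid>@haproxy.<daemon_id>
systemctl status ceph-<fsid>@keepalived.<daemon_id>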
Deploying custom containers
===========================


@ -218,7 +218,17 @@ class Monitoring(object):
} # type: ignore
##################################
def populate_files(config_dir, config_files, uid, gid):
# type: (str, Dict, int, int) -> None
"""create config files for different services"""
for fname in config_files:
config_file = os.path.join(config_dir, fname)
config_content = dict_get_join(config_files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config_content)
class NFSGanesha(object):
"""Defines a NFS-Ganesha container"""
@ -343,14 +353,7 @@ class NFSGanesha(object):
makedirs(config_dir, uid, gid, 0o755)
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(config_dir, fname)
config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config_content)
populate_files(config_dir, self.files, uid, gid)
# write the RGW keyring
if self.rgw:
@ -491,14 +494,7 @@ class CephIscsi(object):
makedirs(configfs_dir, uid, gid, 0o755)
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(data_dir, fname)
config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config_content)
populate_files(data_dir, self.files, uid, gid)
@staticmethod
def configfs_mount_umount(data_dir, mount=True):
@ -524,6 +520,163 @@ class CephIscsi(object):
##################################
class HAproxy(object):
"""Defines an HAproxy container"""
daemon_type = 'haproxy'
required_files = ['haproxy.cfg']
default_image = 'haproxy'
def __init__(self, fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
# config-json options
self.files = dict_get(config_json, 'files', {})
self.validate()
@classmethod
def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'HAproxy':
return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
if not os.path.isdir(data_dir):
raise OSError('data_dir is not a directory: %s' % (data_dir))
# create additional directories in data dir for HAproxy to use
if not os.path.isdir(os.path.join(data_dir, 'haproxy')):
makedirs(os.path.join(data_dir, 'haproxy'), uid, gid, DATA_DIR_MODE)
data_dir = os.path.join(data_dir, 'haproxy')
populate_files(data_dir, self.files, uid, gid)
def get_daemon_args(self) -> List[str]:
return ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg']
def validate(self):
# type: () -> None
if not is_fsid(self.fsid):
raise Error('not an fsid: %s' % self.fsid)
if not self.daemon_id:
raise Error('invalid daemon_id: %s' % self.daemon_id)
if not self.image:
raise Error('invalid image: %s' % self.image)
# check for the required files
if self.required_files:
for fname in self.required_files:
if fname not in self.files:
raise Error('required file missing from config-json: %s' % fname)
def get_daemon_name(self):
# type: () -> str
return '%s.%s' % (self.daemon_type, self.daemon_id)
def get_container_name(self, desc=None):
# type: (Optional[str]) -> str
cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
if desc:
cname = '%s-%s' % (cname, desc)
return cname
def extract_uid_gid_haproxy(self):
# better directory for this?
return extract_uid_gid(file_path='/var/lib')
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy'
return mounts
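# A minimal usage sketch of the class above (fsid, daemon id and file
# content hypothetical):
#
#     cfg = {'files': {'haproxy.cfg': 'global\n    daemon\n'}}
#     ha = HAproxy('00000000-0000-0000-0000-000000000000', 'foo', cfg, 'haproxy')
#     assert ha.get_daemon_name() == 'haproxy.foo'
#     assert ha.get_daemon_args() == ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg']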
##################################
class Keepalived(object):
"""Defines an Keepalived container"""
daemon_type = 'keepalived'
required_files = ['keepalived.conf']
default_image = 'arcts/keepalived'
def __init__(self, fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
# config-json options
self.files = dict_get(config_json, 'files', {})
self.validate()
@classmethod
def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'Keepalived':
return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
if not os.path.isdir(data_dir):
raise OSError('data_dir is not a directory: %s' % (data_dir))
# create additional directories in data dir for keepalived to use
if not os.path.isdir(os.path.join(data_dir, 'keepalived')):
makedirs(os.path.join(data_dir, 'keepalived'), uid, gid, DATA_DIR_MODE)
# populate files from the config-json
populate_files(data_dir, self.files, uid, gid)
def validate(self):
# type: () -> None
if not is_fsid(self.fsid):
raise Error('not an fsid: %s' % self.fsid)
if not self.daemon_id:
raise Error('invalid daemon_id: %s' % self.daemon_id)
if not self.image:
raise Error('invalid image: %s' % self.image)
# check for the required files
if self.required_files:
for fname in self.required_files:
if fname not in self.files:
raise Error('required file missing from config-json: %s' % fname)
def get_daemon_name(self):
# type: () -> str
return '%s.%s' % (self.daemon_type, self.daemon_id)
def get_container_name(self, desc=None):
# type: (Optional[str]) -> str
cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
if desc:
cname = '%s-%s' % (cname, desc)
return cname
@staticmethod
def get_container_envs():
# type: () -> List[str]
envs = [
'KEEPALIVED_AUTOCONF=false',
'KEEPALIVED_CONF=/etc/keepalived/keepalived.conf',
'KEEPALIVED_CMD="/usr/sbin/keepalived -n -l -f /etc/keepalived/keepalived.conf"',
'KEEPALIVED_DEBUG=false'
]
return envs
def extract_uid_gid_keepalived(self):
# better directory for this?
return extract_uid_gid(file_path='/var/lib')
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
return mounts
##################################
class CustomContainer(object):
"""Defines a custom container"""
@ -685,6 +838,8 @@ def get_supported_daemons():
supported_daemons.append(CephIscsi.daemon_type)
supported_daemons.append(CustomContainer.daemon_type)
supported_daemons.append(CephadmDaemon.daemon_type)
supported_daemons.append(HAproxy.daemon_type)
supported_daemons.append(Keepalived.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
@ -1401,6 +1556,10 @@ def default_image(func):
type_ = args.name.split('.', 1)[0]
if type_ in Monitoring.components:
args.image = Monitoring.components[type_]['image']
if type_ == 'haproxy':
args.image = HAproxy.default_image
if type_ == 'keepalived':
args.image = Keepalived.default_image
if not args.image:
args.image = os.environ.get('CEPHADM_IMAGE')
if not args.image:
@ -1749,6 +1908,9 @@ def get_daemon_args(fsid, daemon_type, daemon_id):
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(fsid, daemon_id)
r += haproxy.get_daemon_args()
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(fsid, daemon_id)
r.extend(cc.get_daemon_args())
@ -1818,6 +1980,14 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
ceph_iscsi = CephIscsi.init(fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(fsid, daemon_id)
haproxy.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == Keepalived.daemon_type:
keepalived = Keepalived.init(fsid, daemon_id)
keepalived.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(fsid, daemon_id)
cc.create_daemon_dirs(data_dir, uid, gid)
@ -1972,12 +2142,22 @@ def get_container_mounts(fsid, daemon_type, daemon_id,
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
mounts.update(nfs_ganesha.get_container_mounts(data_dir))
if daemon_type == HAproxy.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
mounts.update(HAproxy.get_container_mounts(data_dir))
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
log_dir = get_log_dir(fsid)
mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
if daemon_type == Keepalived.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
mounts.update(Keepalived.get_container_mounts(data_dir))
if daemon_type == CustomContainer.daemon_type:
assert daemon_id
cc = CustomContainer.init(fsid, daemon_id)
@ -2020,6 +2200,12 @@ def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str],
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(NFSGanesha.get_container_envs())
elif daemon_type == HAproxy.daemon_type:
name = '%s.%s' % (daemon_type, daemon_id)
elif daemon_type == Keepalived.daemon_type:
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(Keepalived.get_container_envs())
container_args.extend(['--cap-add=NET_ADMIN'])
elif daemon_type == CephIscsi.daemon_type:
entrypoint = CephIscsi.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
@ -3514,6 +3700,22 @@ def command_deploy():
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(args.fsid, daemon_id)
uid, gid = haproxy.extract_uid_gid_haproxy()
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == Keepalived.daemon_type:
keepalived = Keepalived.init(args.fsid, daemon_id)
uid, gid = keepalived.extract_uid_gid_keepalived()
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(args.fsid, daemon_id)
if not args.reconfig and not redeploy:
@ -3952,6 +4154,22 @@ def list_daemons(detail=True, legacy_dir=None):
err.startswith('%s, version ' % cmd):
version = err.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'haproxy':
out, err, code = call(
[container_path, 'exec', container_id,
'haproxy', '-v'])
if not code and \
out.startswith('HA-Proxy version '):
version = out.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'keepalived':
out, err, code = call(
[container_path, 'exec', container_id,
'keepalived', '--version'])
if not code and \
err.startswith('Keepalived '):
version = err.split(' ')[1]
seen_versions[image_id] = version
elif daemon_type == CustomContainer.daemon_type:
# Because a custom container can contain
# everything, we do not know which command
@ -5646,6 +5864,14 @@ def command_gather_facts():
##################################
def command_verify_prereqs():
if args.service_type == 'haproxy' or args.service_type == 'keepalived':
out, err, code = call(['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind'])
if out.strip() != "1":
raise Error('net.ipv4.ip_nonlocal_bind not set to 1')
##################################
class CephadmCache:
task_types = ['disks', 'daemons', 'host', 'http_server']
@ -6948,6 +7174,15 @@ def _get_parser():
help="Maintenance action - enter maintenance, or exit maintenance")
parser_maintenance.set_defaults(func=command_maintenance)
parser_verify_prereqs = subparsers.add_parser(
'verify-prereqs',
help='verify system prerequisites for a given service are met on this host')
parser_verify_prereqs.set_defaults(func=command_verify_prereqs)
parser_verify_prereqs.add_argument(
'--service-type',
required=True,
help='service type whose prerequisites will be checked')
return parser
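# With the subparser above in place, the host-side check can be invoked as,
# e.g., `cephadm verify-prereqs --service-type haproxy`; it raises an Error
# when net.ipv4.ip_nonlocal_bind is not set to 1 on the host.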


@ -417,6 +417,7 @@ class HostCache():
return r
def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
assert not daemon_name.startswith('ha-rgw.')
for _, dm in self.daemons.items():
for _, dd in dm.items():
if dd.name() == daemon_name:
@ -437,6 +438,9 @@ class HostCache():
def get_daemons_by_service(self, service_name):
# type: (str) -> List[orchestrator.DaemonDescription]
assert not service_name.startswith('keepalived.')
assert not service_name.startswith('haproxy.')
result = [] # type: List[orchestrator.DaemonDescription]
for host, dm in self.daemons.items():
for name, d in dm.items():
@ -446,6 +450,8 @@ class HostCache():
def get_daemons_by_type(self, service_type):
# type: (str) -> List[orchestrator.DaemonDescription]
assert service_type not in ['keepalived', 'haproxy']
result = [] # type: List[orchestrator.DaemonDescription]
for host, dm in self.daemons.items():
for name, d in dm.items():
@ -578,6 +584,8 @@ class HostCache():
self.daemons[host][dd.name()] = dd
def rm_daemon(self, host: str, name: str) -> None:
assert not name.startswith('ha-rgw.')
if host in self.daemons:
if name in self.daemons[host]:
del self.daemons[host][name]
@ -594,6 +602,8 @@ class HostCache():
for h in self.get_hosts())
def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
assert not daemon_name.startswith('ha-rgw.')
priorities = {
'start': 1,
'restart': 2,
@ -619,6 +629,8 @@ class HostCache():
del self.scheduled_daemon_actions[host]
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
assert not daemon.startswith('ha-rgw.')
return self.scheduled_daemon_actions.get(host, {}).get(daemon)


@ -25,7 +25,7 @@ from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
CustomContainerSpec, HostPlacementSpec
CustomContainerSpec, HostPlacementSpec, HA_RGWSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
@ -37,6 +37,7 @@ import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
from . import remotes
from . import utils
@ -45,6 +46,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ
RbdMirrorService, CrashService, CephadmService, CephadmExporter, CephadmExporterConfig
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.ha_rgw import HA_RGWService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
@ -217,6 +219,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
default='docker.io/prom/node-exporter:v0.18.1',
desc='Prometheus container image',
),
Option(
'container_image_haproxy',
default='haproxy',
desc='HAproxy container image',
),
Option(
'container_image_keepalived',
default='arcts/keepalived',
desc='Keepalived container image',
),
Option(
'warn_on_stray_hosts',
type='bool',
@ -337,6 +349,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self.container_image_grafana = ''
self.container_image_alertmanager = ''
self.container_image_node_exporter = ''
self.container_image_haproxy = ''
self.container_image_keepalived = ''
self.warn_on_stray_hosts = True
self.warn_on_stray_daemons = True
self.warn_on_failed_host_check = True
@ -417,6 +431,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self.node_exporter_service = NodeExporterService(self)
self.crash_service = CrashService(self)
self.iscsi_service = IscsiService(self)
self.ha_rgw_service = HA_RGWService(self)
self.container_service = CustomContainerService(self)
self.cephadm_exporter_service = CephadmExporter(self)
self.cephadm_services = {
@ -433,6 +448,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
'node-exporter': self.node_exporter_service,
'crash': self.crash_service,
'iscsi': self.iscsi_service,
'ha-rgw': self.ha_rgw_service,
'container': self.container_service,
'cephadm-exporter': self.cephadm_exporter_service,
}
@ -1189,6 +1205,10 @@ To check that the host is reachable:
image = self.container_image_alertmanager
elif daemon_type == 'node-exporter':
image = self.container_image_node_exporter
elif daemon_type == 'haproxy':
image = self.container_image_haproxy
elif daemon_type == 'keepalived':
image = self.container_image_keepalived
elif daemon_type == CustomContainerService.TYPE:
# The image can't be resolved, the necessary information
# is only available when a container is deployed (given
@ -1397,7 +1417,7 @@ To check that the host is reachable:
daemon_map[dd.daemon_type].append(dd.daemon_id)
for daemon_type, daemon_ids in daemon_map.items():
r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
r = self.cephadm_services[daemon_type_to_service(daemon_type)].ok_to_stop(daemon_ids)
if r.retval:
self.log.error(f'It is NOT safe to stop host {hostname}')
return r.retval, r.stderr
@ -1643,6 +1663,9 @@ To check that the host is reachable:
sm[n].container_image_id = 'mix'
if sm[n].container_image_name != dd.container_image_name:
sm[n].container_image_name = 'mix'
if dd.daemon_type == 'haproxy' or dd.daemon_type == 'keepalived':
# ha-rgw has 2 daemons running per host
sm[n].size = sm[n].size * 2
for n, spec in self.spec_store.specs.items():
if n in sm:
continue
@ -1659,6 +1682,9 @@ To check that the host is reachable:
if service_type == 'nfs':
spec = cast(NFSServiceSpec, spec)
sm[n].rados_config_location = spec.rados_config_location()
if spec.service_type == 'ha-rgw':
# ha-rgw has 2 daemons running per host
sm[n].size = sm[n].size * 2
return list(sm.values())
@trivial_completion
@ -2030,7 +2056,17 @@ To check that the host is reachable:
self.log.warning(msg)
return msg
cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
if daemon_spec.daemon_type == 'haproxy':
haspec = cast(HA_RGWSpec, daemon_spec.spec)
if haspec.haproxy_container_image:
image = haspec.haproxy_container_image
if daemon_spec.daemon_type == 'keepalived':
haspec = cast(HA_RGWSpec, daemon_spec.spec)
if haspec.keepalived_container_image:
image = haspec.keepalived_container_image
cephadm_config, deps = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config(
daemon_spec)
# TCP port to open in the host firewall
@ -2123,7 +2159,7 @@ To check that the host is reachable:
with set_exception_subject('service', daemon.service_id(), overwrite=True):
self.cephadm_services[daemon_type].pre_remove(daemon)
self.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s' % (name, host))
@ -2134,7 +2170,7 @@ To check that the host is reachable:
self.cache.rm_daemon(host, name)
self.cache.invalidate_host_daemons(host)
self.cephadm_services[daemon_type].post_remove(daemon)
self.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)
return "Removed {} from host '{}'".format(name, host)
@ -2189,7 +2225,7 @@ To check that the host is reachable:
config_func(spec)
did_config = True
daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
daemon_spec = self.cephadm_services[daemon_type_to_service(daemon_type)].make_daemon_spec(
host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
@ -2279,6 +2315,7 @@ To check that the host is reachable:
'mgr': PlacementSpec(count=2),
'mds': PlacementSpec(count=2),
'rgw': PlacementSpec(count=2),
'ha-rgw': PlacementSpec(count=2),
'iscsi': PlacementSpec(count=1),
'rbd-mirror': PlacementSpec(count=2),
'nfs': PlacementSpec(count=1),
@ -2336,6 +2373,10 @@ To check that the host is reachable:
def apply_rgw(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
def apply_ha_rgw(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
def add_iscsi(self, spec):
# type: (ServiceSpec) -> List[str]


@ -127,6 +127,15 @@ class HostAssignment(object):
logger.info("deploying %s monitor(s) instead of %s so monitors may achieve consensus" % (
len(candidates) - 1, len(candidates)))
return candidates[0:len(candidates)-1]
# do not deploy ha-rgw on hosts that don't support virtual ips
if self.spec.service_type == 'ha-rgw' and self.filter_new_host:
old = candidates
candidates = [h for h in candidates if self.filter_new_host(h.hostname)]
for h in list(set(old) - set(candidates)):
logger.info(
f"Filtered out host {h.hostname} for ha-rgw. Could not verify host allowed virtual ips")
logger.info('filtered %s down to %s' % (old, candidates))
return candidates
# if asked to place even number of mons, deploy 1 less
@ -160,21 +169,29 @@ class HostAssignment(object):
# we don't need any additional hosts
if need < 0:
return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
final_candidates = self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
else:
# exclusive to 'mon' daemons. Filter out hosts that don't have a public network assigned
# exclusive to daemons from 'mon' and 'ha-rgw' services.
# Filter out hosts that don't have a public network assigned
# or don't allow virtual ips respectively
if self.filter_new_host:
old = others
others = [h for h in others if self.filter_new_host(h.hostname)]
logger.debug('filtered %s down to %s' % (old, others))
for h in list(set(old) - set(others)):
if self.spec.service_type == 'ha-rgw':
logger.info(
f"Filtered out host {h.hostname} for ha-rgw. Could not verify host allowed virtual ips")
logger.info('filtered %s down to %s' % (old, others))
# ask the scheduler to return a set of hosts, up to the value of <count>
others = self.scheduler.place(others, need)
logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
logger.info('Combine hosts with existing daemons %s + new hosts %s' % (
hosts_with_daemons, others))
# if a host already has the anticipated daemon, merge it with the candidates
# to get a list of HostPlacementSpec that can be deployed on.
return list(merge_hostspecs(hosts_with_daemons, others))
final_candidates = list(merge_hostspecs(hosts_with_daemons, others))
return final_candidates
def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
active_hosts: List['HostPlacementSpec'] = []


@ -11,7 +11,7 @@ except ImportError:
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, HA_RGWSpec
from ceph.utils import str_to_datetime, datetime_now
import orchestrator
@ -19,6 +19,7 @@ from cephadm.schedule import HostAssignment
from cephadm.upgrade import CEPH_UPGRADE_ORDER
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
from orchestrator import OrchestratorError
from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator, ContainerInspectInfo
@ -449,7 +450,7 @@ class CephadmServe:
"""
self.mgr.migration.verify_no_migration()
daemon_type = spec.service_type
service_type = spec.service_type
service_name = spec.service_name()
if spec.unmanaged:
self.log.debug('Skipping unmanaged service %s' % service_name)
@ -459,9 +460,9 @@ class CephadmServe:
return False
self.log.debug('Applying service %s spec' % service_name)
config_func = self._config_fn(daemon_type)
config_func = self._config_fn(service_type)
if daemon_type == 'osd':
if service_type == 'osd':
self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
# TODO: return True would result in a busy loop
# can't know if daemon count changed; create_from_spec doesn't
@ -471,7 +472,7 @@ class CephadmServe:
daemons = self.mgr.cache.get_daemons_by_service(service_name)
public_network = None
if daemon_type == 'mon':
if service_type == 'mon':
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config get',
'who': 'mon',
@ -489,20 +490,38 @@ class CephadmServe:
# host
return len(self.mgr.cache.networks[host].get(public_network, [])) > 0
def virtual_ip_allowed(host):
# type: (str) -> bool
# Verify that it is possible to use Virtual IPs in the host
try:
if self.mgr.cache.facts[host]['kernel_parameters']['net.ipv4.ip_nonlocal_bind'] == '0':
return False
except KeyError:
return False
return True
ha = HostAssignment(
spec=spec,
hosts=self.mgr._hosts_with_daemon_inventory(),
get_daemons_func=self.mgr.cache.get_daemons_by_service,
filter_new_host=matches_network if daemon_type == 'mon' else None,
filter_new_host=matches_network if service_type == 'mon'
else virtual_ip_allowed if service_type == 'ha-rgw' else None,
)
hosts: List[HostPlacementSpec] = ha.place()
self.log.debug('Usable hosts: %s' % hosts)
try:
hosts: List[HostPlacementSpec] = ha.place()
self.log.debug('Usable hosts: %s' % hosts)
except OrchestratorError as e:
self.log.error('Failed to apply %s spec %s: %s' % (
spec.service_name(), spec, e))
self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
return False
r = None
# sanity check
if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
if service_type in ['mon', 'mgr'] and len(hosts) < 1:
self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
return False
@ -515,50 +534,55 @@ class CephadmServe:
remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
if service_type == 'ha-rgw':
spec = self.update_ha_rgw_definitive_hosts(spec, hosts, add_daemon_hosts)
for host, network, name in add_daemon_hosts:
daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
prefix=spec.service_id,
forcename=name)
for daemon_type in service_to_daemon_types(service_type):
daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
prefix=spec.service_id,
forcename=name)
if not did_config and config_func:
if daemon_type == 'rgw':
rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
rgw_config_func(cast(RGWSpec, spec), daemon_id)
else:
config_func(spec)
did_config = True
if not did_config and config_func:
if daemon_type == 'rgw':
rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
rgw_config_func(cast(RGWSpec, spec), daemon_id)
else:
config_func(spec)
did_config = True
daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
daemon_spec = self.mgr.cephadm_services[service_type].make_daemon_spec(
host, daemon_id, network, spec, daemon_type=daemon_type)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
try:
daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
self.mgr._create_daemon(daemon_spec)
r = True
except (RuntimeError, OrchestratorError) as e:
self.mgr.events.for_service(spec, 'ERROR',
f"Failed while placing {daemon_type}.{daemon_id}"
f"on {host}: {e}")
# only return "no change" if no one else has already succeeded.
# later successes will also change to True
if r is None:
r = False
continue
try:
daemon_spec = self.mgr.cephadm_services[service_type].prepare_create(
daemon_spec)
self.mgr._create_daemon(daemon_spec)
r = True
except (RuntimeError, OrchestratorError) as e:
self.mgr.events.for_service(spec, 'ERROR',
f"Failed while placing {daemon_type}.{daemon_id}"
f"on {host}: {e}")
# only return "no change" if no one else has already succeeded.
# later successes will also change to True
if r is None:
r = False
continue
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
hostname=host,
daemon_type=daemon_type,
daemon_id=daemon_id,
)
daemons.append(sd)
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
hostname=host,
daemon_type=daemon_type,
daemon_id=daemon_id,
)
daemons.append(sd)
# remove any?
def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
r = self.mgr.cephadm_services[service_type].ok_to_stop(daemon_ids)
return not r.retval
while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
@ -595,7 +619,7 @@ class CephadmServe:
if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
daemons_post[dd.daemon_type].append(dd)
if self.mgr.cephadm_services[dd.daemon_type].get_active_daemon(
if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
dd.is_active = True
else:
@ -653,7 +677,8 @@ class CephadmServe:
for daemon_type, daemon_descs in daemons_post.items():
if daemon_type in self.mgr.requires_post_actions:
self.mgr.requires_post_actions.remove(daemon_type)
self.mgr._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
self.mgr._get_cephadm_service(daemon_type_to_service(
daemon_type)).daemon_check_post(daemon_descs)
def convert_tags_to_repo_digest(self) -> None:
if not self.mgr.use_repo_digest:
@ -672,3 +697,19 @@ class CephadmServe:
image_info = digests[container_image_ref]
if image_info.repo_digest:
self.mgr.set_container_image(entity, image_info.repo_digest)
# ha-rgw needs a definitive host list to create the keepalived config files;
# if the definitive host list has changed, all ha-rgw daemons must get a new
# config, including those that are already on the correct host and not
# going to be redeployed
def update_ha_rgw_definitive_hosts(self, spec: ServiceSpec, hosts: List[HostPlacementSpec],
add_hosts: Set[HostPlacementSpec]) -> HA_RGWSpec:
spec = cast(HA_RGWSpec, spec)
if set(hosts) != set(spec.definitive_host_list):
spec.definitive_host_list = hosts
ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
for daemon in ha_rgw_daemons:
if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts:
self.mgr.cache.schedule_daemon_action(
daemon.hostname, daemon.name(), 'reconfig')
return spec


@ -94,12 +94,14 @@ class CephadmService(metaclass=ABCMeta):
def make_daemon_spec(self, host: str,
daemon_id: str,
network: str,
spec: ServiceSpecs) -> CephadmDaemonSpec:
spec: ServiceSpecs,
daemon_type: Optional[str] = None,) -> CephadmDaemonSpec:
return CephadmDaemonSpec(
host=host,
daemon_id=daemon_id,
spec=spec,
network=network
network=network,
daemon_type=daemon_type
)
def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
@ -270,7 +272,7 @@ class CephService(CephadmService):
"""
Map the daemon id to a cephx keyring entity name
"""
if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi", 'haproxy', 'keepalived']:
return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
elif self.TYPE == 'crash':
if host == "":


@ -0,0 +1,151 @@
import json
import logging
from typing import List, cast, Tuple, Dict, Any
from ceph.deployment.service_spec import HA_RGWSpec
from orchestrator import DaemonDescription, OrchestratorError
from .cephadmservice import CephadmDaemonSpec, CephService
from ..utils import CephadmNoImage, cephadmNoImage, resolve_ip
logger = logging.getLogger(__name__)
class HA_RGWService(CephService):
TYPE = 'ha-rgw'
class rgw_server():
def __init__(self, hostname: str, address: str):
self.name = hostname
self.ip = address
def prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec:
assert daemon_spec.daemon_type == 'haproxy' or daemon_spec.daemon_type == 'keepalived'
assert daemon_spec.spec
if daemon_spec.daemon_type == 'haproxy':
return self.haproxy_prepare_create(daemon_spec)
else:
return self.keepalived_prepare_create(daemon_spec)
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
assert daemon_spec.daemon_type == 'haproxy' or daemon_spec.daemon_type == 'keepalived'
if daemon_spec.daemon_type == 'haproxy':
return self.haproxy_generate_config(daemon_spec)
else:
return self.keepalived_generate_config(daemon_spec)
def haproxy_prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec:
assert daemon_spec.daemon_type == 'haproxy'
assert daemon_spec.spec
daemon_id = daemon_spec.daemon_id
host = daemon_spec.host
spec = daemon_spec.spec
logger.info('Create daemon %s on host %s with spec %s' % (
daemon_id, host, spec))
return daemon_spec
def keepalived_prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec:
assert daemon_spec.daemon_type == 'keepalived'
assert daemon_spec.spec
daemon_id = daemon_spec.daemon_id
host = daemon_spec.host
spec = daemon_spec.spec
logger.info('Create daemon %s on host %s with spec %s' % (
daemon_id, host, spec))
return daemon_spec
def haproxy_generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
daemon_id = daemon_spec.daemon_id
host = daemon_spec.host
service_name: str = "ha-rgw." + daemon_id.split('.')[0]
# if no service spec, return empty config
if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs:
config_files = {'files': {}} # type: Dict[str, Any]
return config_files, []
elif daemon_spec.spec:
spec = daemon_spec.spec
else:
# service spec is not attached to daemon spec but is in spec store
spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name])
rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw')
rgw_servers = []
for daemon in rgw_daemons:
rgw_servers.append(self.rgw_server(
daemon.name(), resolve_ip(daemon.hostname)))
# virtual ip address cannot have netmask attached when passed to haproxy config
# since the port is added to the end and something like 123.123.123.10/24:8080 is invalid
virtual_ip_address = spec.virtual_ip_address
if "/" in str(spec.virtual_ip_address):
just_ip = str(spec.virtual_ip_address).split('/')[0]
virtual_ip_address = just_ip
ha_context = {'spec': spec, 'rgw_servers': rgw_servers,
'virtual_ip_address': virtual_ip_address}
haproxy_conf = self.mgr.template.render('services/haproxy/haproxy.cfg.j2', ha_context)
config_files = {
'files': {
"haproxy.cfg": haproxy_conf,
}
}
if spec.ha_proxy_frontend_ssl_certificate:
ssl_cert = spec.ha_proxy_frontend_ssl_certificate
if isinstance(ssl_cert, list):
ssl_cert = '\n'.join(ssl_cert)
config_files['files']['haproxy.pem'] = ssl_cert
return config_files, []
def keepalived_generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
daemon_id = daemon_spec.daemon_id
host = daemon_spec.host
service_name: str = "ha-rgw." + daemon_id.split('.')[0]
# if no service spec, return empty config
if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs:
config_file = {'files': {}} # type: Dict[str, Any]
return config_file, []
elif daemon_spec.spec:
spec = daemon_spec.spec
else:
# service spec is not attached to daemon spec but is in spec store
spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name])
all_hosts = []
for h, network, name in spec.definitive_host_list:
all_hosts.append(h)
# set state: the first host in the placement is MASTER, all others are BACKUP
state = 'BACKUP'
if all_hosts[0] == host:
state = 'MASTER'
# remove the host this daemon is being deployed on from the all_hosts list
# (used for other_ips in the conf file) and convert the remaining hosts to IPs
all_hosts.remove(host)
other_ips = [resolve_ip(h) for h in all_hosts]
ka_context = {'spec': spec, 'state': state,
'other_ips': other_ips,
'host_ip': resolve_ip(host)}
keepalived_conf = self.mgr.template.render(
'services/keepalived/keepalived.conf.j2', ka_context)
config_file = {
'files': {
"keepalived.conf": keepalived_conf,
}
}
return config_file, []
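# Sketch of the keepalived context computed above (hypothetical hosts):
#
#     all_hosts = ['host1', 'host2', 'host3']  # placement order from the spec
#     host = 'host2'                           # host this daemon lands on
#     state = 'MASTER' if all_hosts[0] == host else 'BACKUP'  # -> 'BACKUP'
#     peers = [h for h in all_hosts if h != host]  # -> ['host1', 'host3'],
#                                                  #    then resolve_ip() each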


@ -0,0 +1,66 @@
# {{ cephadm_managed }}
global
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/lib/haproxy/haproxy.pid
maxconn 8000
daemon
stats socket /var/lib/haproxy/stats
{% if spec.ha_proxy_frontend_ssl_certificate %}
{% if spec.ha_proxy_ssl_dh_param %}
tune.ssl.default-dh-param {{ spec.ha_proxy_ssl_dh_param }}
{% endif %}
{% if spec.ha_proxy_ssl_ciphers %}
ssl-default-bind-ciphers {{ spec.ha_proxy_ssl_ciphers | join(':') }}
{% endif %}
{% if spec.ha_proxy_ssl_options %}
ssl-default-bind-options {{ spec.ha_proxy_ssl_options | join(' ') }}
{% endif %}
{% endif %}
defaults
mode http
log global
option httplog
option dontlognull
option http-server-close
option forwardfor except 127.0.0.0/8
option redispatch
retries 3
timeout http-request 1s
timeout queue 20s
timeout connect 5s
timeout client 1s
timeout server 1s
timeout http-keep-alive 5s
timeout check 5s
maxconn 8000
frontend stats
bind *:{{ spec.ha_proxy_port }}
{% if spec.ha_proxy_stats_enabled %}
stats enable
{% endif %}
stats uri /stats
stats refresh 10s
stats auth {{ spec.ha_proxy_stats_user }}:{{ spec.ha_proxy_stats_password }}
{% if spec.ha_proxy_enable_prometheus_exporter %}
http-request use-service prometheus-exporter if { path /metrics }
{% endif %}
monitor-uri {{ spec.ha_proxy_monitor_uri }}
frontend rgw-frontend
{% if spec.ha_proxy_frontend_ssl_certificate %}
bind {{ virtual_ip_address }}:{{ spec.ha_proxy_frontend_ssl_port }} ssl crt /var/lib/haproxy/haproxy.pem
{% else %}
bind {{ virtual_ip_address }}:{{ spec.frontend_port }}
{% endif %}
default_backend rgw-backend
backend rgw-backend
option forwardfor
balance static-rr
option httpchk HEAD / HTTP/1.0
{% for server in rgw_servers %}
server {{ server.name }} {{ server.ip }}:80 check weight 100
{% endfor %}
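# For two RGW daemons (hostnames and IPs hypothetical), the backend loop
# above renders to:
#
#     server rgw.a.host1 10.0.0.1:80 check weight 100
#     server rgw.a.host2 10.0.0.2:80 check weight 100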


@ -0,0 +1,32 @@
# {{ cephadm_managed }}
vrrp_script check_haproxy {
script "curl http://localhost:{{ spec.ha_proxy_port }}/haproxy_test"
weight -20
interval 2
rise 2
fall 2
}
vrrp_instance VI_0 {
state {{ state }}
priority 100
interface {{ spec.virtual_ip_interface }}
virtual_router_id 51
advert_int 1
authentication {
auth_type PASS
auth_pass {{ spec.keepalived_password }}
}
unicast_src_ip {{ host_ip }}
unicast_peer {
{% for ip in other_ips %}
{{ ip }}
{% endfor %}
}
virtual_ipaddress {
{{ spec.virtual_ip_address }} dev {{ spec.virtual_ip_interface }}
}
track_script {
check_haproxy
}
}
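# For a three-host deployment where this host resolves to 192.168.20.11
# (all addresses hypothetical), the unicast section above renders to:
#
#     unicast_src_ip 192.168.20.11
#     unicast_peer {
#         192.168.20.12
#         192.168.20.13
#     }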


@ -6,8 +6,11 @@ import json
import pytest
import yaml
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec
IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec, \
HA_RGWSpec
from orchestrator import DaemonDescription, OrchestratorError
@ -137,7 +140,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725856",
"created": "2020-04-02T19:23:08.829543",
"started": "2020-04-03T07:29:16.932838",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -152,7 +155,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725903",
"created": "2020-04-02T19:23:11.390694",
"started": "2020-04-03T07:29:16.910897",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -167,7 +170,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725950",
"created": "2020-04-02T19:23:52.025088",
"started": "2020-04-03T07:29:16.847972",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -182,7 +185,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725807",
"created": "2020-04-02T19:22:18.648584",
"started": "2020-04-03T07:29:16.856153",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -197,7 +200,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725715",
"created": "2020-04-02T19:22:13.863300",
"started": "2020-04-03T07:29:17.206024",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -212,7 +215,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.725996",
"created": "2020-04-02T19:23:53.880197",
"started": "2020-04-03T07:29:16.880044",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -227,7 +230,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.726088",
"created": "2020-04-02T20:35:02.991435",
"started": "2020-04-03T07:29:19.373956",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -242,7 +245,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.726134",
"created": "2020-04-02T20:35:17.142272",
"started": "2020-04-03T07:29:19.374002",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -257,7 +260,7 @@ def test_spec_octopus(spec_json):
"last_refresh": "2020-04-03T15:31:48.726042",
"created": "2020-04-02T19:24:10.281163",
"started": "2020-04-03T07:29:16.926292",
"is_active": false
"is_active": false
},
{
"hostname": "ceph-001",
@ -265,7 +268,7 @@ def test_spec_octopus(spec_json):
"daemon_type": "rgw",
"status": 1,
"status_desc": "starting",
"is_active": false
"is_active": false
}
]""")
)
@ -657,3 +660,38 @@ def test_custom_container_spec_config_json():
config_json = spec.config_json()
for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']:
assert key not in config_json
def test_HA_RGW_spec():
yaml_str ="""service_type: ha-rgw
service_id: haproxy_for_rgw
placement:
hosts:
- host1
- host2
- host3
spec:
virtual_ip_interface: eth0
virtual_ip_address: 192.168.20.1/24
frontend_port: 8080
ha_proxy_port: 1967
ha_proxy_stats_enabled: true
ha_proxy_stats_user: admin
ha_proxy_stats_password: admin
ha_proxy_enable_prometheus_exporter: true
ha_proxy_monitor_uri: /haproxy_health
keepalived_password: admin
"""
yaml_file = yaml.safe_load(yaml_str)
spec = ServiceSpec.from_json(yaml_file)
assert spec.service_type == "ha-rgw"
assert spec.service_id == "haproxy_for_rgw"
assert spec.virtual_ip_interface == "eth0"
assert spec.virtual_ip_address == "192.168.20.1/24"
assert spec.frontend_port == 8080
assert spec.ha_proxy_port == 1967
assert spec.ha_proxy_stats_enabled == True
assert spec.ha_proxy_stats_user == "admin"
assert spec.ha_proxy_stats_password == "admin"
assert spec.ha_proxy_enable_prometheus_exporter == True
assert spec.ha_proxy_monitor_uri == "/haproxy_health"
assert spec.keepalived_password == "admin"


@ -22,7 +22,7 @@ import yaml
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
ServiceSpecValidationError, IscsiServiceSpec
ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec
from ceph.utils import datetime_to_str, str_to_datetime
@ -889,6 +889,7 @@ class Orchestrator(object):
'prometheus': self.apply_prometheus,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'ha-rgw': self.apply_ha_rgw,
'host': self.add_host,
'cephadm-exporter': self.apply_cephadm_exporter,
}
@ -1055,6 +1056,10 @@ class Orchestrator(object):
"""Update RGW cluster"""
raise NotImplementedError()
def apply_ha_rgw(self, spec: HA_RGWSpec) -> Completion[str]:
"""Update ha-rgw daemons"""
raise NotImplementedError()
def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create rbd-mirror daemon(s)"""
raise NotImplementedError()
@ -1171,6 +1176,51 @@ def json_to_generic_spec(spec: dict) -> GenericSpec:
return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
mapping = {
'mon': 'mon',
'mgr': 'mgr',
'mds': 'mds',
'rgw': 'rgw',
'osd': 'osd',
'haproxy': 'ha-rgw',
'keepalived': 'ha-rgw',
'iscsi': 'iscsi',
'rbd-mirror': 'rbd-mirror',
'nfs': 'nfs',
'grafana': 'grafana',
'alertmanager': 'alertmanager',
'prometheus': 'prometheus',
'node-exporter': 'node-exporter',
'crash': 'crash',
'container': 'container',
'cephadm-exporter': 'cephadm-exporter',
}
return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
mapping = {
'mon': ['mon'],
'mgr': ['mgr'],
'mds': ['mds'],
'rgw': ['rgw'],
'osd': ['osd'],
'ha-rgw': ['haproxy', 'keepalived'],
'iscsi': ['iscsi'],
'rbd-mirror': ['rbd-mirror'],
'nfs': ['nfs'],
'grafana': ['grafana'],
'alertmanager': ['alertmanager'],
'prometheus': ['prometheus'],
'node-exporter': ['node-exporter'],
'crash': ['crash'],
'container': ['container'],
'cephadm-exporter': ['cephadm-exporter'],
}
return mapping[stype]
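# Sanity sketch of the ha-rgw round trip through the two mappings above:
#
#     assert service_to_daemon_types('ha-rgw') == ['haproxy', 'keepalived']
#     for d in service_to_daemon_types('ha-rgw'):
#         assert daemon_type_to_service(d) == 'ha-rgw'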
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
def __init__(self):
@ -1236,6 +1286,8 @@ class DaemonDescription(object):
# The type of service (osd, mon, mgr, etc.)
self.daemon_type = daemon_type
assert daemon_type not in ['HA_RGW', 'ha-rgw']
# The orchestrator will have picked some names for daemons,
# typically either based on hostnames or on pod names.
# This is the <foo> in mds.<foo>, the ID that will appear
@ -1271,7 +1323,7 @@ class DaemonDescription(object):
def matches_service(self, service_name: Optional[str]) -> bool:
if service_name:
return self.name().startswith(service_name + '.')
return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
return False
def service_id(self):
@ -1318,15 +1370,15 @@ class DaemonDescription(object):
# daemon_id == "service_id"
return self.daemon_id
if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
return _match()
return self.daemon_id
def service_name(self):
if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
return f'{self.daemon_type}.{self.service_id()}'
return self.daemon_type
if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
return daemon_type_to_service(self.daemon_type)
def __repr__(self):
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,


@ -17,7 +17,7 @@ from mgr_module import MgrModule, HandleCommandResult, Option
from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, HA_RGWSpec, \
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec
@ -765,7 +765,7 @@ Examples:
self._orchestrator_wait([completion])
data = completion.result
if format == 'plain':
out = generate_preview_tables(data , True)
out = generate_preview_tables(data, True)
else:
out = to_format(data, format, many=True, cls=None)
return HandleCommandResult(stdout=out)


@ -381,8 +381,8 @@ class ServiceSpec(object):
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw ' \
'container cephadm-exporter'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container'.split()
'container cephadm-exporter ha-rgw'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ha-rgw'.split()
@classmethod
def _cls(cls, service_type):
@ -394,6 +394,7 @@ class ServiceSpec(object):
'osd': DriveGroupSpec,
'iscsi': IscsiServiceSpec,
'alertmanager': AlertManagerSpec,
'ha-rgw': HA_RGWSpec,
'container': CustomContainerSpec,
}.get(service_type, cls)
if ret == ServiceSpec and not service_type:
@ -780,6 +781,95 @@ class AlertManagerSpec(ServiceSpec):
yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer)
class HA_RGWSpec(ServiceSpec):
def __init__(self,
service_type: str = 'ha-rgw',
service_id: Optional[str] = None,
placement: Optional[PlacementSpec] = None,
virtual_ip_interface: Optional[str] = None,
virtual_ip_address: Optional[str] = None,
frontend_port: Optional[int] = None,
ha_proxy_port: Optional[int] = None,
ha_proxy_stats_enabled: Optional[bool] = None,
ha_proxy_stats_user: Optional[str] = None,
ha_proxy_stats_password: Optional[str] = None,
ha_proxy_enable_prometheus_exporter: Optional[bool] = None,
ha_proxy_monitor_uri: Optional[str] = None,
keepalived_password: Optional[str] = None,
ha_proxy_frontend_ssl_certificate: Optional[str] = None,
ha_proxy_frontend_ssl_port: Optional[int] = None,
ha_proxy_ssl_dh_param: Optional[str] = None,
ha_proxy_ssl_ciphers: Optional[List[str]] = None,
ha_proxy_ssl_options: Optional[List[str]] = None,
haproxy_container_image: Optional[str] = None,
keepalived_container_image: Optional[str] = None,
definitive_host_list: Optional[List[HostPlacementSpec]] = None
):
assert service_type == 'ha-rgw'
super(HA_RGWSpec, self).__init__('ha-rgw', service_id=service_id, placement=placement)
self.virtual_ip_interface = virtual_ip_interface
self.virtual_ip_address = virtual_ip_address
self.frontend_port = frontend_port
self.ha_proxy_port = ha_proxy_port
self.ha_proxy_stats_enabled = ha_proxy_stats_enabled
self.ha_proxy_stats_user = ha_proxy_stats_user
self.ha_proxy_stats_password = ha_proxy_stats_password
self.ha_proxy_enable_prometheus_exporter = ha_proxy_enable_prometheus_exporter
self.ha_proxy_monitor_uri = ha_proxy_monitor_uri
self.keepalived_password = keepalived_password
self.ha_proxy_frontend_ssl_certificate = ha_proxy_frontend_ssl_certificate
self.ha_proxy_frontend_ssl_port = ha_proxy_frontend_ssl_port
self.ha_proxy_ssl_dh_param = ha_proxy_ssl_dh_param
self.ha_proxy_ssl_ciphers = ha_proxy_ssl_ciphers
self.ha_proxy_ssl_options = ha_proxy_ssl_options
self.haproxy_container_image = haproxy_container_image
self.keepalived_container_image = keepalived_container_image
# placeholder variable. Need definitive list of hosts this service will
# be placed on in order to generate keepalived config. Will be populated
# when applying spec
self.definitive_host_list = [] # type: List[HostPlacementSpec]
def validate(self):
super(HA_RGWSpec, self).validate()
if not self.virtual_ip_interface:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No Virtual IP Interface specified')
if not self.virtual_ip_address:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No Virtual IP Address specified')
if not self.frontend_port and not self.ha_proxy_frontend_ssl_certificate:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No Frontend Port specified')
if not self.ha_proxy_port:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No HA Proxy Port specified')
if not self.ha_proxy_stats_enabled:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: Ha Proxy Stats Enabled option not set')
if not self.ha_proxy_stats_user:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No HA Proxy Stats User specified')
if not self.ha_proxy_stats_password:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No HA Proxy Stats Password specified')
if not self.ha_proxy_enable_prometheus_exporter:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: HA Proxy Enable Prometheus Exporter option not set')
if not self.ha_proxy_monitor_uri:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No HA Proxy Monitor Uri specified')
if not self.keepalived_password:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: No Keepalived Password specified')
if self.ha_proxy_frontend_ssl_certificate:
if not self.ha_proxy_frontend_ssl_port:
raise ServiceSpecValidationError(
'Cannot add ha-rgw: Specified Ha Proxy Frontend SSL ' +
'Certificate but no SSL Port')
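# A minimal construction sketch of the spec above, mirroring the YAML
# example in the docs (all values illustrative):
#
#     spec = HA_RGWSpec(service_id='haproxy_for_rgw',
#                       virtual_ip_interface='eth0',
#                       virtual_ip_address='192.168.20.1/24',
#                       frontend_port=8080,
#                       ha_proxy_port=1967,
#                       ha_proxy_stats_enabled=True,
#                       ha_proxy_stats_user='admin',
#                       ha_proxy_stats_password='admin',
#                       ha_proxy_enable_prometheus_exporter=True,
#                       ha_proxy_monitor_uri='/haproxy_health',
#                       keepalived_password='admin')
#     spec.validate()  # raises ServiceSpecValidationError on missing fields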
class CustomContainerSpec(ServiceSpec):
def __init__(self,
service_type: str = 'container',