Merge pull request #52740 from adk3798/ingress-vip-networks

mgr/cephadm: pick correct IPs for ingress service based on VIP

Reviewed-by: John Mulligan <jmulligan@redhat.com>
Adam King 2023-08-16 13:22:50 -04:00 committed by GitHub
commit 306e8437e6
5 changed files with 274 additions and 66 deletions
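For orientation, here is a minimal sketch of the idea this change introduces (the helper name below is illustrative, not part of the PR): given cephadm's per-host networks cache, shaped as {subnet: {interface: [ips]}}, each ingress VIP is matched to the interface whose subnet contains it, and the host's IP on that subnet becomes the keepalived unicast source address for that VIP.

import ipaddress
from typing import Dict, List, Optional, Tuple

def pick_interface_and_ip(vip: str,
                          networks: Dict[str, Dict[str, List[str]]]) -> Optional[Tuple[str, str]]:
    # strip any /prefix from the VIP, then find the subnet that contains it
    bare_ip = ipaddress.ip_interface(vip).ip
    for subnet, ifaces in networks.items():
        if ifaces and bare_ip in ipaddress.ip_network(subnet):
            iface = list(ifaces.keys())[0]
            return iface, ifaces[iface][0]  # interface and this host's IP on that subnet
    return None

# a host with two interfaces on different subnets
networks = {
    '1.2.3.0/24': {'if0': ['1.2.3.1']},
    '100.100.100.0/24': {'if1': ['100.100.100.1']},
}
assert pick_interface_and_ip('1.2.3.100/24', networks) == ('if0', '1.2.3.1')
assert pick_interface_and_ip('100.100.100.100/24', networks) == ('if1', '100.100.100.1')

Hosts with no interface on a VIP's subnet are now filtered out of placement entirely, as the serve.py and test changes below show.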

View File

@@ -148,7 +148,7 @@ class HostAssignment(object):
daemons: List[orchestrator.DaemonDescription],
related_service_daemons: Optional[List[DaemonDescription]] = None,
networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
filter_new_host: Optional[Callable[[str], bool]] = None,
filter_new_host: Optional[Callable[[str, ServiceSpec], bool]] = None,
allow_colo: bool = False,
primary_daemon_type: Optional[str] = None,
per_host_daemon_type: Optional[str] = None,
@@ -451,7 +451,7 @@ class HostAssignment(object):
old = ls.copy()
ls = []
for h in old:
if self.filter_new_host(h.hostname):
if self.filter_new_host(h.hostname, self.spec):
ls.append(h)
if len(old) > len(ls):
logger.debug('Filtered %s down to %s' % (old, ls))
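The widened callback signature above (hostname plus ServiceSpec) is what lets a single HostAssignment filter behave differently per service type. A minimal sketch of that filtering step in isolation, with illustrative names:

from typing import Callable, List

def filter_candidates(hostnames: List[str],
                      spec: 'ServiceSpec',
                      filter_new_host: Callable[[str, 'ServiceSpec'], bool]) -> List[str]:
    # keep only the hosts the filter accepts for this particular spec,
    # mirroring the loop in HostAssignment above
    kept = [h for h in hostnames if filter_new_host(h, spec)]
    if len(kept) < len(hostnames):
        print('Filtered %s down to %s' % (hostnames, kept))
    return kept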

View File

@@ -6,7 +6,7 @@ import uuid
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
DefaultDict
DefaultDict, Callable
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
@@ -17,6 +17,7 @@ from ceph.deployment.service_spec import (
PlacementSpec,
RGWSpec,
ServiceSpec,
IngressSpec,
)
from ceph.utils import datetime_now
@@ -695,8 +696,7 @@ class CephadmServe:
public_networks = [x.strip() for x in out.split(',')]
self.log.debug('mon public_network(s) is %s' % public_networks)
def matches_network(host):
# type: (str) -> bool
def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
# make sure the host has at least one network that belongs to some configured public network(s)
for pn in public_networks:
public_network = ipaddress.ip_network(pn)
@@ -713,6 +713,40 @@
)
return False
def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
# make sure the host has an interface that can
# actually accommodate the VIP
if not sspec or sspec.service_type != 'ingress':
return True
ingress_spec = cast(IngressSpec, sspec)
virtual_ips = []
if ingress_spec.virtual_ip:
virtual_ips.append(ingress_spec.virtual_ip)
elif ingress_spec.virtual_ips_list:
virtual_ips = ingress_spec.virtual_ips_list
for vip in virtual_ips:
found = False
bare_ip = str(vip).split('/')[0]
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
# found matching interface for this IP, move on
self.log.debug(
f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
)
found = True
break
if not found:
self.log.info(
f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
)
return False
return True
host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
'mon': matches_public_network,
'ingress': has_interface_for_vip
}
rank_map = None
if svc.ranked():
rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
@@ -725,10 +759,7 @@
daemons=daemons,
related_service_daemons=related_service_daemons,
networks=self.mgr.cache.networks,
filter_new_host=(
matches_network if service_type == 'mon'
else None
),
filter_new_host=host_filters.get(service_type, None),
allow_colo=svc.allow_colo(),
primary_daemon_type=svc.primary_daemon_type(spec),
per_host_daemon_type=svc.per_host_daemon_type(spec),
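Taken together, has_interface_for_vip() and the host_filters dispatch above mean placement of ingress keepalived daemons now requires the host to have an interface on each VIP's subnet. A hedged, self-contained sketch of that decision for two hypothetical hosts and the VIP 100.100.100.100/24:

import ipaddress

host_networks = {
    'host-a': {'1.2.3.0/24': {'if0': ['1.2.3.1']},
               '100.100.100.0/24': {'if1': ['100.100.100.1']}},
    'host-b': {'1.2.3.0/24': {'if0': ['1.2.3.2']}},  # no interface on the VIP's subnet
}
vip = ipaddress.ip_interface('100.100.100.100/24').ip
eligible = [host for host, nets in host_networks.items()
            if any(ifaces and vip in ipaddress.ip_network(subnet)
                   for subnet, ifaces in nets.items())]
assert eligible == ['host-a']  # host-b is filtered out, as in the new test further down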

View File

@@ -247,56 +247,35 @@ class IngressService(CephService):
host = daemon_spec.host
hosts = sorted(list(set([host] + [str(d.hostname) for d in daemons])))
# interface
bare_ips = []
if spec.virtual_ip:
bare_ips.append(str(spec.virtual_ip).split('/')[0])
elif spec.virtual_ips_list:
bare_ips = [str(vip).split('/')[0] for vip in spec.virtual_ips_list]
interface = None
for bare_ip in bare_ips:
def _get_valid_interface_and_ip(vip: str, host: str) -> Tuple[str, str]:
# interface
bare_ip = ipaddress.ip_interface(vip).ip
host_ip = ''
interface = None
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
interface = list(ifaces.keys())[0]
host_ip = ifaces[interface][0]
logger.info(
f'{bare_ip} is in {subnet} on {host} interface {interface}'
)
break
else: # nobreak
continue
break
# try to find interface by matching spec.virtual_interface_networks
if not interface and spec.virtual_interface_networks:
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if subnet in spec.virtual_interface_networks:
interface = list(ifaces.keys())[0]
logger.info(
f'{spec.virtual_ip} will be configured on {host} interface '
f'{interface} (which has guiding subnet {subnet})'
)
break
if not interface:
raise OrchestratorError(
f"Unable to identify interface for {spec.virtual_ip} on {host}"
)
# Use interface as vrrp_interface for vrrp traffic if vrrp_interface_network not set on the spec
vrrp_interface = None
if not spec.vrrp_interface_network:
vrrp_interface = interface
else:
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if subnet == spec.vrrp_interface_network:
vrrp_interface = list(ifaces.keys())[0]
logger.info(
f'vrrp will be configured on {host} interface '
f'{vrrp_interface} (which has guiding subnet {subnet})'
)
break
else:
# try to find interface by matching spec.virtual_interface_networks
if not interface and spec.virtual_interface_networks:
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if subnet in spec.virtual_interface_networks:
interface = list(ifaces.keys())[0]
host_ip = ifaces[interface][0]
logger.info(
f'{spec.virtual_ip} will be configured on {host} interface '
f'{interface} (which is in subnet {subnet})'
)
break
if not interface:
raise OrchestratorError(
f"Unable to identify vrrp interface for {spec.vrrp_interface_network} on {host}"
f"Unable to identify interface for {spec.virtual_ip} on {host}"
)
return interface, host_ip
# script to monitor health
script = '/usr/bin/false'
@@ -341,7 +320,36 @@
# other_ips in conf file: the remaining hosts, converted to ips
if host in hosts:
hosts.remove(host)
other_ips = [utils.resolve_ip(self.mgr.inventory.get_addr(h)) for h in hosts]
host_ips: List[str] = []
other_ips: List[List[str]] = []
interfaces: List[str] = []
for vip in virtual_ips:
interface, ip = _get_valid_interface_and_ip(vip, host)
host_ips.append(ip)
interfaces.append(interface)
ips: List[str] = []
for h in hosts:
_, ip = _get_valid_interface_and_ip(vip, h)
ips.append(ip)
other_ips.append(ips)
# Use interface as vrrp_interface for vrrp traffic if vrrp_interface_network not set on the spec
vrrp_interfaces: List[str] = []
if not spec.vrrp_interface_network:
vrrp_interfaces = interfaces
else:
for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
if subnet == spec.vrrp_interface_network:
vrrp_interface = [list(ifaces.keys())[0]] * len(interfaces)
logger.info(
f'vrrp will be configured on {host} interface '
f'{vrrp_interface} (which is in subnet {subnet})'
)
break
else:
raise OrchestratorError(
f"Unable to identify vrrp interface for {spec.vrrp_interface_network} on {host}"
)
keepalived_conf = self.mgr.template.render(
'services/ingress/keepalived.conf.j2',
@@ -349,14 +357,14 @@
'spec': spec,
'script': script,
'password': password,
'interface': interface,
'vrrp_interface': vrrp_interface,
'interfaces': interfaces,
'vrrp_interfaces': vrrp_interfaces,
'virtual_ips': virtual_ips,
'first_virtual_router_id': spec.first_virtual_router_id,
'states': states,
'priorities': priorities,
'other_ips': other_ips,
'host_ip': utils.resolve_ip(self.mgr.inventory.get_addr(host)),
'host_ips': host_ips,
}
)
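To make the new template inputs concrete: entry x of interfaces, host_ips and other_ips all describe VRRP instance VI_x, one instance per VIP. Hypothetical values for a two-VIP ingress on host 'test' with a single peer 'test2':

virtual_ips = ['1.2.3.100/24', '100.100.100.100/24']
interfaces = ['if0', 'if1']                        # interface carrying each VIP on this host
host_ips = ['1.2.3.1', '100.100.100.1']            # unicast_src_ip for each VRRP instance
other_ips = [['1.2.3.2'], ['100.100.100.2']]       # unicast peers for each VRRP instance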

View File

@@ -11,7 +11,7 @@ vrrp_script check_backend {
vrrp_instance VI_{{ x }} {
state {{ states[x] }}
priority {{ priorities[x] }}
interface {{ vrrp_interface }}
interface {{ vrrp_interfaces[x] }}
virtual_router_id {{ first_virtual_router_id + x }}
advert_int 1
authentication {
@@ -19,15 +19,15 @@ vrrp_instance VI_{{ x }} {
auth_pass {{ password }}
}
{% if not spec.use_keepalived_multicast %}
unicast_src_ip {{ host_ip }}
unicast_src_ip {{ host_ips[x] }}
unicast_peer {
{% for ip in other_ips %}
{% for ip in other_ips[x] %}
{{ ip }}
{% endfor %}
}
{% endif %}
virtual_ipaddress {
{{ virtual_ips[x] }} dev {{ interface }}
{{ virtual_ips[x] }} dev {{ interfaces[x] }}
}
track_script {
check_backend

View File

@@ -658,6 +658,12 @@ class TestMonitoring:
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1), rgw_frontend_type='beast')
with with_host(cephadm_module, 'test'):
# host "test" needs to have networks for keepalive to be placed
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.1']
},
})
with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
with_service(cephadm_module, CephExporterSpec('ceph-exporter')) as _, \
with_service(cephadm_module, s) as _, \
@@ -760,6 +766,12 @@
cephadm_module.http_server.service_discovery.password = 'sd_password'
cephadm_module.http_server.service_discovery.ssl_certs.generate_cert = MagicMock(
side_effect=gen_cert)
# host "test" needs to have networks for keepalive to be placed
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.1']
},
})
with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
with_service(cephadm_module, s) as _, \
with_service(cephadm_module, AlertManagerSpec('alertmanager')) as _, \
@@ -1672,7 +1684,7 @@ class TestIngressService:
with with_host(cephadm_module, 'test', addr='1.2.3.7'):
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.4/32']
'if0': ['1.2.3.4']
}
})
@@ -1716,7 +1728,7 @@
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 1.2.3.7\n '
'unicast_src_ip 1.2.3.4\n '
'unicast_peer {\n '
'}\n '
'virtual_ipaddress {\n '
@@ -1795,7 +1807,7 @@
with with_host(cephadm_module, 'test'):
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.4/32']
'if0': ['1.2.3.1']
}
})
@@ -1839,7 +1851,7 @@
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 1::4\n '
'unicast_src_ip 1.2.3.1\n '
'unicast_peer {\n '
'}\n '
'virtual_ipaddress {\n '
@@ -1920,7 +1932,7 @@
with with_host(cephadm_module, 'test', addr='1.2.3.7'):
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.4/32']
'if0': ['1.2.3.1']
}
})
@@ -1965,7 +1977,7 @@
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 1.2.3.7\n '
'unicast_src_ip 1.2.3.1\n '
'unicast_peer {\n '
'}\n '
'virtual_ipaddress {\n '
@@ -2037,6 +2049,163 @@ class TestIngressService:
assert haproxy_generated_conf[0] == haproxy_expected_conf
@patch("cephadm.serve.CephadmServe._run_cephadm")
def test_keepalive_config_multi_interface_vips(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test', addr='1.2.3.1'):
with with_host(cephadm_module, 'test2', addr='1.2.3.2'):
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.1']
},
'100.100.100.0/24': {
'if1': ['100.100.100.1']
}
})
cephadm_module.cache.update_host_networks('test2', {
'1.2.3.0/24': {
'if0': ['1.2.3.2']
},
'100.100.100.0/24': {
'if1': ['100.100.100.2']
}
})
# Check the ingress with multiple VIPs
s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
rgw_frontend_type='beast')
ispec = IngressSpec(service_type='ingress',
service_id='test',
placement=PlacementSpec(hosts=['test', 'test2']),
backend_service='rgw.foo',
frontend_port=8089,
monitor_port=8999,
monitor_user='admin',
monitor_password='12345',
keepalived_password='12345',
virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
'files':
{
'keepalived.conf':
'# This file is generated by cephadm.\n'
'vrrp_script check_backend {\n '
'script "/usr/bin/curl http://1.2.3.1:8999/health"\n '
'weight -20\n '
'interval 2\n '
'rise 2\n '
'fall 2\n}\n\n'
'vrrp_instance VI_0 {\n '
'state MASTER\n '
'priority 100\n '
'interface if0\n '
'virtual_router_id 50\n '
'advert_int 1\n '
'authentication {\n '
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 1.2.3.1\n '
'unicast_peer {\n '
'1.2.3.2\n '
'}\n '
'virtual_ipaddress {\n '
'1.2.3.100/24 dev if0\n '
'}\n '
'track_script {\n '
'check_backend\n }\n'
'}\n'
'vrrp_instance VI_1 {\n '
'state BACKUP\n '
'priority 90\n '
'interface if1\n '
'virtual_router_id 51\n '
'advert_int 1\n '
'authentication {\n '
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 100.100.100.1\n '
'unicast_peer {\n '
'100.100.100.2\n '
'}\n '
'virtual_ipaddress {\n '
'100.100.100.100/24 dev if1\n '
'}\n '
'track_script {\n '
'check_backend\n }\n'
'}\n'
}
}
# check keepalived config
assert keepalived_generated_conf[0] == keepalived_expected_conf
@patch("cephadm.serve.CephadmServe._run_cephadm")
def test_keepalive_interface_host_filtering(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
# we need to make sure keepalive daemons will have an interface
# on the hosts we deploy them on in order to set up their VIP.
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test', addr='1.2.3.1'):
with with_host(cephadm_module, 'test2', addr='1.2.3.2'):
with with_host(cephadm_module, 'test3', addr='1.2.3.3'):
with with_host(cephadm_module, 'test4', addr='1.2.3.3'):
# setup "test" and "test4" to have all the necessary interfaces,
# "test2" to have one of them (should still be filtered)
# and "test3" to have none of them
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.1']
},
'100.100.100.0/24': {
'if1': ['100.100.100.1']
}
})
cephadm_module.cache.update_host_networks('test2', {
'1.2.3.0/24': {
'if0': ['1.2.3.2']
},
})
cephadm_module.cache.update_host_networks('test4', {
'1.2.3.0/24': {
'if0': ['1.2.3.4']
},
'100.100.100.0/24': {
'if1': ['100.100.100.4']
}
})
s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
rgw_frontend_type='beast')
ispec = IngressSpec(service_type='ingress',
service_id='test',
placement=PlacementSpec(hosts=['test', 'test2', 'test3', 'test4']),
backend_service='rgw.foo',
frontend_port=8089,
monitor_port=8999,
monitor_user='admin',
monitor_password='12345',
keepalived_password='12345',
virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# since we're never actually going to refresh the host here,
# check the tmp daemons to see what was placed during the apply
daemons = cephadm_module.cache._get_tmp_daemons()
keepalive_daemons = [d for d in daemons if d.daemon_type == 'keepalived']
hosts_deployed_on = [d.hostname for d in keepalive_daemons]
assert 'test' in hosts_deployed_on
assert 'test2' not in hosts_deployed_on
assert 'test3' not in hosts_deployed_on
assert 'test4' in hosts_deployed_on
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
@patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
@@ -2048,7 +2217,7 @@
with with_host(cephadm_module, 'test', addr='1.2.3.7'):
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
'if0': ['1.2.3.4/32']
'if0': ['1.2.3.1']
}
})
@@ -2095,7 +2264,7 @@
'auth_type PASS\n '
'auth_pass 12345\n '
'}\n '
'unicast_src_ip 1.2.3.7\n '
'unicast_src_ip 1.2.3.1\n '
'unicast_peer {\n '
'}\n '
'virtual_ipaddress {\n '