mgr/cephadm/schedule: add per_host_daemon_type support

This will be used to schedule a per-host keepalived alongside a service's
other daemons (e.g., the haproxy instances of an ingress service).

Implement this as a final stage of place() that puts one daemon of the
per-host type on each host that receives a primary daemon, and that takes
existing/stray daemons into consideration.

Signed-off-by: Sage Weil <sage@newdream.net>
Sage Weil 2021-04-09 14:43:45 -04:00
parent 0894773e95
commit db9f1930fe
2 changed files with 70 additions and 9 deletions
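
Usage, for context (a minimal sketch, not part of this commit). The spec and
keyword arguments mirror the test at the end of this diff; `hosts`, `daemons`,
and `networks` are placeholders for whatever inventory the caller has:

    spec = IngressSpec(
        service_type='ingress',
        service_id='rgw.foo',
        frontend_port=443,
        monitor_port=8888,
        virtual_ip='10.0.0.20/8',
        backend_service='rgw.foo',
        placement=PlacementSpec(label='foo'),
        networks=['10.0.0.0/8'],
    )
    all_slots, to_add, to_remove = HostAssignment(
        spec=spec,
        hosts=hosts,        # orchestrator HostSpecs carrying the 'foo' label
        daemons=daemons,    # existing haproxy/keepalived DaemonDescriptions
        networks=networks,
        allow_colo=True,
        primary_daemon_type='haproxy',      # slots/counts computed for haproxy
        per_host_daemon_type='keepalived',  # then one keepalived pinned per host
    ).place()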

src/pybind/mgr/cephadm/schedule.py

@@ -68,7 +68,7 @@ class HostAssignment(object):
                  filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                  allow_colo: bool = False,
                  primary_daemon_type: Optional[str] = None,
-                 per_host_sidecar: Optional[str] = None,
+                 per_host_daemon_type: Optional[str] = None,
                  ):
         assert spec
         self.spec = spec  # type: ServiceSpec
@@ -79,6 +79,7 @@ class HostAssignment(object):
         self.daemons = daemons
         self.networks = networks
         self.allow_colo = allow_colo
+        self.per_host_daemon_type = per_host_daemon_type
         self.ports_start = spec.get_port_start()
 
     def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
@@ -123,6 +124,35 @@ class HostAssignment(object):
                 f'Cannot place {self.spec.one_line_str()}: No matching '
                 f'hosts for label {self.spec.placement.label}')
 
+    def place_per_host_daemons(
+            self,
+            slots: List[DaemonPlacement],
+            to_add: List[DaemonPlacement],
+            to_remove: List[orchestrator.DaemonDescription],
+    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
+        if self.per_host_daemon_type:
+            # want one per-host daemon on every host that got a primary slot
+            host_slots = [
+                DaemonPlacement(daemon_type=self.per_host_daemon_type,
+                                hostname=hostname)
+                for hostname in set([s.hostname for s in slots])
+            ]
+            existing = [
+                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
+            ]
+            slots += host_slots
+            for dd in existing:
+                found = False
+                for p in host_slots:
+                    if p.matches_daemon(dd):
+                        # already running where we want it; keep it
+                        host_slots.remove(p)
+                        found = True
+                        break
+                if not found:
+                    # stray daemon on a host with no slot
+                    to_remove.append(dd)
+            to_add += host_slots
+        return slots, to_add, to_remove
+
     def place(self):
         # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
         """
@@ -158,11 +188,11 @@ class HostAssignment(object):
             per_host = 1 + ((count - 1) // len(candidates))
             candidates = expand_candidates(candidates, per_host)
 
-        # consider active daemons first
+        # consider active (primary) daemons first
         daemons = [
-            d for d in self.daemons if d.is_active
+            d for d in self.daemons if d.is_active and d.daemon_type == self.primary_daemon_type
         ] + [
-            d for d in self.daemons if not d.is_active
+            d for d in self.daemons if not d.is_active and d.daemon_type == self.primary_daemon_type
         ]
 
         # sort candidates into existing/used slots that already have a
@@ -192,7 +222,7 @@ class HostAssignment(object):
         # If we don't have <count> the list of candidates is definitive.
         if count is None:
             logger.debug('Provided hosts: %s' % candidates)
-            return candidates, others, to_remove
+            return self.place_per_host_daemons(candidates, others, to_remove)
 
         # The number of new slots that need to be selected in order to fulfill count
         need = count - len(existing)
@@ -201,13 +231,13 @@ class HostAssignment(object):
         if need <= 0:
             to_remove.extend(existing[count:])
             del existing_slots[count:]
-            return existing_slots, [], to_remove
+            return self.place_per_host_daemons(existing_slots, [], to_remove)
 
         # ask the scheduler to select additional slots
         to_add = others[:need]
         logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
             existing, to_add))
-        return existing_slots + to_add, to_add, to_remove
+        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)
 
     def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
         for subnet in subnets:
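
The reconcile step in place_per_host_daemons() boils down to set bookkeeping.
Here is a self-contained toy model of the same keep/add/remove behavior (our
own simplification: tuples stand in for DaemonPlacement/DaemonDescription,
and matches_daemon() is reduced to hostname equality):

    def reconcile_per_host(slot_hosts, existing_hosts):
        # One per-host daemon is wanted wherever a primary slot landed.
        wanted = {('keepalived', h) for h in slot_hosts}
        existing = {('keepalived', h) for h in existing_hosts}
        to_add = wanted - existing      # has a slot but no keepalived yet
        to_remove = existing - wanted   # stray keepalived, no slot on that host
        return wanted, to_add, to_remove

    # Mirrors the second test case below: haproxy slots land on host1/host2,
    # while keepalived already runs on host2/host3.
    slots, add, remove = reconcile_per_host({'host1', 'host2'}, {'host2', 'host3'})
    assert add == {('keepalived', 'host1')}
    assert remove == {('keepalived', 'host3')}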

src/pybind/mgr/cephadm/tests/test_scheduling.py

@@ -827,10 +827,40 @@ class NodeAssignmentTest4(NamedTuple):
             'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}},
         },
         [],
-        ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)'],
-        ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)'],
+        ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+         'keepalived:host1', 'keepalived:host2'],
+        ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+         'keepalived:host1', 'keepalived:host2'],
         []
     ),
+    NodeAssignmentTest4(
+        IngressSpec(
+            service_type='ingress',
+            service_id='rgw.foo',
+            frontend_port=443,
+            monitor_port=8888,
+            virtual_ip='10.0.0.20/8',
+            backend_service='rgw.foo',
+            placement=PlacementSpec(label='foo'),
+            networks=['10.0.0.0/8'],
+        ),
+        {
+            'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}},
+            'host2': {'10.0.0.0/8': {'eth1': ['10.0.0.2']}},
+            'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}},
+        },
+        [
+            DaemonDescription('haproxy', 'a', 'host1', ip='10.0.0.1',
+                              ports=[443, 8888]),
+            DaemonDescription('keepalived', 'b', 'host2'),
+            DaemonDescription('keepalived', 'c', 'host3'),
+        ],
+        ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+         'keepalived:host1', 'keepalived:host2'],
+        ['haproxy:host2(10.0.0.2:443,8888)',
+         'keepalived:host1'],
+        ['keepalived.c']
+    ),
 ])
 def test_node_assignment4(spec, networks, daemons,
                           expected, expected_add, expected_remove):
@@ -841,6 +871,7 @@ def test_node_assignment4(spec, networks, daemons,
         allow_colo=True,
         networks=networks,
         primary_daemon_type='haproxy' if spec.service_type == 'ingress' else spec.service_type,
+        per_host_daemon_type='keepalived' if spec.service_type == 'ingress' else None,
     ).place()
 
     got = [str(p) for p in all_slots]