Merge pull request #61783 from adk3798/cephadm-nvmeof-one-daemon-per-node

mgr/cephadm: block deploying nvmeof daemons of different groups on same host

Reviewed-by: John Mulligan <jmulligan@redhat.com>
Adam King 2025-03-10 15:22:08 -04:00 committed by GitHub
commit b3b6577e24
6 changed files with 227 additions and 6 deletions
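
The rule being added, as a hedged sketch (hypothetical pool, group, and host names; per this PR's own tests, ServiceSpec dispatches to NvmeofServiceSpec when service_type='nvmeof'):

    from ceph.deployment.service_spec import PlacementSpec, ServiceSpec

    # Two nvmeof services whose "group" fields differ. Before this change their
    # placements could land daemons on the same host; with it, any host running
    # group "a" daemons is blocked for group "b" and vice versa.
    group_a = ServiceSpec(service_type='nvmeof', service_id='rbd.a',
                          pool='rbd', group='a',
                          placement=PlacementSpec(hosts=['host1', 'host2']))
    group_b = ServiceSpec(service_type='nvmeof', service_id='rbd.b',
                          pool='rbd', group='b',
                          placement=PlacementSpec(hosts=['host2', 'host3']))
    # Once group "a" daemons run on host1/host2, group "b" only deploys to host3.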


@@ -153,6 +153,7 @@ class HostAssignment(object):
                  primary_daemon_type: Optional[str] = None,
                  per_host_daemon_type: Optional[str] = None,
                  rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
+                 blocking_daemon_hosts: Optional[List[orchestrator.HostSpec]] = None,
                  ):
         assert spec
         self.spec = spec  # type: ServiceSpec
@@ -160,6 +161,7 @@ class HostAssignment(object):
         self.hosts: List[orchestrator.HostSpec] = hosts
         self.unreachable_hosts: List[orchestrator.HostSpec] = unreachable_hosts
         self.draining_hosts: List[orchestrator.HostSpec] = draining_hosts
+        self.blocking_daemon_hosts: List[orchestrator.HostSpec] = blocking_daemon_hosts or []
         self.filter_new_host = filter_new_host
         self.service_name = spec.service_name()
         self.daemons = daemons
@@ -333,10 +335,28 @@ class HostAssignment(object):
         existing = existing_active + existing_standby

         # build to_add
+        blocking_daemon_hostnames = [
+            h.hostname for h in self.blocking_daemon_hosts
+        ]
+        unreachable_hostnames = [
+            h.hostname for h in self.unreachable_hosts
+        ]
         if not count:
-            to_add = [dd for dd in others if dd.hostname not in [
-                h.hostname for h in self.unreachable_hosts]]
+            to_add = [
+                dd for dd in others if (
+                    dd.hostname not in blocking_daemon_hostnames
+                    and dd.hostname not in unreachable_hostnames
+                )
+            ]
         else:
+            if blocking_daemon_hostnames:
+                to_remove.extend([
+                    dd for dd in existing if dd.hostname in blocking_daemon_hostnames
+                ])
+                existing = [
+                    dd for dd in existing if dd.hostname not in blocking_daemon_hostnames
+                ]
+
             # The number of new slots that need to be selected in order to fulfill count
             need = count - len(existing)
@@ -356,7 +376,7 @@ class HostAssignment(object):
             for dp in matching_dps:
                 if need <= 0:
                     break
-                if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                if dp.hostname in related_service_hosts and dp.hostname not in unreachable_hostnames:
                     logger.debug(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
                     to_add.append(dp)
                     need -= 1  # this is last use of need so it can work as a counter
@@ -370,7 +390,10 @@ class HostAssignment(object):
             for dp in others:
                 if need <= 0:
                     break
-                if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                if (
+                    dp.hostname not in unreachable_hostnames
+                    and dp.hostname not in blocking_daemon_hostnames
+                ):
                     to_add.append(dp)
                     need -= 1  # this is last use of need in this function so it can work as a counter
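
The two branches above differ on purpose: with no explicit count, blocked hosts are simply filtered out of the candidate pool; with a count, daemons already sitting on a blocked host are additionally queued for removal before new slots are filled. A minimal standalone sketch of that flow (simplified to hostname strings; the real HostAssignment also handles ranks, draining hosts, and related-daemon affinity):

    from typing import List, Tuple

    def plan(candidates: List[str], existing: List[str],
             blocked: List[str], count: int = 0) -> Tuple[List[str], List[str]]:
        to_add: List[str] = []
        to_remove: List[str] = []
        if not count:
            # no count: every non-blocked candidate without a daemon is added
            to_add = [h for h in candidates if h not in blocked and h not in existing]
        else:
            # counted placement: daemons on blocked hosts are slated for removal,
            # then the remaining slots are backfilled from non-blocked candidates
            to_remove = [h for h in existing if h in blocked]
            existing = [h for h in existing if h not in blocked]
            need = count - len(existing)
            for h in candidates:
                if need <= 0:
                    break
                if h not in blocked and h not in existing:
                    to_add.append(h)
                    need -= 1
        return to_add, to_remove

    # count=2 with a daemon on blocked host2: host2 is removed, host1 backfills
    assert plan(['host1', 'host2', 'host3'],
                ['host2', 'host3'], ['host2'], count=2) == (['host1'], ['host2'])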


@@ -755,6 +755,8 @@ class CephadmServe:
         svc = service_registry.get_service(service_type)
         daemons = self.mgr.cache.get_daemons_by_service(service_name)

+        blocking_daemon_hosts = svc.get_blocking_daemon_hosts(service_name)
+
         related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)

         public_networks: List[str] = []
@@ -824,6 +826,7 @@ class CephadmServe:
             ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
             unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             draining_hosts=self.mgr.cache.get_draining_hosts(),
+            blocking_daemon_hosts=blocking_daemon_hosts,
             daemons=daemons,
             related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,


@@ -23,7 +23,12 @@ from ceph.deployment.service_spec import (
 )
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
 from mgr_util import build_url, merge_dicts
-from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator import (
+    OrchestratorError,
+    DaemonDescription,
+    DaemonDescriptionStatus,
+    HostSpec
+)
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
 from .service_registry import register_cephadm_service
@@ -581,6 +586,9 @@ class CephadmService(metaclass=ABCMeta):
         """
         return False

+    def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
+        return []
+

 class CephService(CephadmService):
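
A compact sketch of the hook pattern introduced here (types reduced to plain strings; the real method returns HostSpec objects and is overridden by the nvmeof service below):

    from typing import List

    class Service:
        def get_blocking_daemon_hosts(self, service_name: str) -> List[str]:
            # default: no host is off-limits for this service
            return []

    class GroupedService(Service):
        def __init__(self, hosts_in_other_groups: List[str]) -> None:
            self.hosts_in_other_groups = hosts_in_other_groups

        def get_blocking_daemon_hosts(self, service_name: str) -> List[str]:
            # override: hosts running daemons of other groups are blocked
            return self.hosts_in_other_groups

    assert Service().get_blocking_daemon_hosts('crash') == []
    assert GroupedService(['host3']).get_blocking_daemon_hosts('nvmeof.foo.foo') == ['host3']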


@@ -7,7 +7,12 @@ from ipaddress import ip_address, IPv6Address

 from mgr_module import HandleCommandResult
 from ceph.deployment.service_spec import NvmeofServiceSpec
-from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator import (
+    OrchestratorError,
+    DaemonDescription,
+    DaemonDescriptionStatus,
+    HostSpec,
+)
 from .cephadmservice import CephadmDaemonDeploySpec, CephService
 from .service_registry import register_cephadm_service
 from .. import utils
@@ -234,3 +239,22 @@ class NvmeofService(CephService):
         _, _, err = self.mgr.mon_command(cmd)
         if err:
             self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}")
+
+    def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
+        # we should not deploy nvmeof daemons on hosts that have nvmeof daemons
+        # from services with a different "group" attribute (as recommended by
+        # the nvmeof team)
+        spec = cast(NvmeofServiceSpec, self.mgr.spec_store[service_name].spec)
+        nvmeof_group = cast(NvmeofServiceSpec, spec).group
+        blocking_daemons: List[DaemonDescription] = []
+        other_group_nvmeof_services = [
+            nspec for nspec in self.mgr.spec_store.get_specs_by_type('nvmeof').values()
+            if cast(NvmeofServiceSpec, nspec).group != nvmeof_group
+        ]
+        for other_group_nvmeof_service in other_group_nvmeof_services:
+            blocking_daemons += self.mgr.cache.get_daemons_by_service(other_group_nvmeof_service.service_name())
+        blocking_daemon_hosts = [
+            HostSpec(hostname=blocking_daemon.hostname)
+            for blocking_daemon in blocking_daemons if blocking_daemon.hostname is not None
+        ]
+        return blocking_daemon_hosts
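
The same lookup in standalone form, a minimal sketch with plain dicts standing in for the spec store and daemon cache (all names illustrative):

    from typing import Dict, List

    def blocking_hosts_for(service_name: str,
                           group_by_service: Dict[str, str],
                           daemon_hosts: Dict[str, List[str]]) -> List[str]:
        # hosts holding nvmeof daemons whose service has a different group
        my_group = group_by_service[service_name]
        hosts: List[str] = []
        for other_service, group in group_by_service.items():
            if group != my_group:
                hosts.extend(daemon_hosts.get(other_service, []))
        return hosts

    # Mirrors the unit test below: foo's group is blocked by bar's host3,
    # and bar's group by foo's host1/host2.
    groups = {'nvmeof.foo.foo': 'foo', 'nvmeof.bar.bar': 'bar'}
    placed = {'nvmeof.foo.foo': ['host1', 'host2'], 'nvmeof.bar.bar': ['host3']}
    assert blocking_hosts_for('nvmeof.foo.foo', groups, placed) == ['host3']
    assert sorted(blocking_hosts_for('nvmeof.bar.bar', groups, placed)) == ['host1', 'host2']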


@@ -15,8 +15,10 @@ from cephadm.inventory import (
     PrivKey,
     CERT_STORE_CERT_PREFIX,
     CERT_STORE_KEY_PREFIX,
+    SpecDescription,
 )
 from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
+from cephadm.services.nvmeof import NvmeofService
 from cephadm.utils import SpecialHostLabels

 try:
@@ -3052,3 +3054,60 @@ Traceback (most recent call last):
         assert osd.cpu_percentage == '6.54%'
         assert osd.memory_usage == 73410805
         assert osd.created == str_to_datetime('2023-09-22T22:41:03.615080Z')
+
+    @mock.patch("cephadm.inventory.HostCache.get_daemons_by_service")
+    @mock.patch("cephadm.inventory.SpecStore.get_specs_by_type")
+    @mock.patch("cephadm.inventory.SpecStore.__getitem__")
+    def test_nvmeof_build_blocking_daemon_hosts(
+        self,
+        _spec_store_get_item,
+        _get_specs_by_type,
+        _get_daemons_by_service,
+        cephadm_module: CephadmOrchestrator
+    ):
+        # for nvmeof, the blocking daemon host list should be all hosts with an nvmeof
+        # daemon that belongs to a service with a different "group" parameter
+        nvmeof_services = [
+            ServiceSpec(service_type='nvmeof', pool='foo', group='foo', service_id='foo.foo'),
+            ServiceSpec(service_type='nvmeof', pool='bar', group='bar', service_id='bar.bar')
+        ]
+        nvmeof_foo_daemons = [
+            DaemonDescription(daemon_type='nvmeof', daemon_id='foo.foo.host1', hostname='host1'),
+            DaemonDescription(daemon_type='nvmeof', daemon_id='foo.foo.host2', hostname='host2')
+        ]
+        nvmeof_bar_daemons = [
+            DaemonDescription(daemon_type='nvmeof', daemon_id='bar.bar.host3', hostname='host3')
+        ]
+
+        def _get_nvmeof_specs(sname) -> SpecDescription:
+            if sname == 'nvmeof.foo.foo':
+                return SpecDescription(
+                    nvmeof_services[0], {}, None, None
+                )
+            elif sname == 'nvmeof.bar.bar':
+                return SpecDescription(
+                    nvmeof_services[1], {}, None, None
+                )
+
+        def _get_nvmeof_daemons(sname) -> List[DaemonDescription]:
+            if sname == 'nvmeof.foo.foo':
+                return nvmeof_foo_daemons
+            elif sname == 'nvmeof.bar.bar':
+                return nvmeof_bar_daemons
+
+        _get_specs_by_type.return_value = {
+            'nvmeof.foo.foo': nvmeof_services[0],
+            'nvmeof.bar.bar': nvmeof_services[1],
+        }
+        _spec_store_get_item.side_effect = _get_nvmeof_specs
+        _get_daemons_by_service.side_effect = _get_nvmeof_daemons
+
+        # first test for nvmeof.foo.foo, which should get blocking host based
+        # on nvmeof.bar.bar's daemons
+        nvmeof_foo_blocking_hosts = NvmeofService(cephadm_module).get_blocking_daemon_hosts('nvmeof.foo.foo')
+        assert set([h.hostname for h in nvmeof_foo_blocking_hosts]) == set(['host3'])
+
+        # now test for nvmeof.bar.bar, which should get blocking host based
+        # on nvmeof.foo.foo's daemons
+        nvmeof_bar_blocking_hosts = NvmeofService(cephadm_module).get_blocking_daemon_hosts('nvmeof.bar.bar')
+        assert set([h.hostname for h in nvmeof_bar_blocking_hosts]) == set(['host1', 'host2'])


@@ -1753,3 +1753,107 @@ def test_placement_regex_host_pattern(service_type, placement, hosts, expected_add):
         daemons=[],
     ).place()
     assert sorted([h.hostname for h in to_add]) == expected_add
+
+
+class BlockingDaemonHostsTest(NamedTuple):
+    service_type: str
+    placement: PlacementSpec
+    hosts: List[str]
+    unreachable_hosts: List[str]
+    blocking_daemon_hosts: List[str]
+    daemons: List[DaemonDescription]
+    expected_add: List[List[str]]
+    expected_remove: List[List[str]]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,unreachable_hosts,blocking_daemon_hosts,daemons,expected_add,expected_remove",
+    [
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(count=3),
+            'host1 host2 host3'.split(),
+            [],
+            ['host1'],
+            [],
+            [['host2', 'host3']],
+            [[]],
+        ),
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(hosts=['host2', 'host3']),
+            'host1 host2 host3'.split(),
+            [],
+            ['host2'],
+            [DaemonDescription('crash', 'host1', 'host1')],
+            [['host3']],
+            [['crash.host1']],
+        ),
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(hosts=['host1', 'host2', 'host3', 'host4']),
+            'host1 host2 host3 host4'.split(),
+            ['host1'],
+            ['host2'],
+            [DaemonDescription('crash', 'host3', 'host3')],
+            [['host4']],
+            [[]],
+        ),
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(count=4),
+            'host1 host2 host3 host4'.split(),
+            ['host4'],
+            ['host2'],
+            [DaemonDescription('crash', 'host3', 'host3')],
+            [['host1']],
+            [[]],
+        ),
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(hosts=['host1', 'host2', 'host3', 'host4']),
+            'host1 host2 host3 host4'.split(),
+            ['host1'],
+            ['host4'],
+            [DaemonDescription('crash', 'host2', 'host2')],
+            [['host3']],
+            [[]],
+        ),
+        BlockingDaemonHostsTest(
+            'crash',
+            PlacementSpec(count=2),
+            'host1 host2 host3'.split(),
+            [],
+            ['host2'],
+            [
+                DaemonDescription('crash', 'host2', 'host2'),
+                DaemonDescription('crash', 'host3', 'host3')
+            ],
+            [['host1']],
+            [['crash.host2']],
+        ),
+    ])
+def test_blocking_daemon_host(
+    service_type,
+    placement,
+    hosts,
+    unreachable_hosts,
+    blocking_daemon_hosts,
+    daemons,
+    expected_add,
+    expected_remove
+):
+    spec = ServiceSpec(service_type=service_type,
+                       service_id=None,
+                       placement=placement)
+
+    hosts, to_add, to_remove = HostAssignment(
+        spec=spec,
+        hosts=[HostSpec(h) for h in hosts],
+        unreachable_hosts=[HostSpec(h) for h in unreachable_hosts],
+        draining_hosts=[],
+        blocking_daemon_hosts=[HostSpec(h) for h in blocking_daemon_hosts],
+        daemons=daemons,
+    ).place()
+
+    assert sorted([h.hostname for h in to_add]) in expected_add
+    assert sorted([h.name() for h in to_remove]) in expected_remove