mgr/orchestrator: Rename PlacementSpec.nodes -> PlacementSpec.hosts

By definition in the orchestrator's docs, we're using `host` instead of `node`.

Also fixes an incorrect usage of PlacementSpec in mgr/ansible.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
Sebastian Wagner 2019-12-13 17:32:00 +01:00
parent 7222ba6f6a
commit 70bae966aa
5 changed files with 68 additions and 58 deletions
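For reference, a minimal sketch of the call-site change below. The PlacementSpec here is a simplified stand-in for the real class (HostSpec handling and host-spec parsing are omitted), and the example values are taken from the test updates further down:

from typing import List, Optional

class PlacementSpec(object):
    """Simplified stand-in: a placement is a label, an explicit host list, or a count."""
    def __init__(self, label=None, hosts=None, count=None):
        # type: (Optional[str], Optional[List[str]], Optional[int]) -> None
        self.label = label
        self.hosts = hosts or []  # previously exposed as `nodes`
        self.count = count

    def validate(self):
        if self.hosts and self.label:
            raise Exception('Host and label are mutually exclusive')
        if self.count is not None and self.count <= 0:
            raise Exception('The count must be a positive integer')

# New keyword after this commit (old code passed `nodes=...`):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
ps.validate()

# mgr/ansible fix: the host list lives on the placement, so read
# `spec.placement.hosts`, not `spec.hosts`.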


@@ -487,7 +487,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
# Add the hosts to the inventory in the right group
hosts = spec.hosts
hosts = spec.placement.hosts
if not hosts:
raise orchestrator.OrchestratorError("No hosts provided. "
"At least one destination host is needed to install the RGW "


@@ -1127,6 +1127,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._update_mons(spec)
def _update_mons(self, spec):
# type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster monitors.
"""
@@ -1138,30 +1139,30 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
if spec.count < num_mons:
raise NotImplementedError("Removing monitors is not supported.")
self.log.debug("Trying to update monitors on: {}".format(spec.placement.nodes))
self.log.debug("Trying to update monitors on: {}".format(spec.placement.hosts))
# check that all the hosts are registered
[self._require_hosts(host.hostname) for host in spec.placement.nodes]
[self._require_hosts(host.hostname) for host in spec.placement.hosts]
# current support requires a network to be specified
for host, network, _ in spec.placement.nodes:
for host, network, _ in spec.placement.hosts:
if not network:
raise RuntimeError("Host '{}' is missing a network spec".format(host))
def update_mons_with_daemons(daemons):
for _, _, name in spec.placement.nodes:
for _, _, name in spec.placement.hosts:
if name and len([d for d in daemons if d.service_instance == name]):
raise RuntimeError('name %s already exists', name)
# explicit placement: enough hosts provided?
num_new_mons = spec.count - num_mons
if len(spec.placement.nodes) < num_new_mons:
if len(spec.placement.hosts) < num_new_mons:
raise RuntimeError("Error: {} hosts provided, expected {}".format(
len(spec.placement.nodes), num_new_mons))
len(spec.placement.hosts), num_new_mons))
self.log.info("creating {} monitors on hosts: '{}'".format(
num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.nodes))))
num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
# TODO: we may want to chain the creation of the monitors so they join
# the quorum one at a time.
return self._create_mon(spec.placement.nodes)
return self._create_mon(spec.placement.hosts)
return self._get_services('mon').then(update_mons_with_daemons)
@async_map_completion
@@ -1183,6 +1184,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._create_daemon('mgr', name, host, keyring)
def update_mgrs(self, spec):
# type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster managers.
"""
@@ -1190,13 +1192,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
def _update_mgrs(self, spec, daemons):
# type: (orchestrator.StatefulServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion
num_mgrs = len(daemons)
if spec.count == num_mgrs:
return orchestrator.Completion(value="The requested number of managers exist.")
self.log.debug("Trying to update managers on: {}".format(spec.placement.nodes))
self.log.debug("Trying to update managers on: {}".format(spec.placement.hosts))
# check that all the hosts are registered
[self._require_hosts(host.hostname) for host in spec.placement.nodes]
[self._require_hosts(host.hostname) for host in spec.placement.hosts]
if spec.count < num_mgrs:
num_to_remove = num_mgrs - spec.count
@@ -1230,39 +1233,39 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
else:
# we assume explicit placement by which there are the same number of
# hosts specified as the size of increase in number of daemons.
num_new_mgrs = spec.placement.count - num_mgrs
if len(spec.placement.nodes) < num_new_mgrs:
num_new_mgrs = spec.count - num_mgrs
if len(spec.placement.hosts) < num_new_mgrs:
raise RuntimeError(
"Error: {} hosts provided, expected {}".format(
len(spec.placement.nodes), num_new_mgrs))
len(spec.placement.hosts), num_new_mgrs))
for host_spec in spec.placement.nodes:
for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
raise RuntimeError('name %s already exists', host_spec.name)
for host_spec in spec.placement.nodes:
for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
raise RuntimeError('name %s already exists', host_spec.name)
self.log.info("creating {} managers on hosts: '{}'".format(
num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.nodes])))
num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.hosts])))
args = []
for host_spec in spec.placement.nodes:
for host_spec in spec.placement.hosts:
name = host_spec.name or self.get_unique_name(daemons)
host = host_spec.hostname
args.append((host, name))
return self._create_mgr(args)
def add_mds(self, spec):
if not spec.placement.nodes or len(spec.placement.nodes) < spec.placement.count:
if not spec.placement.hosts or len(spec.placement.hosts) < spec.placement.count:
raise RuntimeError("must specify at least %d hosts" % spec.placement.count)
return self._get_services('mds').then(lambda ds: self._add_mds(ds, spec))
def _add_mds(self, daemons, spec):
args = []
num_added = 0
for host, _, name in spec.placement.nodes:
for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
mds_id = self.get_unique_name(daemons, spec.name, name)
@@ -1309,7 +1312,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._get_services('mds').then(_remove_mds)
def add_rgw(self, spec):
if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mon_command({
@@ -1328,7 +1331,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
def _add_rgw(daemons):
args = []
num_added = 0
for host, _, name in spec.placement.nodes:
for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
rgw_id = self.get_unique_name(daemons, spec.name, name)
@@ -1375,14 +1378,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._update_service('rgw', self.add_rgw, spec)
def add_rbd_mirror(self, spec):
if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
self.log.debug('nodes %s' % spec.placement.nodes)
self.log.debug('nodes %s' % spec.placement.hosts)
def _add_rbd_mirror(daemons):
args = []
num_added = 0
for host, _, name in spec.placement.nodes:
for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
daemon_id = self.get_unique_name(daemons, None, name)
@@ -1530,7 +1533,7 @@ class NodeAssignment(object):
service_type=None, # type: Optional[str]
):
assert spec and get_hosts_func and service_type
self.spec = spec
self.spec = spec # type: orchestrator.StatefulServiceSpec
self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
self.get_hosts_func = get_hosts_func
self.service_type = service_type
@@ -1556,7 +1559,7 @@ class NodeAssignment(object):
logger.info("Found labels. Assinging nodes that match the label")
candidates = [HostSpec(x[0], '', '') for x in self.get_hosts_func()] # TODO: query for labels
logger.info('Assigning nodes to spec: {}'.format(candidates))
self.spec.placement.set_nodes(candidates)
self.spec.placement.set_hosts(candidates)
def assign_nodes(self):
# type: () -> None
@@ -1565,7 +1568,7 @@ class NodeAssignment(object):
"""
# If no imperative or declarative host assignments, use the scheduler to pick from the
# host pool (assuming `count` is set)
if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
if not self.spec.placement.label and not self.spec.placement.hosts and self.spec.placement.count:
logger.info("Found num spec. Looking for labeled nodes.")
# TODO: actually query for labels (self.service_type)
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()],
@@ -1577,7 +1580,7 @@ class NodeAssignment(object):
format(self.spec.placement.count))
else:
logger.info('Assigning nodes to spec: {}'.format(candidates))
self.spec.placement.set_nodes(candidates)
self.spec.placement.set_hosts(candidates)
return None
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
@@ -1587,6 +1590,6 @@ class NodeAssignment(object):
format(self.spec.placement.count, len(candidates)))
logger.info('Assigning nodes to spec: {}'.format(candidates))
self.spec.placement.set_nodes(candidates)
self.spec.placement.set_hosts(candidates)
return None
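The count-based branch of NodeAssignment above reduces to: when neither explicit hosts nor a label is given but a count is, let the scheduler pick hosts from the pool and back-populate them via set_hosts(). A rough, self-contained sketch of that flow; the random selection here is an assumption standing in for the scheduler, not the cephadm SimpleScheduler itself:

import random
from typing import List, Optional

class Placement(object):
    """Tiny stand-in for PlacementSpec: just label, hosts and count."""
    def __init__(self, label=None, hosts=None, count=None):
        # type: (Optional[str], Optional[List[str]], Optional[int]) -> None
        self.label = label
        self.hosts = hosts or []
        self.count = count

    def set_hosts(self, hosts):
        # back-populate .hosts once the scheduler has chosen them
        self.hosts = hosts

def place_by_count(placement, host_pool):
    # type: (Placement, List[str]) -> None
    """If only `count` is set, pick that many hosts and back-populate the placement."""
    if placement.label or placement.hosts or not placement.count:
        return
    pool = list(host_pool)
    random.shuffle(pool)
    candidates = pool[:placement.count]
    if len(candidates) < placement.count:
        raise RuntimeError("Cannot place {} daemons on {} hosts".format(
            placement.count, len(candidates)))
    placement.set_hosts(candidates)

p = Placement(count=2)
place_by_count(p, ['host1', 'host2', 'host3'])
assert len(p.hosts) == 2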


@@ -85,7 +85,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mon_update(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(nodes=['test:0.0.0.0=a'], count=1)
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps))
assert self._wait(cephadm_module, c) == ["(Re)deployed mon.a on host 'test'"]
@@ -95,7 +95,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mgr_update(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(nodes=['test:0.0.0.0=a'], count=1)
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed mgr." in out
@@ -117,7 +117,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mds(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(nodes=['test'], count=1)
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed mds.name." in out
@@ -129,7 +129,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_rgw(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(nodes=['test'], count=1)
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed rgw.realm.zone." in out
@@ -160,7 +160,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_rbd_mirror(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(nodes=['test'], count=1)
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed rbd-mirror." in out


@@ -1050,22 +1050,23 @@ class PlacementSpec(object):
"""
For APIs that need to specify a node subset
"""
def __init__(self, label=None, nodes=None, count=None):
def __init__(self, label=None, hosts=None, count=None):
# type: (Optional[str], Optional[List], Optional[int]) -> None
self.label = label
if nodes:
if all([isinstance(node, HostSpec) for node in nodes]):
self.nodes = nodes
if hosts:
if all([isinstance(host, HostSpec) for host in hosts]):
self.hosts = hosts # type: List[HostSpec]
else:
self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
self.hosts = [parse_host_specs(x, require_network=False) for x in hosts if x]
else:
self.nodes = []
self.hosts = []
self.count = count # type: Optional[int]
def set_nodes(self, nodes):
# To backpopulate the .nodes attribute when using labels or count
def set_hosts(self, hosts):
# To backpopulate the .hosts attribute when using labels or count
# in the orchestrator backend.
self.nodes = nodes
self.hosts = hosts
@classmethod
def from_dict(cls, data):
@@ -1074,7 +1075,7 @@
return _cls
def validate(self):
if self.nodes and self.label:
if self.hosts and self.label:
# TODO: a less generic Exception
raise Exception('Node and label are mutually exclusive')
if self.count is not None and self.count <= 0:
@@ -1193,9 +1194,15 @@ class StatefulServiceSpec(object):
"""
# TODO: create base class for Stateless/Stateful service specs and properly inherit
def __init__(self, name=None, placement=None):
self.placement = PlacementSpec() if placement is None else placement
# type: (Optional[str], Optional[PlacementSpec]) -> None
self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
self.name = name
self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
# for backwards-compatibility
if self.placement is not None and self.placement.count is not None:
self.count = self.placement.count
else:
self.count = 1
class StatelessServiceSpec(object):
@@ -1211,12 +1218,12 @@ class StatelessServiceSpec(object):
# start the services.
def __init__(self, name, placement=None):
self.placement = PlacementSpec() if placement is None else placement
self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
#: Give this set of stateless services a name: typically it would
#: be the name of a CephFS filesystem, RGW zone, etc. Must be unique
#: within one ceph cluster.
self.name = name
self.name = name # type: str
#: Count of service instances
self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
@@ -1273,7 +1280,7 @@ class RGWSpec(StatelessServiceSpec):
#: List of hosts where RGWs should run. Not for Rook.
if hosts:
self.placement.hosts = hosts
self.placement = PlacementSpec(hosts=hosts)
#: is multisite
self.rgw_multisite = rgw_multisite
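Two behavioural details in the hunks above are easy to miss: StatefulServiceSpec now only adopts the placement's count when one is actually set, falling back to 1 for backwards compatibility, and RGWSpec builds a fresh PlacementSpec from its hosts argument instead of mutating placement.hosts. A condensed sketch of the count fallback, using simplified stand-ins rather than the real classes:

from typing import List, Optional

class PlacementSpec(object):
    def __init__(self, hosts=None, count=None):
        # type: (Optional[List[str]], Optional[int]) -> None
        self.hosts = hosts or []
        self.count = count

class StatefulServiceSpec(object):
    def __init__(self, name=None, placement=None):
        # type: (Optional[str], Optional[PlacementSpec]) -> None
        self.placement = placement if placement is not None else PlacementSpec()
        self.name = name
        # backwards compatibility: only inherit a count that is actually set
        if self.placement.count is not None:
            self.count = self.placement.count
        else:
            self.count = 1

# A placement without an explicit count keeps the legacy default of one instance.
assert StatefulServiceSpec(placement=PlacementSpec(hosts=['host1'])).count == 1
assert StatefulServiceSpec(placement=PlacementSpec(count=3)).count == 3
assert StatefulServiceSpec().count == 1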


@@ -384,7 +384,7 @@ Usage:
def _rbd_mirror_add(self, num=None, hosts=None):
spec = orchestrator.StatelessServiceSpec(
None,
placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_rbd_mirror(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
@@ -399,7 +399,7 @@ Usage:
def _rbd_mirror_update(self, num, label=None, hosts=[]):
spec = orchestrator.StatelessServiceSpec(
None,
placement=orchestrator.PlacementSpec(nodes=hosts, count=num, label=label))
placement=orchestrator.PlacementSpec(hosts=hosts, count=num, label=label))
completion = self.update_rbd_mirror(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
@@ -424,7 +424,7 @@ Usage:
def _mds_add(self, fs_name, num=None, hosts=None):
spec = orchestrator.StatelessServiceSpec(
fs_name,
placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_mds(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
@@ -438,7 +438,7 @@ Usage:
"name=hosts,type=CephString,n=N,req=false "
'Update the number of MDS instances for the given fs_name')
def _mds_update(self, fs_name, num=None, label=None, hosts=[]):
placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
spec = orchestrator.StatelessServiceSpec(
@@ -483,7 +483,7 @@ Usage:
rgw_spec = orchestrator.RGWSpec(
rgw_realm=realm_name,
rgw_zone=zone_name,
placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_rgw(rgw_spec)
self._orchestrator_wait([completion])
@@ -502,7 +502,7 @@ Usage:
spec = orchestrator.RGWSpec(
rgw_realm=realm_name,
rgw_zone=zone_name,
placement=orchestrator.PlacementSpec(nodes=hosts, label=label, count=num))
placement=orchestrator.PlacementSpec(hosts=hosts, label=label, count=num))
completion = self.update_rgw(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
@@ -587,7 +587,7 @@ Usage:
'Update the number of manager instances')
def _update_mgrs(self, num=None, hosts=[], label=None):
placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
spec = orchestrator.StatefulServiceSpec(placement=placement)
@@ -605,7 +605,7 @@ Usage:
'Update the number of monitor instances')
def _update_mons(self, num=None, hosts=[], label=None):
placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
if not hosts and not label:
# Improve Error message. Point to parse_host_spec examples
raise orchestrator.OrchestratorValidationError("Mons need a host spec. (host, network, name(opt))")
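The orchestrator CLI handlers above all follow the same shape: build a PlacementSpec from the optional num/hosts/label flags, validate it, then hand it to a service spec. A condensed, hypothetical version of that flow for the mon update path; the spec class is a simplified stand-in and the trailing validate() call is an assumption, since the real mon handler only checks that a host spec or label was given:

from typing import List, Optional

class OrchestratorValidationError(Exception):
    pass

class PlacementSpec(object):
    def __init__(self, label=None, hosts=None, count=None):
        # type: (Optional[str], Optional[List[str]], Optional[int]) -> None
        self.label = label
        self.hosts = hosts or []
        self.count = count

    def validate(self):
        if self.hosts and self.label:
            raise OrchestratorValidationError('Host and label are mutually exclusive')
        if self.count is not None and self.count <= 0:
            raise OrchestratorValidationError('The count must be a positive integer')

def update_mons_cli(num=None, hosts=None, label=None):
    # type: (Optional[int], Optional[List[str]], Optional[str]) -> PlacementSpec
    placement = PlacementSpec(label=label, count=num, hosts=hosts or [])
    if not hosts and not label:
        # mons need either an explicit host spec or a label to be placed
        raise OrchestratorValidationError(
            "Mons need a host spec. (host, network, name(opt))")
    placement.validate()
    return placement

update_mons_cli(num=3, hosts=['host1:10.0.0.0/24'])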