Merge pull request #38560 from pcuzner/cephadm-maintenance-mode

cephadm: add host-maintenance subcommand

Reviewed-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Sebastian Wagner 2021-01-07 11:02:15 +01:00, committed by GitHub
commit 9f7c9bf411
7 changed files with 336 additions and 13 deletions


@@ -6306,6 +6306,74 @@ def command_exporter():
exporter.run()
##################################
def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool:
# TODO: UNITTEST
return os.path.exists(
os.path.join(
UNIT_DIR,
f"{subsystem}.target.wants",
target_name
)
)
@infer_fsid
def command_maintenance():
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
target = f"ceph-{args.fsid}.target"
if args.maintenance_action.lower() == 'enter':
logger.info("Requested to place host into maintenance")
if systemd_target_state(target):
_out, _err, code = call(
['systemctl', 'disable', target],
verbose_on_failure=False
)
if code:
logger.error(f"Failed to disable the {target} target")
return "failed - to disable the target"
else:
# stopping a target waits by default
_out, _err, code = call(
['systemctl', 'stop', target],
verbose_on_failure=False
)
if code:
logger.error(f"Failed to stop the {target} target")
return "failed - to disable the target"
else:
return f"success - systemd target {target} disabled"
else:
return "skipped - target already disabled"
else:
logger.info("Requested to exit maintenance state")
# exit maintenance request
if not systemd_target_state(target):
_out, _err, code = call(
['systemctl', 'enable', target],
verbose_on_failure=False
)
if code:
logger.error(f"Failed to enable the {target} target")
return "failed - unable to enable the target"
else:
# starting a target waits by default
_out, _err, code = call(
['systemctl', 'start', target],
verbose_on_failure=False
)
if code:
logger.error(f"Failed to start the {target} target")
return "failed - unable to start the target"
else:
return f"success - systemd target {target} enabled and started"
##################################
@@ -6869,6 +6937,18 @@ def _get_parser():
help='daemon identifier for the exporter')
parser_exporter.set_defaults(func=command_exporter)
parser_maintenance = subparsers.add_parser(
'host-maintenance', help='Manage the maintenance state of a host')
parser_maintenance.add_argument(
'--fsid',
help='cluster FSID')
parser_maintenance.add_argument(
"maintenance_action",
type=str,
choices=['enter', 'exit'],
help="Maintenance action - enter maintenance, or exit maintenance")
parser_maintenance.set_defaults(func=command_maintenance)
return parser
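
A quick sketch of what the wiring above yields once parsed; the fsid here is a placeholder, and the assertions mirror the unit tests added below:

    args = _parse_args(['host-maintenance', 'enter',
                        '--fsid', '00000000-0000-0000-0000-000000c0ffee'])
    assert args.maintenance_action == 'enter'
    assert args.func is command_maintenance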


@@ -641,3 +641,30 @@ iMN28C2bKGao5UHvdER1rGy7
req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="PATCH")
with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
urlopen(req)
class TestMaintenance:
systemd_target = "ceph.00000000-0000-0000-0000-000000c0ffee.target"
def test_systemd_target_OK(self, tmp_path):
base = tmp_path
wants = base / "ceph.target.wants"
wants.mkdir()
target = wants / TestMaintenance.systemd_target
target.touch()
cd.UNIT_DIR = str(base)
assert cd.systemd_target_state(target.name)
def test_systemd_target_NOTOK(self, tmp_path):
base = tmp_path
cd.UNIT_DIR = str(base)
assert not cd.systemd_target_state(TestMaintenance.systemd_target)
def test_parser_OK(self):
args = cd._parse_args(['host-maintenance', 'enter'])
assert args.maintenance_action == 'enter'
def test_parser_BAD(self):
with pytest.raises(SystemExit):
cd._parse_args(['host-maintenance', 'wah'])
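
A possible follow-up test for command_maintenance itself (not part of this change): it assumes the systemctl interactions can be stubbed by monkeypatching cd.call, and that cephadm reads its arguments from the module-level cd.args namespace:

    def test_maintenance_enter_sketch(monkeypatch):
        # pretend the ceph target is currently enabled on this host...
        monkeypatch.setattr(cd, 'systemd_target_state', lambda name: True)
        # ...and let every systemctl invocation report success
        monkeypatch.setattr(cd, 'call', lambda cmd, **kw: ('', '', 0))
        cd.args = cd._parse_args(
            ['host-maintenance', 'enter',
             '--fsid', '00000000-0000-0000-0000-000000c0ffee'])
        assert cd.command_maintenance().startswith('success')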


@@ -101,6 +101,10 @@ class Inventory:
def all_specs(self) -> List[HostSpec]:
return list(map(self.spec_from_dict, self._inventory.values()))
def get_host_with_state(self, state: str = "") -> List[str]:
"""return a list of host names in a specific state"""
return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]
def save(self) -> None:
self.mgr.set_store('inventory', json.dumps(self._inventory))
@@ -449,6 +453,13 @@ class HostCache():
result.append(d)
return result
def get_daemon_types(self, hostname: str) -> List[str]:
"""Provide a list of the types of daemons on the host"""
result = set()
for _d, dm in self.daemons[hostname].items():
result.add(dm.daemon_type)
return list(result)
def get_daemon_names(self):
# type: () -> List[str]
r = []
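
To illustrate the two helpers above: get_host_with_state compares statuses case-insensitively and defaults to "" (i.e. hosts with no status set), while get_daemon_types de-duplicates through a set, so the order of the returned types is arbitrary. A sketch that pokes _inventory directly and bypasses __init__, purely for illustration:

    inv = Inventory.__new__(Inventory)   # skip __init__; no mgr needed here
    inv._inventory = {
        'host1': {'hostname': 'host1', 'status': 'maintenance'},
        'host2': {'hostname': 'host2'},
    }
    assert inv.get_host_with_state('maintenance') == ['host1']
    assert inv.get_host_with_state() == ['host2']   # "" matches unset status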


@@ -124,6 +124,24 @@ def service_inactive(spec_name: str) -> Callable:
return inner
def host_exists(hostname_position: int = 1) -> Callable:
"""Check that a hostname exists in the inventory"""
def inner(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
this = args[0] # self object
hostname = args[hostname_position]
if hostname not in this.cache.get_hosts():
candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
help_msg = f"Did you mean {candidates}?" if candidates else ""
raise OrchestratorError(
f"Cannot find host '{hostname}' in the inventory. {help_msg}")
return func(*args, **kwargs)
return wrapper
return inner
class ContainerInspectInfo(NamedTuple):
image_id: str
ceph_version: Optional[str]
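
The decorator assumes the hostname is the first positional argument after self (hostname_position defaults to 1); a hypothetical guarded orchestrator method would look like:

    @host_exists()
    def reboot_host(self, hostname: str) -> str:   # hypothetical example method
        return f"ok to act on {hostname}"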
@@ -1284,7 +1302,7 @@ To check that the host is reachable:
def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
"""
Returns all hosts that went through _refresh_host_daemons().
Returns all usable hosts that went through _refresh_host_daemons().
This mitigates a potential race, where new host was added *after*
``_refresh_host_daemons()`` was called, but *before*
@@ -1293,7 +1311,8 @@ To check that the host is reachable:
"""
return [
h for h in self.inventory.all_specs()
if self.cache.host_had_daemon_refresh(h.hostname)
if self.cache.host_had_daemon_refresh(h.hostname) and
h.status.lower() not in ['maintenance', 'offline']
]
def _add_host(self, spec):
@@ -1369,11 +1388,8 @@ To check that the host is reachable:
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
@trivial_completion
def host_ok_to_stop(self, hostname: str) -> str:
if hostname not in self.cache.get_hosts():
raise OrchestratorError(f'Cannot find host "{hostname}"')
def _host_ok_to_stop(self, hostname: str) -> Tuple[int, str]:
self.log.debug("running host-ok-to-stop checks")
daemons = self.cache.get_daemons()
daemon_map = defaultdict(lambda: [])
for dd in daemons:
@@ -1384,14 +1400,159 @@ To check that the host is reachable:
r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
if r.retval:
self.log.error(f'It is NOT safe to stop host {hostname}')
raise orchestrator.OrchestratorError(
r.stderr,
errno=r.retval)
return r.retval, r.stderr
return 0, f'It is presumed safe to stop host {hostname}'
@trivial_completion
def host_ok_to_stop(self, hostname: str) -> str:
if hostname not in self.cache.get_hosts():
raise OrchestratorError(f'Cannot find host "{hostname}"')
rc, msg = self._host_ok_to_stop(hostname)
if rc:
raise OrchestratorError(msg, errno=rc)
msg = f'It is presumed safe to stop host {hostname}'
self.log.info(msg)
return msg
def _set_maintenance_healthcheck(self) -> None:
"""Raise/update or clear the maintenance health check as needed"""
in_maintenance = self.inventory.get_host_with_state("maintenance")
if not in_maintenance:
self.set_health_checks({
"HOST_IN_MAINTENANCE": {}
})
else:
s = "host is" if len(in_maintenance) == 1 else "hosts are"
self.set_health_checks({
"HOST_IN_MAINTENANCE": {
"severity": "warning",
"summary": f"{len(in_maintenance)} {s} in maintenance mode",
"detail": [f"{h} is in maintenance" for h in in_maintenance],
}
})
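# For reference, with two hosts in maintenance the branch above raises a
# health check shaped like this (host names illustrative):
#
#   {"HOST_IN_MAINTENANCE": {
#       "severity": "warning",
#       "summary": "2 hosts are in maintenance mode",
#       "detail": ["host1 is in maintenance", "host2 is in maintenance"]}}
#
# while the no-hosts branch clears it again by passing an empty dict for
# the check.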
@trivial_completion
@host_exists()
def enter_host_maintenance(self, hostname: str) -> str:
""" Attempt to place a cluster host in maintenance
Placing a host into maintenance disables the cluster's ceph target in systemd
and stops all ceph daemons. If the host is an osd host we apply the noout flag
for the host subtree in crush to prevent data movement during a host maintenance
window.
:param hostname: (str) name of the host (must match an inventory hostname)
:raises OrchestratorError: Hostname is invalid, host is already in maintenance
"""
if len(self.cache.get_hosts()) == 1:
raise OrchestratorError(f"Maintenance feature is not supported on single node clusters")
# if upgrade is active, deny
if self.upgrade.upgrade_state:
raise OrchestratorError(
f"Unable to place {hostname} in maintenance with upgrade active/paused")
tgt_host = self.inventory._inventory[hostname]
if tgt_host.get("status", "").lower() == "maintenance":
raise OrchestratorError(f"Host {hostname} is already in maintenance")
host_daemons = self.cache.get_daemon_types(hostname)
self.log.debug("daemons on host {}".format(','.join(host_daemons)))
if host_daemons:
# daemons on this host, so check the daemons can be stopped
# and if so, place the host into maintenance by disabling the target
rc, msg = self._host_ok_to_stop(hostname)
if rc:
raise OrchestratorError(msg, errno=rc)
# call the host-maintenance function
out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
["enter"],
error_ok=True)
if out:
raise OrchestratorError(
f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
if "osd" in host_daemons:
crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
rc, out, err = self.mon_command({
'prefix': 'osd set-group',
'flags': 'noout',
'who': [crush_node],
'format': 'json'
})
if rc:
self.log.warning(
f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
raise OrchestratorError(
f"Unable to set the osds on {hostname} to noout (rc={rc})")
else:
self.log.info(
f"maintenance mode request for {hostname} has SET the noout group")
# update the host status in the inventory
tgt_host["status"] = "maintenance"
self.inventory._inventory[hostname] = tgt_host
self.inventory.save()
self._set_maintenance_healthcheck()
return f"Ceph cluster {self._cluster_fsid} on {hostname} moved to maintenance"
@trivial_completion
@host_exists()
def exit_host_maintenance(self, hostname: str) -> str:
"""Exit maintenance mode and return a host to an operational state
Returning from maintenance will enable the cluster's systemd target and
start it, and remove any noout that has been added for the host if the
host has osd daemons
:param hostname: (str) host name
:raises OrchestratorError: Unable to return from maintenance, or unset the
noout flag
"""
tgt_host = self.inventory._inventory[hostname]
if tgt_host['status'] != "maintenance":
raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
['exit'],
error_ok=True)
if out:
raise OrchestratorError(
f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
if "osd" in self.cache.get_daemon_types(hostname):
crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
rc, _out, _err = self.mon_command({
'prefix': 'osd unset-group',
'flags': 'noout',
'who': [crush_node],
'format': 'json'
})
if rc:
self.log.warning(
f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
else:
self.log.info(
f"exit maintenance request has UNSET for the noout group on host {hostname}")
# update the host record status
tgt_host['status'] = ""
self.inventory._inventory[hostname] = tgt_host
self.inventory.save()
self._set_maintenance_healthcheck()
return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
def get_minimal_ceph_conf(self) -> str:
_, config, _ = self.check_mon_command({
"prefix": "config generate-minimal-conf",
@@ -2305,6 +2466,9 @@ To check that the host is reachable:
@trivial_completion
def upgrade_check(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("check aborted - you have hosts in maintenance state")
if version:
target_name = self.container_image_base + ':v' + version
elif image:
@@ -2342,6 +2506,8 @@ To check that the host is reachable:
@trivial_completion
def upgrade_start(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
return self.upgrade.upgrade_start(image, version)
@trivial_completion


@@ -111,6 +111,10 @@ class CephadmServe:
@forall_hosts
def refresh(host: str) -> None:
# skip hosts that are in maintenance - they could be powered off
if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
return
if self.mgr.cache.host_needs_check(host):
r = self._check_host(host)
if r is not None:


@@ -829,6 +829,18 @@ class Orchestrator(object):
"""
raise NotImplementedError()
def enter_host_maintenance(self, hostname: str) -> Completion:
"""
Place a host in maintenance, stopping daemons and disabling its systemd target
"""
raise NotImplementedError()
def exit_host_maintenance(self, hostname: str) -> Completion:
"""
Return a host from maintenance, restarting the cluster's systemd target
"""
raise NotImplementedError()
def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.


@@ -329,7 +329,8 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
table.left_padding_width = 0
table.right_padding_width = 2
for host in sorted(completion.result, key=lambda h: h.hostname):
table.add_row((host.hostname, host.addr, ' '.join(host.labels), host.status))
table.add_row((host.hostname, host.addr, ' '.join(
host.labels), host.status.capitalize()))
output = table.get_string()
return HandleCommandResult(stdout=output)
@@ -365,6 +366,28 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command(
'orch host maintenance enter',
'name=hostname,type=CephString',
desc='Prepare a host for maintenance by shutting down and disabling all Ceph daemons (cephadm only)')
def _host_maintenance_enter(self, hostname: str):
completion = self.enter_host_maintenance(hostname)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command(
'orch host maintenance exit',
'name=hostname,type=CephString',
desc='Return a host from maintenance, restarting all Ceph daemons (cephadm only)')
def _host_maintenance_exit(self, hostname: str):
completion = self.exit_host_maintenance(hostname)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
@_cli_read_command(
'orch device ls',
"name=hostname,type=CephString,n=N,req=false "
@@ -614,7 +637,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
remove_column = 'CONTAINER ID'
if table.get_string(fields=[remove_column], border=False,
header=False).count('<unknown>') == len(daemons):
header=False).count('<unknown>') == len(daemons):
try:
table.del_column(remove_column)
except AttributeError as e:
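
With all of the pieces above in place, the feature is driven end to end as follows (host1 is a placeholder hostname):

    ceph orch host maintenance enter host1
    ceph orch host maintenance exit host1

The mgr-side handlers delegate to enter_host_maintenance/exit_host_maintenance, which in turn invoke the new cephadm host-maintenance subcommand on the target host.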