mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
Merge pull request #44266 from adk3798/http-reconfig
mgr/cephadm: reconfig agents over http Reviewed-by: Sebastian Wagner <sewagner@redhat.com>
This commit is contained in:
commit
2a450f68a1
@ -3702,6 +3702,8 @@ class MgrListener(Thread):
|
||||
logger.error(err_str)
|
||||
else:
|
||||
conn.send(b'ACK')
|
||||
if 'config' in data:
|
||||
self.agent.wakeup()
|
||||
self.agent.ls_gatherer.wakeup()
|
||||
self.agent.volume_gatherer.wakeup()
|
||||
logger.debug(f'Got mgr message {data}')
|
||||
@ -3713,6 +3715,15 @@ class MgrListener(Thread):
|
||||
|
||||
def handle_json_payload(self, data: Dict[Any, Any]) -> None:
|
||||
self.agent.ack = int(data['counter'])
|
||||
if 'config' in data:
|
||||
logger.info('Received new config from mgr')
|
||||
config = data['config']
|
||||
for filename in config:
|
||||
if filename in self.agent.required_files:
|
||||
with open(os.path.join(self.agent.daemon_dir, filename), 'w') as f:
|
||||
f.write(config[filename])
|
||||
self.agent.pull_conf_settings()
|
||||
self.agent.wakeup()
|
||||
|
||||
|
||||
class CephadmAgent():
|
||||
@ -3722,12 +3733,19 @@ class CephadmAgent():
|
||||
loop_interval = 30
|
||||
stop = False
|
||||
|
||||
required_files = ['agent.json', 'keyring']
|
||||
required_files = [
|
||||
'agent.json',
|
||||
'keyring',
|
||||
'root_cert.pem',
|
||||
'listener.crt',
|
||||
'listener.key',
|
||||
]
|
||||
|
||||
def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
|
||||
self.ctx = ctx
|
||||
self.fsid = fsid
|
||||
self.daemon_id = daemon_id
|
||||
self.starting_port = 14873
|
||||
self.target_ip = ''
|
||||
self.target_port = ''
|
||||
self.host = ''
|
||||
@ -3741,22 +3759,30 @@ class CephadmAgent():
|
||||
self.ack = 1
|
||||
self.event = Event()
|
||||
self.mgr_listener = MgrListener(self)
|
||||
self.ls_gatherer = AgentGatherer(self, self._get_ls, 'Ls')
|
||||
self.volume_gatherer = AgentGatherer(self, self._ceph_volume, 'Volume')
|
||||
self.ls_gatherer = AgentGatherer(self, lambda: self._get_ls(), 'Ls')
|
||||
self.volume_gatherer = AgentGatherer(self, lambda: self._ceph_volume(enhanced=False), 'Volume')
|
||||
self.device_enhanced_scan = False
|
||||
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
|
||||
self.recent_iteration_index: int = 0
|
||||
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
|
||||
|
||||
def validate(self, config: Dict[str, str] = {}) -> None:
|
||||
# check for the required files
|
||||
for fname in self.required_files:
|
||||
if fname not in config:
|
||||
raise Error('required file missing from config: %s' % fname)
|
||||
|
||||
def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
|
||||
if not config:
|
||||
raise Error('Agent needs a config')
|
||||
assert isinstance(config, dict)
|
||||
self.validate(config)
|
||||
|
||||
# Create the required config files in the daemons dir, with restricted permissions
|
||||
for filename in config:
|
||||
with open(os.path.join(self.daemon_dir, filename), 'w') as f:
|
||||
f.write(config[filename])
|
||||
if filename in self.required_files:
|
||||
with open(os.path.join(self.daemon_dir, filename), 'w') as f:
|
||||
f.write(config[filename])
|
||||
|
||||
with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f:
|
||||
f.write(self.unit_run())
|
||||
@ -3810,14 +3836,14 @@ WantedBy=ceph-{fsid}.target
|
||||
def wakeup(self) -> None:
|
||||
self.event.set()
|
||||
|
||||
def run(self) -> None:
|
||||
def pull_conf_settings(self) -> None:
|
||||
try:
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
self.target_ip = config['target_ip']
|
||||
self.target_port = config['target_port']
|
||||
self.loop_interval = int(config['refresh_period'])
|
||||
starting_port = int(config['listener_port'])
|
||||
self.starting_port = int(config['listener_port'])
|
||||
self.host = config['host']
|
||||
use_lsm = config['device_enhanced_scan']
|
||||
except Exception as e:
|
||||
@ -3836,15 +3862,19 @@ WantedBy=ceph-{fsid}.target
|
||||
self.device_enhanced_scan = False
|
||||
if use_lsm.lower() == 'true':
|
||||
self.device_enhanced_scan = True
|
||||
self.volume_gatherer.update_func(lambda: self._ceph_volume(enhanced=self.device_enhanced_scan))
|
||||
|
||||
def run(self) -> None:
|
||||
self.pull_conf_settings()
|
||||
|
||||
try:
|
||||
for _ in range(1001):
|
||||
if not port_in_use(self.ctx, starting_port):
|
||||
self.listener_port = str(starting_port)
|
||||
if not port_in_use(self.ctx, self.starting_port):
|
||||
self.listener_port = str(self.starting_port)
|
||||
break
|
||||
starting_port += 1
|
||||
self.starting_port += 1
|
||||
if not self.listener_port:
|
||||
raise Error(f'All 1000 ports starting at {str(starting_port - 1001)} taken.')
|
||||
raise Error(f'All 1000 ports starting at {str(self.starting_port - 1001)} taken.')
|
||||
except Exception as e:
|
||||
raise Error(f'Failed to pick port for agent to listen on: {e}')
|
||||
|
||||
@ -4097,6 +4127,9 @@ class AgentGatherer(Thread):
|
||||
def wakeup(self) -> None:
|
||||
self.event.set()
|
||||
|
||||
def update_func(self, func: Callable) -> None:
|
||||
self.func = func
|
||||
|
||||
|
||||
def command_agent(ctx: CephadmContext) -> None:
|
||||
agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)
|
||||
|
@ -8,23 +8,23 @@ import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
||||
# from orchestrator import OrchestratorError
|
||||
from mgr_util import verify_tls_files
|
||||
from orchestrator import DaemonDescriptionStatus
|
||||
from orchestrator import DaemonDescriptionStatus, OrchestratorError
|
||||
from orchestrator._interface import daemon_type_to_service
|
||||
from ceph.utils import datetime_now
|
||||
from ceph.deployment.inventory import Devices
|
||||
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
|
||||
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from OpenSSL import crypto
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
|
||||
from typing import Any, Dict, List, Set, Tuple, \
|
||||
TYPE_CHECKING, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from cephadm.module import CephadmOrchestrator
|
||||
@ -53,7 +53,17 @@ class CherryPyThread(threading.Thread):
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.ssl_certs.generate_root_cert()
|
||||
try:
|
||||
old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
|
||||
if not old_creds:
|
||||
raise OrchestratorError('No old credentials for cephadm endpoint found')
|
||||
old_creds_dict = json.loads(old_creds)
|
||||
old_key = old_creds_dict['key']
|
||||
old_cert = old_creds_dict['cert']
|
||||
self.ssl_certs.load_root_credentials(old_cert, old_key)
|
||||
except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
|
||||
self.ssl_certs.generate_root_cert()
|
||||
|
||||
cert, key = self.ssl_certs.generate_cert()
|
||||
|
||||
self.key_tmp = tempfile.NamedTemporaryFile()
|
||||
@ -82,6 +92,12 @@ class CherryPyThread(threading.Thread):
|
||||
self.mgr.log.debug('Starting cherrypy engine...')
|
||||
self.start_engine()
|
||||
self.mgr.log.debug('Cherrypy engine started.')
|
||||
cephadm_endpoint_creds = {
|
||||
'cert': self.ssl_certs.get_root_cert(),
|
||||
'key': self.ssl_certs.get_root_key()
|
||||
}
|
||||
self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
|
||||
self.mgr._kick_serve_loop()
|
||||
# wait for the shutdown event
|
||||
self.cherrypy_shutdown_event.wait()
|
||||
self.cherrypy_shutdown_event.clear()
|
||||
@ -338,7 +354,7 @@ class CephadmAgentHelpers:
|
||||
def __init__(self, mgr: "CephadmOrchestrator"):
|
||||
self.mgr: "CephadmOrchestrator" = mgr
|
||||
|
||||
def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
|
||||
def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None:
|
||||
for host in hosts:
|
||||
if increment:
|
||||
self.mgr.cache.metadata_up_to_date[host] = False
|
||||
@ -346,8 +362,11 @@ class CephadmAgentHelpers:
|
||||
self.mgr.cache.agent_counter[host] = 1
|
||||
elif increment:
|
||||
self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
|
||||
payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
|
||||
if new_config:
|
||||
payload['config'] = new_config
|
||||
message_thread = AgentMessageThread(
|
||||
host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr)
|
||||
host, self.mgr.cache.agent_ports[host], payload, self.mgr)
|
||||
message_thread.start()
|
||||
|
||||
def _agent_down(self, host: str) -> bool:
|
||||
@ -450,8 +469,33 @@ class CephadmAgentHelpers:
|
||||
last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
|
||||
host, agent.name())
|
||||
if not last_config or last_deps != deps:
|
||||
# if root cert is the dep that changed, we must use ssh to reconfig
|
||||
# so it's necessary to check this one specifically
|
||||
root_cert_match = False
|
||||
try:
|
||||
root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
|
||||
if last_deps and root_cert in last_deps:
|
||||
root_cert_match = True
|
||||
except Exception:
|
||||
pass
|
||||
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
|
||||
self.mgr._daemon_action(daemon_spec, action='reconfig')
|
||||
# we need to know the agent port to try to reconfig w/ http
|
||||
# otherwise there is no choice but a full ssh reconfig
|
||||
if host in self.mgr.cache.agent_ports and root_cert_match:
|
||||
daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
|
||||
daemon_spec.daemon_type)].prepare_create(daemon_spec)
|
||||
self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
|
||||
self.mgr.cache.agent_counter[daemon_spec.host] = 1
|
||||
self.mgr.agent_helpers._request_agent_acks(
|
||||
hosts={daemon_spec.host},
|
||||
increment=True,
|
||||
new_config=daemon_spec.final_config
|
||||
)
|
||||
self.mgr.cache.update_daemon_config_deps(
|
||||
daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
|
||||
self.mgr.cache.save_host(daemon_spec.host)
|
||||
else:
|
||||
self.mgr._daemon_action(daemon_spec, action='reconfig')
|
||||
return False
|
||||
except Exception as e:
|
||||
self.mgr.log.debug(
|
||||
@ -473,7 +517,6 @@ class SSLCerts:
|
||||
self.mgr = mgr
|
||||
self.root_cert: Any
|
||||
self.root_key: Any
|
||||
self.root_subj: Any
|
||||
|
||||
def generate_root_cert(self) -> Tuple[str, str]:
|
||||
self.root_key = rsa.generate_private_key(
|
||||
@ -508,7 +551,7 @@ class SSLCerts:
|
||||
private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
|
||||
)
|
||||
|
||||
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
|
||||
cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
|
||||
key_str = self.root_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
@ -564,7 +607,7 @@ class SSLCerts:
|
||||
private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
|
||||
)
|
||||
|
||||
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
|
||||
cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
|
||||
key_str = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
@ -575,12 +618,25 @@ class SSLCerts:
|
||||
|
||||
def get_root_cert(self) -> str:
|
||||
try:
|
||||
return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
|
||||
return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
|
||||
except AttributeError:
|
||||
return ''
|
||||
|
||||
def get_root_key(self) -> str:
|
||||
try:
|
||||
return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
|
||||
return self.root_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
).decode('utf-8')
|
||||
except AttributeError:
|
||||
return ''
|
||||
|
||||
def load_root_credentials(self, cert: str, priv_key: str) -> None:
|
||||
given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
|
||||
tz = given_cert.not_valid_after.tzinfo
|
||||
if datetime.now(tz) >= given_cert.not_valid_after:
|
||||
raise OrchestratorError('Given cert is expired')
|
||||
self.root_cert = given_cert
|
||||
self.root_key = serialization.load_pem_private_key(
|
||||
data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
|
||||
|
@ -512,6 +512,9 @@ class HostCache():
|
||||
self.agent_counter[host] = int(j.get('agent_counter', 1))
|
||||
self.metadata_up_to_date[host] = False
|
||||
self.agent_keys[host] = str(j.get('agent_keys', ''))
|
||||
agent_port = int(j.get('agent_ports', 0))
|
||||
if agent_port:
|
||||
self.agent_ports[host] = agent_port
|
||||
|
||||
self.mgr.log.debug(
|
||||
'HostCache.load: host %s has %d daemons, '
|
||||
@ -706,6 +709,8 @@ class HostCache():
|
||||
j['agent_counter'] = self.agent_counter[host]
|
||||
if host in self.agent_keys:
|
||||
j['agent_keys'] = self.agent_keys[host]
|
||||
if host in self.agent_ports:
|
||||
j['agent_ports'] = self.agent_ports[host]
|
||||
|
||||
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
|
||||
|
||||
|
@ -1286,6 +1286,11 @@ class CephadmServe:
|
||||
await self._deploy_cephadm_binary(host, addr)
|
||||
out, err, code = await self.mgr.ssh._execute_command(
|
||||
host, cmd, stdin=stdin, addr=addr)
|
||||
# if there is an agent on this host, make sure it is using the most recent
|
||||
# vesion of cephadm binary
|
||||
if host in self.mgr.inventory:
|
||||
for agent in self.mgr.cache.get_daemons_by_type('agent', host):
|
||||
self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
|
||||
|
||||
except Exception as e:
|
||||
await self.mgr.ssh._reset_con(host)
|
||||
|
Loading…
Reference in New Issue
Block a user