mirror of
https://github.com/ceph/ceph
synced 2025-01-09 20:52:09 +00:00
Merge pull request #46925 from rkachach/fix_issue_44461
mgr/cephadm: add a simple mechanism to check grafana cert/key Reviewed-by: Adam King <adking@redhat.com>
This commit is contained in:
commit
dc47193f6f
@ -22,7 +22,7 @@ from cephadm.autotune import MemoryAutotuner
|
||||
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
|
||||
CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
|
||||
from mgr_module import MonCommandFailed
|
||||
from mgr_util import format_bytes
|
||||
from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
|
||||
|
||||
from . import utils
|
||||
|
||||
@ -93,6 +93,8 @@ class CephadmServe:
|
||||
|
||||
self._check_daemons()
|
||||
|
||||
self._check_certificates()
|
||||
|
||||
self._purge_deleted_services()
|
||||
|
||||
self._check_for_moved_osds()
|
||||
@ -112,6 +114,39 @@ class CephadmServe:
|
||||
self.log.debug("serve loop wake")
|
||||
self.log.debug("serve exit")
|
||||
|
||||
def _check_certificates(self) -> None:
|
||||
for d in self.mgr.cache.get_daemons_by_type('grafana'):
|
||||
cert = self.mgr.get_store(f'{d.hostname}/grafana_crt')
|
||||
key = self.mgr.get_store(f'{d.hostname}/grafana_key')
|
||||
if (not cert or not cert.strip()) and (not key or not key.strip()):
|
||||
# certificate/key are empty... nothing to check
|
||||
return
|
||||
|
||||
try:
|
||||
get_cert_issuer_info(cert)
|
||||
verify_tls(cert, key)
|
||||
self.mgr.remove_health_warning('CEPHADM_CERT_ERROR')
|
||||
except ServerConfigException as e:
|
||||
err_msg = f"""
|
||||
Detected invalid grafana certificates. Please, use the following commands:
|
||||
|
||||
> ceph config-key set mgr/cephadm/{d.hostname}/grafana_crt -i <path-to-ctr-file>
|
||||
> ceph config-key set mgr/cephadm/{d.hostname}/grafana_key -i <path-to-key-file>
|
||||
|
||||
to set valid key and certificate or reset their value to an empty string
|
||||
in case you want cephadm to generate self-signed Grafana certificates.
|
||||
|
||||
Once done, run the following command to reconfig the daemon:
|
||||
|
||||
> ceph orch daemon reconfig grafana.{d.hostname}
|
||||
|
||||
"""
|
||||
self.log.error(f'Detected invalid grafana certificate on host {d.hostname}: {e}')
|
||||
self.mgr.set_health_warning('CEPHADM_CERT_ERROR',
|
||||
f'Invalid grafana certificate on host {d.hostname}: {e}',
|
||||
1, [err_msg])
|
||||
break
|
||||
|
||||
def _serve_sleep(self) -> None:
|
||||
sleep_interval = max(
|
||||
30,
|
||||
|
@ -12,7 +12,7 @@ from orchestrator import DaemonDescription
|
||||
from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \
|
||||
SNMPGatewaySpec, PrometheusSpec
|
||||
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
|
||||
from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url
|
||||
from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url, get_cert_issuer_info
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -52,28 +52,7 @@ class GrafanaService(CephadmService):
|
||||
grafana_data_sources = self.mgr.template.render(
|
||||
'services/grafana/ceph-dashboard.yml.j2', {'hosts': prom_services, 'loki_host': loki_host})
|
||||
|
||||
cert_path = f'{daemon_spec.host}/grafana_crt'
|
||||
key_path = f'{daemon_spec.host}/grafana_key'
|
||||
cert = self.mgr.get_store(cert_path)
|
||||
pkey = self.mgr.get_store(key_path)
|
||||
if cert and pkey:
|
||||
try:
|
||||
verify_tls(cert, pkey)
|
||||
except ServerConfigException as e:
|
||||
logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
|
||||
cert, pkey = None, None
|
||||
if not (cert and pkey):
|
||||
cert, pkey = create_self_signed_cert('Ceph', daemon_spec.host)
|
||||
self.mgr.set_store(cert_path, cert)
|
||||
self.mgr.set_store(key_path, pkey)
|
||||
if 'dashboard' in self.mgr.get('mgr_map')['modules']:
|
||||
self.mgr.check_mon_command({
|
||||
'prefix': 'dashboard set-grafana-api-ssl-verify',
|
||||
'value': 'false',
|
||||
})
|
||||
|
||||
spec: GrafanaSpec = cast(
|
||||
GrafanaSpec, self.mgr.spec_store.active_specs[daemon_spec.service_name])
|
||||
spec: GrafanaSpec = cast(GrafanaSpec, self.mgr.spec_store.active_specs[daemon_spec.service_name])
|
||||
grafana_ini = self.mgr.template.render(
|
||||
'services/grafana/grafana.ini.j2', {
|
||||
'initial_admin_password': spec.initial_admin_password,
|
||||
@ -85,6 +64,7 @@ class GrafanaService(CephadmService):
|
||||
self.mgr.check_mon_command(
|
||||
{'prefix': 'dashboard set-grafana-api-password'}, inbuf=spec.initial_admin_password)
|
||||
|
||||
cert, pkey = self.prepare_certificates(daemon_spec)
|
||||
config_file = {
|
||||
'files': {
|
||||
"grafana.ini": grafana_ini,
|
||||
@ -95,6 +75,59 @@ class GrafanaService(CephadmService):
|
||||
}
|
||||
return config_file, sorted(deps)
|
||||
|
||||
def prepare_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
|
||||
cert_path = f'{daemon_spec.host}/grafana_crt'
|
||||
key_path = f'{daemon_spec.host}/grafana_key'
|
||||
cert = self.mgr.get_store(cert_path)
|
||||
pkey = self.mgr.get_store(key_path)
|
||||
certs_present = (cert and pkey)
|
||||
is_valid_certificate = False
|
||||
(org, cn) = (None, None)
|
||||
if certs_present:
|
||||
try:
|
||||
(org, cn) = get_cert_issuer_info(cert)
|
||||
verify_tls(cert, pkey)
|
||||
is_valid_certificate = True
|
||||
except ServerConfigException as e:
|
||||
logger.warning(f'Provided grafana TLS certificates are invalid: {e}')
|
||||
|
||||
if is_valid_certificate:
|
||||
# let's clear health error just in case it was set
|
||||
self.mgr.remove_health_warning('CEPHADM_CERT_ERROR')
|
||||
return cert, pkey
|
||||
|
||||
# certificate is not valid, to avoid overwriting user generated
|
||||
# certificates we only re-generate in case of self signed certificates
|
||||
# that were originally generated by cephadm or in case cert/key are empty.
|
||||
if not certs_present or (org == 'Ceph' and cn == 'cephadm'):
|
||||
logger.info('Regenerating cephadm self-signed grafana TLS certificates')
|
||||
cert, pkey = create_self_signed_cert('Ceph', daemon_spec.host)
|
||||
self.mgr.set_store(cert_path, cert)
|
||||
self.mgr.set_store(key_path, pkey)
|
||||
if 'dashboard' in self.mgr.get('mgr_map')['modules']:
|
||||
self.mgr.check_mon_command({
|
||||
'prefix': 'dashboard set-grafana-api-ssl-verify',
|
||||
'value': 'false',
|
||||
})
|
||||
self.mgr.remove_health_warning('CEPHADM_CERT_ERROR') # clear if any
|
||||
else:
|
||||
# the certificate was not generated by cephadm, we cannot overwrite
|
||||
# it by new self-signed ones. Let's warn the user to fix the issue
|
||||
err_msg = """
|
||||
Detected invalid grafana certificates. Set mgr/cephadm/grafana_crt
|
||||
and mgr/cephadm/grafana_key to valid certificates or reset their value
|
||||
to an empty string in case you want cephadm to generate self-signed Grafana
|
||||
certificates.
|
||||
|
||||
Once done, run the following command to reconfig the daemon:
|
||||
|
||||
> ceph orch daemon reconfig <grafana-daemon>
|
||||
|
||||
"""
|
||||
self.mgr.set_health_warning('CEPHADM_CERT_ERROR', 'Invalid grafana certificate: ', 1, [err_msg])
|
||||
|
||||
return cert, pkey
|
||||
|
||||
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
|
||||
# Use the least-created one as the active daemon
|
||||
if daemon_descrs:
|
||||
|
@ -23,6 +23,10 @@ from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_
|
||||
from orchestrator import OrchestratorError
|
||||
from orchestrator._interface import DaemonDescription
|
||||
|
||||
grafana_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQDIZSujNBlKaLJzmvntjukjMA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzEzMTE0NzA3WhcN\nMzIwNzEwMTE0NzA3WjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyyMe4DMA+MeYK7BHZMHB\nq7zjliEOcNgxomjU8qbf5USF7Mqrf6+/87XWqj4pCyAW8x0WXEr6A56a+cmBVmt+\nqtWDzl020aoId6lL5EgLLn6/kMDCCJLq++Lg9cEofMSvcZh+lY2f+1p+C+00xent\nrLXvXGOilAZWaQfojT2BpRnNWWIFbpFwlcKrlg2G0cFjV5c1m6a0wpsQ9JHOieq0\nSvwCixajwq3CwAYuuiU1wjI4oJO4Io1+g8yB3nH2Mo/25SApCxMXuXh4kHLQr/T4\n4hqisvG4uJYgKMcSIrWj5o25mclByGi1UI/kZkCUES94i7Z/3ihx4Bad0AMs/9tw\nFwIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQAf+pwz7Gd7mDwU2LY0TQXsK6/8KGzh\nHuX+ErOb8h5cOAbvCnHjyJFWf6gCITG98k9nxU9NToG0WYuNm/max1y/54f0dtxZ\npUo6KSNl3w6iYCfGOeUIj8isi06xMmeTgMNzv8DYhDt+P2igN6LenqWTVztogkiV\nxQ5ZJFFLEw4sN0CXnrZX3t5ruakxLXLTLKeE0I91YJvjClSBGkVJq26wOKQNHMhx\npWxeydQ5EgPZY+Aviz5Dnxe8aB7oSSovpXByzxURSabOuCK21awW5WJCGNpmqhWK\nZzACBDEstccj57c4OGV0eayHJRsluVr2e9NHRINZA3qdB37e6gsI1xHo\n-----END CERTIFICATE-----\n"""
|
||||
|
||||
grafana_key = """-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDLIx7gMwD4x5gr\nsEdkwcGrvOOWIQ5w2DGiaNTypt/lRIXsyqt/r7/ztdaqPikLIBbzHRZcSvoDnpr5\nyYFWa36q1YPOXTbRqgh3qUvkSAsufr+QwMIIkur74uD1wSh8xK9xmH6VjZ/7Wn4L\n7TTF6e2ste9cY6KUBlZpB+iNPYGlGc1ZYgVukXCVwquWDYbRwWNXlzWbprTCmxD0\nkc6J6rRK/AKLFqPCrcLABi66JTXCMjigk7gijX6DzIHecfYyj/blICkLExe5eHiQ\nctCv9PjiGqKy8bi4liAoxxIitaPmjbmZyUHIaLVQj+RmQJQRL3iLtn/eKHHgFp3Q\nAyz/23AXAgMBAAECggEAVoTB3Mm8azlPlaQB9GcV3tiXslSn+uYJ1duCf0sV52dV\nBzKW8s5fGiTjpiTNhGCJhchowqxoaew+o47wmGc2TvqbpeRLuecKrjScD0GkCYyQ\neM2wlshEbz4FhIZdgS6gbuh9WaM1dW/oaZoBNR5aTYo7xYTmNNeyLA/jO2zr7+4W\n5yES1lMSBXpKk7bDGKYY4bsX2b5RLr2Grh2u2bp7hoLABCEvuu8tSQdWXLEXWpXo\njwmV3hc6tabypIa0mj2Dmn2Dmt1ppSO0AZWG/WAizN3f4Z0r/u9HnbVrVmh0IEDw\n3uf2LP5o3msG9qKCbzv3lMgt9mMr70HOKnJ8ohMSKQKBgQDLkNb+0nr152HU9AeJ\nvdz8BeMxcwxCG77iwZphZ1HprmYKvvXgedqWtS6FRU+nV6UuQoPUbQxJBQzrN1Qv\nwKSlOAPCrTJgNgF/RbfxZTrIgCPuK2KM8I89VZv92TSGi362oQA4MazXC8RAWjoJ\nSu1/PHzK3aXOfVNSLrOWvIYeZQKBgQD/dgT6RUXKg0UhmXj7ExevV+c7oOJTDlMl\nvLngrmbjRgPO9VxLnZQGdyaBJeRngU/UXfNgajT/MU8B5fSKInnTMawv/tW7634B\nw3v6n5kNIMIjJmENRsXBVMllDTkT9S7ApV+VoGnXRccbTiDapBThSGd0wri/CuwK\nNWK1YFOeywKBgEDyI/XG114PBUJ43NLQVWm+wx5qszWAPqV/2S5MVXD1qC6zgCSv\nG9NLWN1CIMimCNg6dm7Wn73IM7fzvhNCJgVkWqbItTLG6DFf3/DPODLx1wTMqLOI\nqFqMLqmNm9l1Nec0dKp5BsjRQzq4zp1aX21hsfrTPmwjxeqJZdioqy2VAoGAXR5X\nCCdSHlSlUW8RE2xNOOQw7KJjfWT+WAYoN0c7R+MQplL31rRU7dpm1bLLRBN11vJ8\nMYvlT5RYuVdqQSP6BkrX+hLJNBvOLbRlL+EXOBrVyVxHCkDe+u7+DnC4epbn+N8P\nLYpwqkDMKB7diPVAizIKTBxinXjMu5fkKDs5n+sCgYBbZheYKk5M0sIxiDfZuXGB\nkf4mJdEkTI1KUGRdCwO/O7hXbroGoUVJTwqBLi1tKqLLarwCITje2T200BYOzj82\nqwRkCXGtXPKnxYEEUOiFx9OeDrzsZV00cxsEnX0Zdj+PucQ/J3Cvd0dWUspJfLHJ\n39gnaegswnz9KMQAvzKFdg==\n-----END PRIVATE KEY-----\n"""
|
||||
|
||||
|
||||
class FakeInventory:
|
||||
def get_addr(self, name: str) -> str:
|
||||
@ -540,8 +544,8 @@ class TestMonitoring:
|
||||
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
|
||||
|
||||
with with_host(cephadm_module, "test"):
|
||||
cephadm_module.set_store("test/grafana_crt", "c")
|
||||
cephadm_module.set_store("test/grafana_key", "k")
|
||||
cephadm_module.set_store("test/grafana_crt", grafana_cert)
|
||||
cephadm_module.set_store("test/grafana_key", grafana_key)
|
||||
with with_service(
|
||||
cephadm_module, PrometheusSpec("prometheus")
|
||||
) as _, with_service(cephadm_module, ServiceSpec("mgr")) as _, with_service(
|
||||
@ -597,12 +601,10 @@ class TestMonitoring:
|
||||
basicAuth: false
|
||||
isDefault: true
|
||||
editable: false""").lstrip(),
|
||||
'certs/cert_file': dedent("""
|
||||
# generated by cephadm
|
||||
c""").lstrip(),
|
||||
'certs/cert_key': dedent("""
|
||||
# generated by cephadm
|
||||
k""").lstrip(),
|
||||
'certs/cert_file': dedent(f"""
|
||||
# generated by cephadm\n{grafana_cert}""").lstrip(),
|
||||
'certs/cert_key': dedent(f"""
|
||||
# generated by cephadm\n{grafana_key}""").lstrip(),
|
||||
}
|
||||
|
||||
_run_cephadm.assert_called_with(
|
||||
|
@ -554,10 +554,13 @@ def verify_cacrt_content(crt):
|
||||
try:
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
|
||||
if x509.has_expired():
|
||||
logger.warning('Certificate has expired: {}'.format(crt))
|
||||
org, cn = get_cert_issuer_info(crt)
|
||||
end_date = datetime.datetime.strptime(x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
|
||||
msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
|
||||
logger.warning(msg)
|
||||
raise ServerConfigException(msg)
|
||||
except (ValueError, crypto.Error) as e:
|
||||
raise ServerConfigException(
|
||||
'Invalid certificate: {}'.format(str(e)))
|
||||
raise ServerConfigException(f'Invalid certificate: {e}')
|
||||
|
||||
|
||||
def verify_cacrt(cert_fname):
|
||||
@ -576,6 +579,22 @@ def verify_cacrt(cert_fname):
|
||||
raise ServerConfigException(
|
||||
'Invalid certificate {}: {}'.format(cert_fname, str(e)))
|
||||
|
||||
def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
|
||||
"""Basic validation of a ca cert"""
|
||||
|
||||
from OpenSSL import crypto, SSL
|
||||
try:
|
||||
(org_name, cn) = (None, None)
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
|
||||
components = cert.get_issuer().get_components()
|
||||
for c in components:
|
||||
if c[0].decode() == 'O': # org comp
|
||||
org_name = c[1].decode()
|
||||
elif c[0].decode() == 'CN': # common name comp
|
||||
cn = c[1].decode()
|
||||
return (org_name, cn)
|
||||
except (ValueError, crypto.Error) as e:
|
||||
raise ServerConfigException(f'Invalid certificate key: {e}')
|
||||
|
||||
def verify_tls(crt, key):
|
||||
# type: (str, str) -> None
|
||||
@ -601,8 +620,10 @@ def verify_tls(crt, key):
|
||||
context.use_privatekey(_key)
|
||||
context.check_privatekey()
|
||||
except crypto.Error as e:
|
||||
logger.warning(
|
||||
'Private key and certificate do not match up: {}'.format(str(e)))
|
||||
logger.warning('Private key and certificate do not match up: {}'.format(str(e)))
|
||||
except SSL.Error as e:
|
||||
raise ServerConfigException(f'Invalid cert/key pair: {e}')
|
||||
|
||||
|
||||
|
||||
def verify_tls_files(cert_fname, pkey_fname):
|
||||
|
@ -1,9 +1,15 @@
|
||||
from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
|
||||
from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info
|
||||
from OpenSSL import crypto, SSL
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
valid_ceph_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZaEVWKkqeWYK\n-----END CERTIFICATE-----\n
|
||||
"""
|
||||
|
||||
invalid_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEBn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZa\n-----END CERTIFICATE-----\n
|
||||
"""
|
||||
|
||||
class TLSchecks(unittest.TestCase):
|
||||
|
||||
def test_defaults(self):
|
||||
@ -33,4 +39,17 @@ class TLSchecks(unittest.TestCase):
|
||||
new_key.generate_key(crypto.TYPE_RSA, 2048)
|
||||
new_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, new_key).decode('utf-8')
|
||||
|
||||
self.assertRaises(SSL.Error, verify_tls, crt, new_key)
|
||||
self.assertRaises(ServerConfigException, verify_tls, crt, new_key)
|
||||
|
||||
def test_get_cert_issuer_info(self):
|
||||
|
||||
# valid certificate
|
||||
org, cn = get_cert_issuer_info(valid_ceph_cert)
|
||||
assert org == 'Ceph'
|
||||
assert cn == 'cephadm'
|
||||
|
||||
# empty certificate
|
||||
self.assertRaises(ServerConfigException, get_cert_issuer_info, '')
|
||||
|
||||
# invalid certificate
|
||||
self.assertRaises(ServerConfigException, get_cert_issuer_info, invalid_cert)
|
||||
|
Loading…
Reference in New Issue
Block a user