mgr/dashboard: Simplify authentication protocol

By removing the dependency to PyJWT we also remove the dependency to the cryptographic library which
in the dashboard module will create a crash. In newer implementations of the library PyO3 is used to run
rust code in order to encrypt with Elliptic Curves. This is never used in the dashboard communication so
a much simpler implementation where we only use the hmac sha256 algorithm to create the signed JWT message
could be used.

Fixes: https://forum.proxmox.com/threads/ceph-warning-post-upgrade-to-v8.129371
Signed-off-by: Daniel Persson <mailto.woden@gmail.com>
This commit is contained in:
Daniel Persson 2023-11-29 09:39:51 +00:00
parent 171d2b5b38
commit c616a9d017
5 changed files with 74 additions and 11 deletions

View File

@ -1,6 +1,5 @@
CherryPy~=13.1
more-itertools~=8.14
PyJWT~=2.0
bcrypt~=3.1
python3-saml~=1.4
requests~=2.26

View File

@ -121,3 +121,15 @@ class GrafanaError(Exception):
class PasswordPolicyException(Exception):
pass
class ExpiredSignatureError(Exception):
pass
class InvalidTokenError(Exception):
pass
class InvalidAlgorithmError(Exception):
pass

View File

@ -1,7 +1,6 @@
bcrypt
CherryPy
more-itertools
PyJWT
pyopenssl
requests
Routes

View File

@ -1,17 +1,19 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import json
import logging
import os
import threading
import time
import uuid
from base64 import b64encode
import cherrypy
import jwt
from .. import mgr
from ..exceptions import ExpiredSignatureError, InvalidAlgorithmError, InvalidTokenError
from .access_control import LocalAuthenticator, UserDoesNotExist
cherrypy.config.update({
@ -33,7 +35,7 @@ class JwtManager(object):
@staticmethod
def _gen_secret():
secret = os.urandom(16)
return b64encode(secret).decode('utf-8')
return base64.b64encode(secret).decode('utf-8')
@classmethod
def init(cls):
@ -45,6 +47,54 @@ class JwtManager(object):
mgr.set_store('jwt_secret', secret)
cls._secret = secret
@classmethod
def array_to_base64_string(cls, message):
jsonstr = json.dumps(message, sort_keys=True).replace(" ", "")
string_bytes = base64.urlsafe_b64encode(bytes(jsonstr, 'UTF-8'))
return string_bytes.decode('UTF-8').replace("=", "")
@classmethod
def encode(cls, message, secret):
header = {"alg": cls.JWT_ALGORITHM, "typ": "JWT"}
base64_header = cls.array_to_base64_string(header)
base64_message = cls.array_to_base64_string(message)
base64_secret = base64.urlsafe_b64encode(hmac.new(
bytes(secret, 'UTF-8'),
msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
digestmod=hashlib.sha256
).digest()).decode('UTF-8').replace("=", "")
return base64_header + "." + base64_message + "." + base64_secret
@classmethod
def decode(cls, message, secret):
split_message = message.split(".")
base64_header = split_message[0]
base64_message = split_message[1]
base64_secret = split_message[2]
decoded_header = json.loads(base64.urlsafe_b64decode(base64_header))
if decoded_header['alg'] != cls.JWT_ALGORITHM:
raise InvalidAlgorithmError()
incoming_secret = base64.urlsafe_b64encode(hmac.new(
bytes(secret, 'UTF-8'),
msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
digestmod=hashlib.sha256
).digest()).decode('UTF-8').replace("=", "")
if base64_secret != incoming_secret:
raise InvalidTokenError()
# We add ==== as padding to ignore the requirement to have correct padding in
# the urlsafe_b64decode method.
decoded_message = json.loads(base64.urlsafe_b64decode(base64_message + "===="))
now = int(time.time())
if decoded_message['exp'] < now:
raise ExpiredSignatureError()
return decoded_message
@classmethod
def gen_token(cls, username):
if not cls._secret:
@ -59,13 +109,13 @@ class JwtManager(object):
'iat': now,
'username': username
}
return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) # type: ignore
return cls.encode(payload, cls._secret) # type: ignore
@classmethod
def decode_token(cls, token):
if not cls._secret:
cls.init()
return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) # type: ignore
return cls.decode(token, cls._secret) # type: ignore
@classmethod
def get_token_from_header(cls):
@ -99,8 +149,8 @@ class JwtManager(object):
@classmethod
def get_user(cls, token):
try:
dtoken = JwtManager.decode_token(token)
if not JwtManager.is_blocklisted(dtoken['jti']):
dtoken = cls.decode_token(token)
if not cls.is_blocklisted(dtoken['jti']):
user = AuthManager.get_user(dtoken['username'])
if user.last_update <= dtoken['iat']:
return user
@ -110,10 +160,12 @@ class JwtManager(object):
)
else:
cls.logger.debug('Token is block-listed') # type: ignore
except jwt.ExpiredSignatureError:
except ExpiredSignatureError:
cls.logger.debug("Token has expired") # type: ignore
except jwt.InvalidTokenError:
except InvalidTokenError:
cls.logger.debug("Failed to decode token") # type: ignore
except InvalidAlgorithmError:
cls.logger.debug("Only the HS256 algorithm is supported.") # type: ignore
except UserDoesNotExist:
cls.logger.debug( # type: ignore
"Invalid token: user %s does not exist", dtoken['username']

View File

@ -20,6 +20,7 @@ addopts =
deps =
-rrequirements.txt
-cconstraints.txt
PyJWT
[base-test]
deps =