mirror of
https://github.com/ceph/ceph
synced 2025-01-22 19:15:41 +00:00
9ff778cdaa
Fixes: https://tracker.ceph.com/issues/49344 Signed-off-by: Ernesto Puerta <epuertat@redhat.com>
566 lines
22 KiB
Python
566 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
# pylint: disable=too-many-public-methods
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
from .helper import DashboardTestCase
|
|
|
|
|
|
class UserTest(DashboardTestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(UserTest, cls).setUpClass()
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-length-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-oldpwd-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-username-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-exclusion-list-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-complexity-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-sequential-chars-enabled', 'true'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-repetitive-chars-enabled', 'true'])
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-username-enabled', 'false'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-exclusion-list-enabled', 'false'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-complexity-enabled', 'false'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-sequential-chars-enabled', 'false'])
|
|
cls._ceph_cmd(['dashboard', 'set-pwd-policy-check-repetitive-chars-enabled', 'false'])
|
|
super(UserTest, cls).tearDownClass()
|
|
|
|
@classmethod
|
|
def _create_user(cls, username=None, password=None, name=None, email=None, roles=None,
|
|
enabled=True, pwd_expiration_date=None, pwd_update_required=False):
|
|
data = {}
|
|
if username:
|
|
data['username'] = username
|
|
if password:
|
|
data['password'] = password
|
|
if name:
|
|
data['name'] = name
|
|
if email:
|
|
data['email'] = email
|
|
if roles:
|
|
data['roles'] = roles
|
|
if pwd_expiration_date:
|
|
data['pwdExpirationDate'] = pwd_expiration_date
|
|
data['pwdUpdateRequired'] = pwd_update_required
|
|
data['enabled'] = enabled
|
|
cls._post("/api/user", data)
|
|
|
|
@classmethod
|
|
def _reset_login_to_admin(cls, username=None):
|
|
cls.logout()
|
|
if username:
|
|
cls.delete_user(username)
|
|
cls.login('admin', 'admin')
|
|
|
|
def test_crud_user(self):
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'])
|
|
self.assertStatus(201)
|
|
user = self.jsonBody()
|
|
|
|
self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'username': 'user1',
|
|
'name': 'My Name',
|
|
'email': 'my@email.com',
|
|
'roles': ['administrator'],
|
|
'lastUpdate': user['lastUpdate'],
|
|
'enabled': True,
|
|
'pwdExpirationDate': None,
|
|
'pwdUpdateRequired': False
|
|
})
|
|
|
|
self._put('/api/user/user1', {
|
|
'name': 'My New Name',
|
|
'email': 'mynew@email.com',
|
|
'roles': ['block-manager'],
|
|
})
|
|
self.assertStatus(200)
|
|
user = self.jsonBody()
|
|
self.assertJsonBody({
|
|
'username': 'user1',
|
|
'name': 'My New Name',
|
|
'email': 'mynew@email.com',
|
|
'roles': ['block-manager'],
|
|
'lastUpdate': user['lastUpdate'],
|
|
'enabled': True,
|
|
'pwdExpirationDate': None,
|
|
'pwdUpdateRequired': False
|
|
})
|
|
|
|
self._delete('/api/user/user1')
|
|
self.assertStatus(204)
|
|
|
|
def test_crd_disabled_user(self):
|
|
self._create_user(username='klara',
|
|
password='mypassword10#',
|
|
name='Klara Musterfrau',
|
|
email='klara@musterfrau.com',
|
|
roles=['administrator'],
|
|
enabled=False)
|
|
self.assertStatus(201)
|
|
user = self.jsonBody()
|
|
|
|
# Restart dashboard module.
|
|
self._unload_module('dashboard')
|
|
self._load_module('dashboard')
|
|
time.sleep(10)
|
|
|
|
self._get('/api/user/klara')
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'username': 'klara',
|
|
'name': 'Klara Musterfrau',
|
|
'email': 'klara@musterfrau.com',
|
|
'roles': ['administrator'],
|
|
'lastUpdate': user['lastUpdate'],
|
|
'enabled': False,
|
|
'pwdExpirationDate': None,
|
|
'pwdUpdateRequired': False
|
|
})
|
|
|
|
self._delete('/api/user/klara')
|
|
self.assertStatus(204)
|
|
|
|
def test_list_users(self):
|
|
self._get('/api/user')
|
|
self.assertStatus(200)
|
|
user = self.jsonBody()
|
|
self.assertEqual(len(user), 1)
|
|
user = user[0]
|
|
self.assertJsonBody([{
|
|
'username': 'admin',
|
|
'name': None,
|
|
'email': None,
|
|
'roles': ['administrator'],
|
|
'lastUpdate': user['lastUpdate'],
|
|
'enabled': True,
|
|
'pwdExpirationDate': None,
|
|
'pwdUpdateRequired': False
|
|
}])
|
|
|
|
def test_create_user_already_exists(self):
|
|
self._create_user(username='admin',
|
|
password='mypassword10#',
|
|
name='administrator',
|
|
email='my@email.com',
|
|
roles=['administrator'])
|
|
self.assertStatus(400)
|
|
self.assertError(code='username_already_exists',
|
|
component='user')
|
|
|
|
def test_create_user_invalid_role(self):
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['invalid-role'])
|
|
self.assertStatus(400)
|
|
self.assertError(code='role_does_not_exist',
|
|
component='user')
|
|
|
|
def test_create_user_invalid_chars_in_name(self):
|
|
self._create_user(username='userö',
|
|
password='mypassword10#',
|
|
name='administrator',
|
|
email='my@email.com',
|
|
roles=['administrator'])
|
|
self.assertStatus(400)
|
|
self.assertError(code='ceph_type_not_valid',
|
|
component='user')
|
|
|
|
def test_delete_user_does_not_exist(self):
|
|
self._delete('/api/user/user2')
|
|
self.assertStatus(404)
|
|
|
|
@DashboardTestCase.RunAs('test', 'test', [{'user': ['create', 'read', 'update', 'delete']}])
|
|
def test_delete_current_user(self):
|
|
self._delete('/api/user/test')
|
|
self.assertStatus(400)
|
|
self.assertError(code='cannot_delete_current_user',
|
|
component='user')
|
|
|
|
@DashboardTestCase.RunAs('test', 'test', [{'user': ['create', 'read', 'update', 'delete']}])
|
|
def test_disable_current_user(self):
|
|
self._put('/api/user/test', {'enabled': False})
|
|
self.assertStatus(400)
|
|
self.assertError(code='cannot_disable_current_user',
|
|
component='user')
|
|
|
|
def test_update_user_does_not_exist(self):
|
|
self._put('/api/user/user2', {'name': 'My New Name'})
|
|
self.assertStatus(404)
|
|
|
|
def test_update_user_invalid_role(self):
|
|
self._put('/api/user/admin', {'roles': ['invalid-role']})
|
|
self.assertStatus(400)
|
|
self.assertError(code='role_does_not_exist',
|
|
component='user')
|
|
|
|
def test_change_password_from_other_user(self):
|
|
self._post('/api/user/test2/change_password', {
|
|
'old_password': 'abc',
|
|
'new_password': 'xyz'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError(code='invalid_user_context', component='user')
|
|
|
|
def test_change_password_old_not_match(self):
|
|
self._post('/api/user/admin/change_password', {
|
|
'old_password': 'foo',
|
|
'new_password': 'bar'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError(code='invalid_old_password', component='user')
|
|
|
|
def test_change_password_as_old_password(self):
|
|
self.create_user('test1', 'mypassword10#', ['read-only'], force_password=False)
|
|
self.login('test1', 'mypassword10#')
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'mypassword10#'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError('password_policy_validation_failed', 'user',
|
|
'Password must not be the same as the previous one.')
|
|
self._reset_login_to_admin('test1')
|
|
|
|
def test_change_password_contains_username(self):
|
|
self.create_user('test1', 'mypassword10#', ['read-only'], force_password=False)
|
|
self.login('test1', 'mypassword10#')
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'mypasstest1@#'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError('password_policy_validation_failed', 'user',
|
|
'Password must not contain username.')
|
|
self._reset_login_to_admin('test1')
|
|
|
|
def test_change_password_contains_forbidden_words(self):
|
|
self.create_user('test1', 'mypassword10#', ['read-only'], force_password=False)
|
|
self.login('test1', 'mypassword10#')
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'mypassOSD01'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError('password_policy_validation_failed', 'user',
|
|
'Password must not contain the keyword "OSD".')
|
|
self._reset_login_to_admin('test1')
|
|
|
|
def test_change_password_contains_sequential_characters(self):
|
|
self.create_user('test1', 'mypassword10#', ['read-only'], force_password=False)
|
|
self.login('test1', 'mypassword10#')
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'mypass123456!@$'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError('password_policy_validation_failed', 'user',
|
|
'Password must not contain sequential characters.')
|
|
self._reset_login_to_admin('test1')
|
|
|
|
def test_change_password_contains_repetetive_characters(self):
|
|
self.create_user('test1', 'mypassword10#', ['read-only'], force_password=False)
|
|
self.login('test1', 'mypassword10#')
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'aaaaA1@!#'
|
|
})
|
|
self.assertStatus(400)
|
|
self.assertError('password_policy_validation_failed', 'user',
|
|
'Password must not contain repetitive characters.')
|
|
self._reset_login_to_admin('test1')
|
|
|
|
@DashboardTestCase.RunAs('test1', 'mypassword10#', ['read-only'], False)
|
|
def test_change_password(self):
|
|
self._post('/api/user/test1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'newpassword01#'
|
|
})
|
|
self.assertStatus(200)
|
|
self.logout()
|
|
self._post('/api/auth', {'username': 'test1', 'password': 'mypassword10#'})
|
|
self.assertStatus(400)
|
|
self.assertError(code='invalid_credentials', component='auth')
|
|
|
|
def test_create_user_password_cli(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
|
|
'test1'],
|
|
'mypassword10#',
|
|
return_exit_code=True)
|
|
self.assertEqual(exitcode, 0)
|
|
self.delete_user('test1')
|
|
|
|
@DashboardTestCase.RunAs('test2', 'foo_bar_10#', force_password=False, login=False)
|
|
def test_change_user_password_cli(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
|
|
'test2'],
|
|
'foo_new-password01#',
|
|
return_exit_code=True)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
def test_create_user_password_force_cli(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
|
|
'--force-password', 'test11'],
|
|
'bar',
|
|
return_exit_code=True)
|
|
self.assertEqual(exitcode, 0)
|
|
self.delete_user('test11')
|
|
|
|
@DashboardTestCase.RunAs('test22', 'foo_bar_10#', force_password=False, login=False)
|
|
def test_change_user_password_force_cli(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
|
|
'--force-password', 'test22'],
|
|
'bar',
|
|
return_exit_code=True)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
def test_create_user_password_cli_fail(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
|
|
'test3'],
|
|
'foo',
|
|
return_exit_code=True)
|
|
self.assertNotEqual(exitcode, 0)
|
|
|
|
@DashboardTestCase.RunAs('test4', 'x1z_tst+_10#', force_password=False, login=False)
|
|
def test_change_user_password_cli_fail(self):
|
|
exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
|
|
'test4'],
|
|
'bar',
|
|
return_exit_code=True)
|
|
self.assertNotEqual(exitcode, 0)
|
|
|
|
def test_create_user_with_pwd_expiration_date(self):
|
|
future_date = datetime.utcnow() + timedelta(days=10)
|
|
future_date = int(time.mktime(future_date.timetuple()))
|
|
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'],
|
|
pwd_expiration_date=future_date)
|
|
self.assertStatus(201)
|
|
user = self.jsonBody()
|
|
|
|
self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'username': 'user1',
|
|
'name': 'My Name',
|
|
'email': 'my@email.com',
|
|
'roles': ['administrator'],
|
|
'lastUpdate': user['lastUpdate'],
|
|
'enabled': True,
|
|
'pwdExpirationDate': future_date,
|
|
'pwdUpdateRequired': False
|
|
})
|
|
self._delete('/api/user/user1')
|
|
|
|
def test_create_with_pwd_expiration_date_not_valid(self):
|
|
past_date = datetime.utcnow() - timedelta(days=10)
|
|
past_date = int(time.mktime(past_date.timetuple()))
|
|
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'],
|
|
pwd_expiration_date=past_date)
|
|
self.assertStatus(400)
|
|
self.assertError(code='pwd_past_expiration_date', component='user')
|
|
|
|
def test_create_with_default_expiration_date(self):
|
|
future_date_1 = datetime.utcnow() + timedelta(days=9)
|
|
future_date_1 = int(time.mktime(future_date_1.timetuple()))
|
|
future_date_2 = datetime.utcnow() + timedelta(days=11)
|
|
future_date_2 = int(time.mktime(future_date_2.timetuple()))
|
|
|
|
self._ceph_cmd(['dashboard', 'set-user-pwd-expiration-span', '10'])
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'])
|
|
self.assertStatus(201)
|
|
|
|
user = self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertIsNotNone(user['pwdExpirationDate'])
|
|
self.assertGreater(user['pwdExpirationDate'], future_date_1)
|
|
self.assertLess(user['pwdExpirationDate'], future_date_2)
|
|
|
|
self._delete('/api/user/user1')
|
|
self._ceph_cmd(['dashboard', 'set-user-pwd-expiration-span', '0'])
|
|
|
|
def test_pwd_expiration_date_update(self):
|
|
self._ceph_cmd(['dashboard', 'set-user-pwd-expiration-span', '10'])
|
|
self.create_user('user1', 'mypassword10#', ['administrator'])
|
|
|
|
user_1 = self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
|
|
# Let's wait 1 s to ensure pwd expiration date is not the same
|
|
time.sleep(1)
|
|
|
|
self.login('user1', 'mypassword10#')
|
|
self._post('/api/user/user1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'newpassword01#'
|
|
})
|
|
self.assertStatus(200)
|
|
|
|
# Compare password expiration dates.
|
|
self._reset_login_to_admin()
|
|
user_1_pwd_changed = self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertLess(user_1['pwdExpirationDate'], user_1_pwd_changed['pwdExpirationDate'])
|
|
|
|
# Cleanup
|
|
self.delete_user('user1')
|
|
self._ceph_cmd(['dashboard', 'set-user-pwd-expiration-span', '0'])
|
|
|
|
def test_pwd_update_required(self):
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'],
|
|
pwd_update_required=True)
|
|
self.assertStatus(201)
|
|
|
|
user_1 = self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertEqual(user_1['pwdUpdateRequired'], True)
|
|
|
|
self.login('user1', 'mypassword10#')
|
|
self.assertStatus(201)
|
|
|
|
self._get('/api/osd')
|
|
self.assertStatus(403)
|
|
self._reset_login_to_admin('user1')
|
|
|
|
def test_pwd_update_required_change_pwd(self):
|
|
self._create_user(username='user1',
|
|
password='mypassword10#',
|
|
name='My Name',
|
|
email='my@email.com',
|
|
roles=['administrator'],
|
|
pwd_update_required=True)
|
|
self.assertStatus(201)
|
|
|
|
self.login('user1', 'mypassword10#')
|
|
self._post('/api/user/user1/change_password', {
|
|
'old_password': 'mypassword10#',
|
|
'new_password': 'newpassword01#'
|
|
})
|
|
|
|
self.login('user1', 'newpassword01#')
|
|
user_1 = self._get('/api/user/user1')
|
|
self.assertStatus(200)
|
|
self.assertEqual(user_1['pwdUpdateRequired'], False)
|
|
self._get('/api/osd')
|
|
self.assertStatus(200)
|
|
self._reset_login_to_admin('user1')
|
|
|
|
def test_validate_password_weak(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'mypassword1'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': True,
|
|
'credits': 11,
|
|
'valuation': 'Weak'
|
|
})
|
|
|
|
def test_validate_password_ok(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'mypassword1!@'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': True,
|
|
'credits': 17,
|
|
'valuation': 'OK'
|
|
})
|
|
|
|
def test_validate_password_strong(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'testpassword0047!@'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': True,
|
|
'credits': 22,
|
|
'valuation': 'Strong'
|
|
})
|
|
|
|
def test_validate_password_very_strong(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'testpassword#!$!@$'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': True,
|
|
'credits': 30,
|
|
'valuation': 'Very strong'
|
|
})
|
|
|
|
def test_validate_password_fail(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'foo'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': False,
|
|
'credits': 0,
|
|
'valuation': 'Password is too weak.'
|
|
})
|
|
|
|
def test_validate_password_fail_name(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'x1zhugo_10',
|
|
'username': 'hugo'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': False,
|
|
'credits': 0,
|
|
'valuation': 'Password must not contain username.'
|
|
})
|
|
|
|
def test_validate_password_fail_oldpwd(self):
|
|
self._post('/api/user/validate_password', {
|
|
'password': 'x1zt-st10',
|
|
'old_password': 'x1zt-st10'
|
|
})
|
|
self.assertStatus(200)
|
|
self.assertJsonBody({
|
|
'valid': False,
|
|
'credits': 0,
|
|
'valuation': 'Password must not be the same as the previous one.'
|
|
})
|
|
|
|
def test_create_user_pwd_update_required(self):
|
|
self.create_user('foo', 'bar', cmd_args=['--pwd_update_required'])
|
|
self._get('/api/user/foo')
|
|
self.assertStatus(200)
|
|
self.assertJsonSubset({
|
|
'username': 'foo',
|
|
'pwdUpdateRequired': True
|
|
})
|
|
self.delete_user('foo')
|