mgr/dashboard: User database migration has been cut out

This PR will revert changes done by https://tracker.ceph.com/issues/49645 to auto-migrate user database v1 to v2.

Fixes: https://tracker.ceph.com/issues/51443

Signed-off-by: Volker Theile <vtheile@suse.com>
This commit is contained in:
Volker Theile 2021-06-30 14:00:28 +02:00
parent eb20648daf
commit 8292281161
2 changed files with 134 additions and 0 deletions

View File

@ -525,6 +525,29 @@ class AccessControlDB(object):
version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
def check_and_update_db(self):
logger.debug("Checking for previous DB versions")
def check_migrate_v1_to_current():
# Check if version 1 exists in the DB and migrate it to current version
v1_db = mgr.get_store(self.accessdb_config_key(1))
if v1_db:
logger.debug("Found database v1 credentials")
v1_db = json.loads(v1_db)
for user, _ in v1_db['users'].items():
v1_db['users'][user]['enabled'] = True
v1_db['users'][user]['pwdExpirationDate'] = None
v1_db['users'][user]['pwdUpdateRequired'] = False
self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
for un, u in v1_db.get('users', {}).items()}
self.save()
check_migrate_v1_to_current()
@classmethod
def load(cls):
logger.info("Loading user roles DB version=%s", cls.VERSION)
@ -533,6 +556,8 @@ class AccessControlDB(object):
if json_db is None:
logger.debug("No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
# check if we can update from a previous version database
db.check_and_update_db()
return db
dict_db = json.loads(json_db)

View File

@ -687,6 +687,115 @@ class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
self.validate_persistent_user('admin', ['read-only'], pass_hash,
'admin User', 'admin@user.com')
def test_load_v1(self):
self.CONFIG_KEY_DICT['accessdb_v1'] = '''
{{
"users": {{
"admin": {{
"username": "admin",
"password":
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
"roles": ["block-manager", "test_role"],
"name": "admin User",
"email": "admin@user.com",
"lastUpdate": {}
}}
}},
"roles": {{
"test_role": {{
"name": "test_role",
"description": "Test Role",
"scopes_permissions": {{
"{}": ["{}", "{}"],
"{}": ["{}"]
}}
}}
}},
"version": 1
}}
'''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
Permission.UPDATE, Scope.POOL, Permission.CREATE)
load_access_control_db()
role = self.exec_cmd('ac-role-show', rolename="test_role")
self.assertDictEqual(role, {
'name': 'test_role',
'description': "Test Role",
'scopes_permissions': {
Scope.ISCSI: [Permission.READ, Permission.UPDATE],
Scope.POOL: [Permission.CREATE]
}
})
user = self.exec_cmd('ac-user-show', username="admin")
self.assertDictEqual(user, {
'username': 'admin',
'lastUpdate': user['lastUpdate'],
'password':
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
'pwdExpirationDate': None,
'pwdUpdateRequired': False,
'name': 'admin User',
'email': 'admin@user.com',
'roles': ['block-manager', 'test_role'],
'enabled': True
})
def test_load_v2(self):
self.CONFIG_KEY_DICT['accessdb_v2'] = '''
{{
"users": {{
"admin": {{
"username": "admin",
"password":
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
"pwdExpirationDate": null,
"pwdUpdateRequired": false,
"roles": ["block-manager", "test_role"],
"name": "admin User",
"email": "admin@user.com",
"lastUpdate": {},
"enabled": true
}}
}},
"roles": {{
"test_role": {{
"name": "test_role",
"description": "Test Role",
"scopes_permissions": {{
"{}": ["{}", "{}"],
"{}": ["{}"]
}}
}}
}},
"version": 2
}}
'''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
Permission.UPDATE, Scope.POOL, Permission.CREATE)
load_access_control_db()
role = self.exec_cmd('ac-role-show', rolename="test_role")
self.assertDictEqual(role, {
'name': 'test_role',
'description': "Test Role",
'scopes_permissions': {
Scope.ISCSI: [Permission.READ, Permission.UPDATE],
Scope.POOL: [Permission.CREATE]
}
})
user = self.exec_cmd('ac-user-show', username="admin")
self.assertDictEqual(user, {
'username': 'admin',
'lastUpdate': user['lastUpdate'],
'password':
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
'pwdExpirationDate': None,
'pwdUpdateRequired': False,
'name': 'admin User',
'email': 'admin@user.com',
'roles': ['block-manager', 'test_role'],
'enabled': True
})
def test_password_policy_pw_length(self):
Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
Settings.PWD_POLICY_MIN_LENGTH = 3