diff --git a/src/pybind/mgr/dashboard/services/access_control.py b/src/pybind/mgr/dashboard/services/access_control.py index 972da6a2f66..d379d6e2146 100644 --- a/src/pybind/mgr/dashboard/services/access_control.py +++ b/src/pybind/mgr/dashboard/services/access_control.py @@ -525,6 +525,29 @@ class AccessControlDB(object): version = cls.VERSION return "{}{}".format(cls.ACDB_CONFIG_KEY, version) + def check_and_update_db(self): + logger.debug("Checking for previous DB versions") + + def check_migrate_v1_to_current(): + # Check if version 1 exists in the DB and migrate it to current version + v1_db = mgr.get_store(self.accessdb_config_key(1)) + if v1_db: + logger.debug("Found database v1 credentials") + v1_db = json.loads(v1_db) + + for user, _ in v1_db['users'].items(): + v1_db['users'][user]['enabled'] = True + v1_db['users'][user]['pwdExpirationDate'] = None + v1_db['users'][user]['pwdUpdateRequired'] = False + + self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()} + self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES)) + for un, u in v1_db.get('users', {}).items()} + + self.save() + + check_migrate_v1_to_current() + @classmethod def load(cls): logger.info("Loading user roles DB version=%s", cls.VERSION) @@ -533,6 +556,8 @@ class AccessControlDB(object): if json_db is None: logger.debug("No DB v%s found, creating new...", cls.VERSION) db = cls(cls.VERSION, {}, {}) + # check if we can update from a previous version database + db.check_and_update_db() return db dict_db = json.loads(json_db) diff --git a/src/pybind/mgr/dashboard/tests/test_access_control.py b/src/pybind/mgr/dashboard/tests/test_access_control.py index a4a64174cee..01ee533616d 100644 --- a/src/pybind/mgr/dashboard/tests/test_access_control.py +++ b/src/pybind/mgr/dashboard/tests/test_access_control.py @@ -687,6 +687,115 @@ class AccessControlTest(unittest.TestCase, CLICommandTestMixin): self.validate_persistent_user('admin', ['read-only'], pass_hash, 'admin User', 'admin@user.com') + def test_load_v1(self): + self.CONFIG_KEY_DICT['accessdb_v1'] = ''' + {{ + "users": {{ + "admin": {{ + "username": "admin", + "password": + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + "roles": ["block-manager", "test_role"], + "name": "admin User", + "email": "admin@user.com", + "lastUpdate": {} + }} + }}, + "roles": {{ + "test_role": {{ + "name": "test_role", + "description": "Test Role", + "scopes_permissions": {{ + "{}": ["{}", "{}"], + "{}": ["{}"] + }} + }} + }}, + "version": 1 + }} + '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, + Permission.UPDATE, Scope.POOL, Permission.CREATE) + + load_access_control_db() + role = self.exec_cmd('ac-role-show', rolename="test_role") + self.assertDictEqual(role, { + 'name': 'test_role', + 'description': "Test Role", + 'scopes_permissions': { + Scope.ISCSI: [Permission.READ, Permission.UPDATE], + Scope.POOL: [Permission.CREATE] + } + }) + user = self.exec_cmd('ac-user-show', username="admin") + self.assertDictEqual(user, { + 'username': 'admin', + 'lastUpdate': user['lastUpdate'], + 'password': + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'roles': ['block-manager', 'test_role'], + 'enabled': True + }) + + def test_load_v2(self): + self.CONFIG_KEY_DICT['accessdb_v2'] = ''' + {{ + "users": {{ + "admin": {{ + "username": "admin", + "password": + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + "pwdExpirationDate": null, + "pwdUpdateRequired": false, + "roles": ["block-manager", "test_role"], + "name": "admin User", + "email": "admin@user.com", + "lastUpdate": {}, + "enabled": true + }} + }}, + "roles": {{ + "test_role": {{ + "name": "test_role", + "description": "Test Role", + "scopes_permissions": {{ + "{}": ["{}", "{}"], + "{}": ["{}"] + }} + }} + }}, + "version": 2 + }} + '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, + Permission.UPDATE, Scope.POOL, Permission.CREATE) + + load_access_control_db() + role = self.exec_cmd('ac-role-show', rolename="test_role") + self.assertDictEqual(role, { + 'name': 'test_role', + 'description': "Test Role", + 'scopes_permissions': { + Scope.ISCSI: [Permission.READ, Permission.UPDATE], + Scope.POOL: [Permission.CREATE] + } + }) + user = self.exec_cmd('ac-user-show', username="admin") + self.assertDictEqual(user, { + 'username': 'admin', + 'lastUpdate': user['lastUpdate'], + 'password': + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'roles': ['block-manager', 'test_role'], + 'enabled': True + }) + def test_password_policy_pw_length(self): Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True Settings.PWD_POLICY_MIN_LENGTH = 3