ceph/qa/tasks/mgr/dashboard/test_rgw.py

142 lines
4.3 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import urllib
import logging
logger = logging.getLogger(__name__)
from .helper import DashboardTestCase, authenticate
class RgwControllerTest(DashboardTestCase):
    """Tests for the RGW daemon endpoints under ``/api/rgw/daemon``."""

    @authenticate
    def test_rgw_daemon_list(self):
        """The daemon list holds exactly one entry with the basic fields."""
        daemons = self._get('/api/rgw/daemon')
        self.assertStatus(200)
        self.assertEqual(len(daemons), 1)

        daemon = daemons[0]
        for field in ('id', 'version', 'server_hostname'):
            self.assertIn(field, daemon)

    @authenticate
    def test_rgw_daemon_get(self):
        """A single daemon, looked up by the id from the list, has RGW details."""
        daemons = self._get('/api/rgw/daemon')
        self.assertStatus(200)

        # Use the id of the first listed daemon to build the detail URL.
        daemon_id = daemons[0]['id']
        details = self._get('/api/rgw/daemon/{}'.format(daemon_id))
        self.assertStatus(200)

        for field in ('rgw_metadata', 'rgw_id', 'rgw_status'):
            self.assertIn(field, details)
        # The metadata must be present and non-empty.
        self.assertTrue(details['rgw_metadata'])
class RgwProxyExceptionsTest(DashboardTestCase):
    """Tests the RGW proxy response when the API credentials are empty."""

    @classmethod
    def setUpClass(cls):
        """Set both dashboard RGW API keys to the empty string."""
        super(RgwProxyExceptionsTest, cls).setUpClass()
        # Secret key first, then access key -- same order as always.
        for option in ('set-rgw-api-secret-key', 'set-rgw-api-access-key'):
            cls._ceph_cmd(['dashboard', option, ''])

    @authenticate
    def test_no_credentials_exception(self):
        """The proxy status request answers 500 and carries a ``detail`` entry."""
        body = self._get('/api/rgw/proxy/status')
        self.assertStatus(500)
        self.assertIn('detail', body)
class RgwProxyTest(DashboardTestCase):
    """Exercises PUT/GET/POST/DELETE on ``/api/rgw/proxy/user``.

    Before the tests run, a system user ``admin`` is created through
    ``radosgw-admin`` and its access/secret keys are set as the dashboard's
    RGW API credentials.
    """

    @classmethod
    def setUpClass(cls):
        """Create the ``admin`` system user and configure the dashboard keys."""
        super(RgwProxyTest, cls).setUpClass()
        cls._radosgw_admin_cmd([
            'user', 'create', '--uid=admin', '--display-name=admin',
            '--system', '--access-key=admin', '--secret=admin'
        ])
        cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
        cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])

    def _assert_user_data(self, data):
        """Assert that ``data`` has every expected user field and >= 1 key.

        :param data: decoded JSON body of a user request
        """
        expected_fields = (
            'caps', 'display_name', 'email', 'keys', 'max_buckets',
            'subusers', 'suspended', 'swift_keys', 'tenant', 'user_id',
        )
        for field in expected_fields:
            self.assertIn(field, data)
        self.assertGreaterEqual(len(data['keys']), 1)

    def _test_put(self):
        """Create the test user via PUT, then read it back via GET."""
        self._put(
            '/api/rgw/proxy/user',
            params={
                'uid': 'teuth-test-user',
                'display-name': 'display name',
            })
        # Check the status before the body so that an error response is
        # reported as a status mismatch rather than as a missing key.
        self.assertStatus(200)
        self._assert_user_data(self._resp.json())

        data = self._get(
            '/api/rgw/proxy/user', params={'uid': 'teuth-test-user'})
        self.assertStatus(200)
        self.assertEqual(data['user_id'], 'teuth-test-user')

    def _test_get(self):
        """Fetch the test user via GET and validate the returned fields."""
        data = self._get(
            '/api/rgw/proxy/user', params={'uid': 'teuth-test-user'})
        self.assertStatus(200)
        self._assert_user_data(data)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12); this also matches the other assertions in this class.
        self.assertEqual(data['user_id'], 'teuth-test-user')

    def _test_post(self):
        """Update the test user's display name via POST."""
        self._post(
            '/api/rgw/proxy/user',
            params={
                'uid': 'teuth-test-user',
                'display-name': 'new name'
            })
        self.assertStatus(200)
        data = self._resp.json()
        self._assert_user_data(data)
        self.assertEqual(data['display_name'], 'new name')

    def _test_delete(self):
        """Delete the test user, then check that a second delete fails."""
        self._delete('/api/rgw/proxy/user', params={'uid': 'teuth-test-user'})
        self.assertStatus(200)

        # The user is gone now: the second delete is expected to answer 500
        # with a detail message that carries the 404 / NoSuchUser error text.
        self._delete('/api/rgw/proxy/user', params={'uid': 'teuth-test-user'})
        self.assertStatus(500)
        resp = self._resp.json()
        self.assertIn('detail', resp)
        self.assertIn('failed request with status code 404', resp['detail'])
        self.assertIn('"Code":"NoSuchUser"', resp['detail'])
        self.assertIn('"HostId"', resp['detail'])
        self.assertIn('"RequestId"', resp['detail'])

    @authenticate
    def test_rgw_proxy(self):
        """Test basic request types"""
        self.maxDiff = None

        # The steps share one user and must run in this order:
        # PUT creates it, GET reads it, POST updates it, DELETE removes it.
        self._test_put()
        self._test_get()
        self._test_post()
        self._test_delete()