mirror of
https://github.com/ceph/ceph
synced 2024-12-29 15:03:33 +00:00
Merge pull request #39139 from TRYTOBE8TME/wip-rgw-bucket-tests-separation-new
Wip rgw bucket tests separation new
This commit is contained in:
commit
b903b2e011
0
qa/suites/rgw/notifications/%
Normal file
0
qa/suites/rgw/notifications/%
Normal file
1
qa/suites/rgw/notifications/.qa
Symbolic link
1
qa/suites/rgw/notifications/.qa
Symbolic link
@ -0,0 +1 @@
|
||||
../.qa
|
1
qa/suites/rgw/notifications/beast.yaml
Symbolic link
1
qa/suites/rgw/notifications/beast.yaml
Symbolic link
@ -0,0 +1 @@
|
||||
.qa/rgw_frontend/beast.yaml
|
1
qa/suites/rgw/notifications/bluestore-bitmap.yaml
Symbolic link
1
qa/suites/rgw/notifications/bluestore-bitmap.yaml
Symbolic link
@ -0,0 +1 @@
|
||||
.qa/objectstore/bluestore-bitmap.yaml
|
1
qa/suites/rgw/notifications/fixed-2.yaml
Symbolic link
1
qa/suites/rgw/notifications/fixed-2.yaml
Symbolic link
@ -0,0 +1 @@
|
||||
.qa/clusters/fixed-2.yaml
|
13
qa/suites/rgw/notifications/overrides.yaml
Normal file
13
qa/suites/rgw/notifications/overrides.yaml
Normal file
@ -0,0 +1,13 @@
|
||||
overrides:
|
||||
ceph:
|
||||
wait-for-scrub: false
|
||||
conf:
|
||||
client:
|
||||
setuser: ceph
|
||||
setgroup: ceph
|
||||
debug rgw: 20
|
||||
rgw crypt s3 kms backend: testing
|
||||
rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo=
|
||||
rgw crypt require ssl: false
|
||||
rgw:
|
||||
storage classes: LUKEWARM, FROZEN
|
1
qa/suites/rgw/notifications/supported-random-distro$
Symbolic link
1
qa/suites/rgw/notifications/supported-random-distro$
Symbolic link
@ -0,0 +1 @@
|
||||
.qa/distros/supported-random-distro$
|
0
qa/suites/rgw/notifications/tasks/+
Normal file
0
qa/suites/rgw/notifications/tasks/+
Normal file
20
qa/suites/rgw/notifications/tasks/0-install.yaml
Normal file
20
qa/suites/rgw/notifications/tasks/0-install.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
# see http://tracker.ceph.com/issues/20360 and http://tracker.ceph.com/issues/18126
|
||||
os_type: centos
|
||||
|
||||
tasks:
|
||||
- install:
|
||||
# flavor: notcmalloc
|
||||
- ceph:
|
||||
- openssl_keys:
|
||||
- rgw:
|
||||
client.0:
|
||||
# valgrind: [--tool=memcheck, --max-threads=1024] # http://tracker.ceph.com/issues/25214
|
||||
|
||||
overrides:
|
||||
ceph:
|
||||
conf:
|
||||
global:
|
||||
osd_min_pg_log_entries: 10
|
||||
osd_max_pg_log_entries: 10
|
||||
client:
|
||||
rgw lc debug interval: 10
|
10
qa/suites/rgw/notifications/tasks/test_kafka.yaml
Normal file
10
qa/suites/rgw/notifications/tasks/test_kafka.yaml
Normal file
@ -0,0 +1,10 @@
|
||||
tasks:
|
||||
- tox: [ client.0 ]
|
||||
- kafka:
|
||||
client.0:
|
||||
kafka_version: 2.6.0
|
||||
- notification-tests:
|
||||
client.0:
|
||||
force-branch: wip-rgw-bucket-tests-separation-new
|
||||
rgw_server: client.0
|
||||
git_remote: https://github.com/TRYTOBE8TME/
|
196
qa/tasks/kafka.py
Normal file
196
qa/tasks/kafka.py
Normal file
@ -0,0 +1,196 @@
|
||||
"""
|
||||
Deploy and configure Kafka for Teuthology
|
||||
"""
|
||||
import contextlib
|
||||
import logging
|
||||
|
||||
from teuthology import misc as teuthology
|
||||
from teuthology import contextutil
|
||||
from teuthology.orchestra import run
|
||||
from teuthology.exceptions import ConfigError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def get_kafka_version(config):
|
||||
for client, client_config in config.items():
|
||||
if 'kafka_version' in client_config:
|
||||
kafka_version = client_config.get('kafka_version')
|
||||
return kafka_version
|
||||
|
||||
def get_kafka_dir(ctx, config):
|
||||
kafka_version = get_kafka_version(config)
|
||||
current_version = 'kafka-' + kafka_version + '-src'
|
||||
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
|
||||
|
||||
def get_toxvenv_dir(ctx):
|
||||
return ctx.tox.venv_path
|
||||
|
||||
def toxvenv_sh(ctx, remote, args, **kwargs):
|
||||
activate = get_toxvenv_dir(ctx) + '/bin/activate'
|
||||
return remote.sh(['source', activate, run.Raw('&&')] + args, **kwargs)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def install_kafka(ctx, config):
|
||||
"""
|
||||
Downloading the kafka tar file.
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Installing Kafka...')
|
||||
|
||||
for (client, _) in config.items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
test_dir=teuthology.get_testdir(ctx)
|
||||
current_version = get_kafka_version(config)
|
||||
|
||||
link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
|
||||
toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1])
|
||||
|
||||
file1 = 'kafka-' + current_version + '-src.tgz'
|
||||
toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1])
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
log.info('Removing packaged dependencies of Kafka...')
|
||||
test_dir=get_kafka_dir(ctx, config)
|
||||
current_version = get_kafka_version(config)
|
||||
for client in config:
|
||||
ctx.cluster.only(client).run(
|
||||
args=['rm', '-rf', test_dir],
|
||||
)
|
||||
|
||||
rmfile1 = 'kafka-' + current_version + '-src.tgz'
|
||||
ctx.cluster.only(client).run(
|
||||
args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=rmfile1)],
|
||||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def run_kafka(ctx,config):
|
||||
"""
|
||||
This includes two parts:
|
||||
1. Starting Zookeeper service
|
||||
2. Starting Kafka service
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Bringing up Zookeeper and Kafka services...')
|
||||
for (client,_) in config.items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./gradlew', 'jar',
|
||||
'-PscalaVersion=2.13.2'
|
||||
],
|
||||
)
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./zookeeper-server-start.sh',
|
||||
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
|
||||
run.Raw('&'), 'exit'
|
||||
],
|
||||
)
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./kafka-server-start.sh',
|
||||
'{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
|
||||
run.Raw('&'), 'exit'
|
||||
],
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
log.info('Stopping Zookeeper and Kafka Services...')
|
||||
|
||||
for (client, _) in config.items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./kafka-server-stop.sh',
|
||||
'{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
|
||||
]
|
||||
)
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./zookeeper-server-stop.sh',
|
||||
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
|
||||
]
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def run_admin_cmds(ctx,config):
|
||||
"""
|
||||
Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic.
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Checking kafka server through producer/consumer commands...')
|
||||
for (client,_) in config.items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
[
|
||||
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
|
||||
'--bootstrap-server', 'localhost:9092'
|
||||
])
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
[
|
||||
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'echo', "First", run.Raw('|'),
|
||||
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
|
||||
'--bootstrap-server', 'localhost:9092'
|
||||
])
|
||||
|
||||
toxvenv_sh(ctx, remote,
|
||||
[
|
||||
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
|
||||
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
|
||||
'--from-beginning',
|
||||
'--bootstrap-server', 'localhost:9092',
|
||||
run.Raw('&'), 'exit'
|
||||
])
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def task(ctx,config):
|
||||
"""
|
||||
To run kafka the prerequisite is to run the tox task. Following is the way how to run
|
||||
tox and then kafka::
|
||||
tasks:
|
||||
- tox: [ client.0 ]
|
||||
- kafka:
|
||||
client.0:
|
||||
"""
|
||||
assert config is None or isinstance(config, list) \
|
||||
or isinstance(config, dict), \
|
||||
"task kafka only supports a list or dictionary for configuration"
|
||||
|
||||
if not hasattr(ctx, 'tox'):
|
||||
raise ConfigError('kafka must run after the tox task')
|
||||
|
||||
all_clients = ['client.{id}'.format(id=id_)
|
||||
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
|
||||
if config is None:
|
||||
config = all_clients
|
||||
if isinstance(config, list):
|
||||
config = dict.fromkeys(config)
|
||||
|
||||
log.debug('Kafka config is %s', config)
|
||||
|
||||
with contextutil.nested(
|
||||
lambda: install_kafka(ctx=ctx, config=config),
|
||||
lambda: run_kafka(ctx=ctx, config=config),
|
||||
lambda: run_admin_cmds(ctx=ctx, config=config),
|
||||
):
|
||||
yield
|
259
qa/tasks/notification_tests.py
Normal file
259
qa/tasks/notification_tests.py
Normal file
@ -0,0 +1,259 @@
|
||||
"""
|
||||
Run a set of bucket notification tests on rgw.
|
||||
"""
|
||||
from io import BytesIO
|
||||
from configobj import ConfigObj
|
||||
import base64
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
from teuthology import misc as teuthology
|
||||
from teuthology import contextutil
|
||||
from teuthology.config import config as teuth_config
|
||||
from teuthology.orchestra import run
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def download(ctx, config):
|
||||
"""
|
||||
Download the bucket notification tests from the git builder.
|
||||
Remove downloaded test file upon exit.
|
||||
The context passed in should be identical to the context
|
||||
passed in to the main task.
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Downloading bucket-notification-tests...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
for (client, client_config) in config.items():
|
||||
bntests_branch = client_config.get('force-branch', None)
|
||||
if not bntests_branch:
|
||||
raise ValueError(
|
||||
"Could not determine what branch to use for bn-tests. Please add 'force-branch: {bn-tests branch name}' to the .yaml config for this bucket notifications tests task.")
|
||||
|
||||
log.info("Using branch '%s' for bucket notifications tests", bntests_branch)
|
||||
sha1 = client_config.get('sha1')
|
||||
git_remote = client_config.get('git_remote', teuth_config.ceph_git_base_url)
|
||||
ctx.cluster.only(client).run(
|
||||
args=[
|
||||
'git', 'clone',
|
||||
'-b', bntests_branch,
|
||||
git_remote + 'ceph.git',
|
||||
'{tdir}/ceph'.format(tdir=testdir),
|
||||
],
|
||||
)
|
||||
if sha1 is not None:
|
||||
ctx.cluster.only(client).run(
|
||||
args=[
|
||||
'cd', '{tdir}/ceph'.format(tdir=testdir),
|
||||
run.Raw('&&'),
|
||||
'git', 'reset', '--hard', sha1,
|
||||
],
|
||||
)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
log.info('Removing bucket-notifications-tests...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
for client in config:
|
||||
ctx.cluster.only(client).run(
|
||||
args=[
|
||||
'rm',
|
||||
'-rf',
|
||||
'{tdir}/ceph'.format(tdir=testdir),
|
||||
],
|
||||
)
|
||||
|
||||
def _config_user(bntests_conf, section, user):
|
||||
"""
|
||||
Configure users for this section by stashing away keys, ids, and
|
||||
email addresses.
|
||||
"""
|
||||
bntests_conf[section].setdefault('user_id', user)
|
||||
bntests_conf[section].setdefault('email', '{user}+test@test.test'.format(user=user))
|
||||
bntests_conf[section].setdefault('display_name', 'Mr. {user}'.format(user=user))
|
||||
bntests_conf[section].setdefault('access_key',
|
||||
''.join(random.choice(string.ascii_uppercase) for i in range(20)))
|
||||
bntests_conf[section].setdefault('secret_key',
|
||||
base64.b64encode(os.urandom(40)).decode())
|
||||
|
||||
@contextlib.contextmanager
|
||||
def create_users(ctx, config):
|
||||
"""
|
||||
Create a main and an alternate s3 user.
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Creating rgw user...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
|
||||
users = {'s3 main': 'foo'}
|
||||
for client in config['clients']:
|
||||
bntests_conf = config['bntests_conf'][client]
|
||||
for section, user in users.items():
|
||||
_config_user(bntests_conf, section, '{user}.{client}'.format(user=user, client=client))
|
||||
log.debug('Creating user {user} on {host}'.format(user=bntests_conf[section]['user_id'], host=client))
|
||||
cluster_name, daemon_type, client_id = teuthology.split_role(client)
|
||||
client_with_id = daemon_type + '.' + client_id
|
||||
ctx.cluster.only(client).run(
|
||||
args=[
|
||||
'adjust-ulimits',
|
||||
'ceph-coverage',
|
||||
'{tdir}/archive/coverage'.format(tdir=testdir),
|
||||
'radosgw-admin',
|
||||
'-n', client_with_id,
|
||||
'user', 'create',
|
||||
'--uid', bntests_conf[section]['user_id'],
|
||||
'--display-name', bntests_conf[section]['display_name'],
|
||||
'--access-key', bntests_conf[section]['access_key'],
|
||||
'--secret', bntests_conf[section]['secret_key'],
|
||||
'--cluster', cluster_name,
|
||||
],
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
for client in config['clients']:
|
||||
for user in users.values():
|
||||
uid = '{user}.{client}'.format(user=user, client=client)
|
||||
cluster_name, daemon_type, client_id = teuthology.split_role(client)
|
||||
client_with_id = daemon_type + '.' + client_id
|
||||
ctx.cluster.only(client).run(
|
||||
args=[
|
||||
'adjust-ulimits',
|
||||
'ceph-coverage',
|
||||
'{tdir}/archive/coverage'.format(tdir=testdir),
|
||||
'radosgw-admin',
|
||||
'-n', client_with_id,
|
||||
'user', 'rm',
|
||||
'--uid', uid,
|
||||
'--purge-data',
|
||||
'--cluster', cluster_name,
|
||||
],
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def configure(ctx, config):
|
||||
assert isinstance(config, dict)
|
||||
log.info('Configuring bucket-notifications-tests...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
for client, properties in config['clients'].items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
bntests_conf = config['bntests_conf'][client]
|
||||
|
||||
conf_fp = BytesIO()
|
||||
bntests_conf.write(conf_fp)
|
||||
remote.write_file(
|
||||
path='{tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
|
||||
data=conf_fp.getvalue(),
|
||||
)
|
||||
|
||||
remote.run(
|
||||
args=[
|
||||
'cd',
|
||||
'{tdir}/ceph/src/test/rgw/bucket_notification'.format(tdir=testdir),
|
||||
run.Raw('&&'),
|
||||
'./bootstrap',
|
||||
],
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
log.info('Removing bn-tests.conf file...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
for client, properties in config['clients'].items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
remote.run(
|
||||
args=['rm', '-f',
|
||||
'{tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir,client=client),
|
||||
],
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def run_tests(ctx, config):
|
||||
"""
|
||||
Run the bucket notifications tests after everything is set up.
|
||||
:param ctx: Context passed to task
|
||||
:param config: specific configuration information
|
||||
"""
|
||||
assert isinstance(config, dict)
|
||||
log.info('Running bucket-notifications-tests...')
|
||||
testdir = teuthology.get_testdir(ctx)
|
||||
for client, client_config in config.items():
|
||||
(remote,) = ctx.cluster.only(client).remotes.keys()
|
||||
|
||||
args = [
|
||||
'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
|
||||
'{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
|
||||
'-m', 'nose',
|
||||
'-s',
|
||||
'{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir)
|
||||
]
|
||||
|
||||
remote.run(
|
||||
args=args,
|
||||
label="bucket notification tests against kafka server"
|
||||
)
|
||||
yield
|
||||
|
||||
@contextlib.contextmanager
|
||||
def task(ctx,config):
|
||||
"""
|
||||
To run bucket notification tests the prerequisite is to run the kafka and tox task. Following is the way how to run
|
||||
tox and then kafka and finally bucket notification tests::
|
||||
tasks:
|
||||
- tox: [ client.0 ]
|
||||
- kafka:
|
||||
client.0:
|
||||
- notification_tests:
|
||||
client.0:
|
||||
"""
|
||||
assert config is None or isinstance(config, list) \
|
||||
or isinstance(config, dict), \
|
||||
"task kafka only supports a list or dictionary for configuration"
|
||||
|
||||
all_clients = ['client.{id}'.format(id=id_)
|
||||
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
|
||||
if config is None:
|
||||
config = all_clients
|
||||
if isinstance(config, list):
|
||||
config = dict.fromkeys(config)
|
||||
clients=config.keys()
|
||||
|
||||
log.debug('Notifications config is %s', config)
|
||||
|
||||
bntests_conf = {}
|
||||
|
||||
for client in clients:
|
||||
endpoint = ctx.rgw.role_endpoints.get(client)
|
||||
assert endpoint, 'bntests: no rgw endpoint for {}'.format(client)
|
||||
|
||||
bntests_conf[client] = ConfigObj(
|
||||
indent_type='',
|
||||
infile={
|
||||
'DEFAULT':
|
||||
{
|
||||
'port':endpoint.port,
|
||||
'host':endpoint.dns_name,
|
||||
},
|
||||
's3 main':{}
|
||||
}
|
||||
)
|
||||
|
||||
with contextutil.nested(
|
||||
lambda: download(ctx=ctx, config=config),
|
||||
lambda: create_users(ctx=ctx, config=dict(
|
||||
clients=clients,
|
||||
bntests_conf=bntests_conf,
|
||||
)),
|
||||
lambda: configure(ctx=ctx, config=dict(
|
||||
clients=config,
|
||||
bntests_conf=bntests_conf,
|
||||
)),
|
||||
lambda: run_tests(ctx=ctx, config=config),
|
||||
):
|
||||
yield
|
47
src/test/rgw/bucket_notification/README.rst
Normal file
47
src/test/rgw/bucket_notification/README.rst
Normal file
@ -0,0 +1,47 @@
|
||||
============================
|
||||
Bucket Notification tests
|
||||
============================
|
||||
|
||||
You will need to use the sample configuration file named ``bntests.conf.SAMPLE``
|
||||
that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. You can also copy this file to the directory where you are
|
||||
running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started
|
||||
with vstart.
|
||||
|
||||
You also need to install Kafka which can be done by downloading and unzipping from the following::
|
||||
|
||||
https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz
|
||||
|
||||
Then inside the kafka config directory (``/path/to/kafka-2.6.0-src/config/``) you need to create a file named ``kafka_server_jaas.conf``
|
||||
with the following content::
|
||||
|
||||
KafkaClient {
|
||||
org.apache.kafka.common.security.plain.PlainLoginModule required
|
||||
username="alice"
|
||||
password="alice-secret";
|
||||
};
|
||||
|
||||
After creating this above file run the following command in kafka directory (``/path/to/kafka-2.6.0-src/``)::
|
||||
|
||||
./gradlew jar -PscalaVersion=2.13.2
|
||||
|
||||
After following the above steps next is you need to start the Zookeeper and Kafka services.
|
||||
Here's the commands which can be used to start these services. For starting
|
||||
Zookeeper service run::
|
||||
|
||||
bin/zookeeper-server-start.sh config/zookeeper.properties
|
||||
|
||||
and then run to start the Kafka service::
|
||||
|
||||
bin/kafka-server-start.sh config/server.properties
|
||||
|
||||
If you want to run Zookeeper and Kafka services in background add ``-daemon`` at the end of the command like::
|
||||
|
||||
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
|
||||
|
||||
and::
|
||||
|
||||
bin/kafka-server-start.sh -daemon config/server.properties
|
||||
|
||||
After starting vstart, zookeeper and kafka services you're ready to run the tests::
|
||||
|
||||
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py
|
48
src/test/rgw/bucket_notification/__init__.py
Normal file
48
src/test/rgw/bucket_notification/__init__.py
Normal file
@ -0,0 +1,48 @@
|
||||
import configparser
|
||||
import os
|
||||
|
||||
def setup():
|
||||
cfg = configparser.RawConfigParser()
|
||||
try:
|
||||
path = os.environ['BNTESTS_CONF']
|
||||
except KeyError:
|
||||
raise RuntimeError(
|
||||
'To run tests, point environment '
|
||||
+ 'variable BNTESTS_CONF to a config file.',
|
||||
)
|
||||
cfg.read(path)
|
||||
|
||||
if not cfg.defaults():
|
||||
raise RuntimeError('Your config file is missing the DEFAULT section!')
|
||||
if not cfg.has_section("s3 main"):
|
||||
raise RuntimeError('Your config file is missing the "s3 main" section!')
|
||||
|
||||
defaults = cfg.defaults()
|
||||
|
||||
global default_host
|
||||
default_host = defaults.get("host")
|
||||
|
||||
global default_port
|
||||
default_port = int(defaults.get("port"))
|
||||
|
||||
global main_access_key
|
||||
main_access_key = cfg.get('s3 main',"access_key")
|
||||
|
||||
global main_secret_key
|
||||
main_secret_key = cfg.get('s3 main',"secret_key")
|
||||
|
||||
def get_config_host():
|
||||
global default_host
|
||||
return default_host
|
||||
|
||||
def get_config_port():
|
||||
global default_port
|
||||
return default_port
|
||||
|
||||
def get_access_key():
|
||||
global main_access_key
|
||||
return main_access_key
|
||||
|
||||
def get_secret_key():
|
||||
global main_secret_key
|
||||
return main_secret_key
|
237
src/test/rgw/bucket_notification/api.py
Normal file
237
src/test/rgw/bucket_notification/api.py
Normal file
@ -0,0 +1,237 @@
|
||||
import logging
|
||||
import ssl
|
||||
import urllib
|
||||
import hmac
|
||||
import hashlib
|
||||
import base64
|
||||
import xmltodict
|
||||
from http import client as http_client
|
||||
from urllib import parse as urlparse
|
||||
from time import gmtime, strftime
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
|
||||
log = logging.getLogger('bucket_notification.tests')
|
||||
|
||||
NO_HTTP_BODY = ''
|
||||
|
||||
def put_object_tagging(conn, bucket_name, key, tags):
|
||||
client = boto3.client('s3',
|
||||
endpoint_url='http://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
config=Config(signature_version='s3'))
|
||||
return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
|
||||
|
||||
def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
|
||||
"""generic request sending to pubsub radogw
|
||||
should cover: topics, notificatios and subscriptions
|
||||
"""
|
||||
url_params = ''
|
||||
if parameters is not None:
|
||||
url_params = urlparse.urlencode(parameters)
|
||||
# remove 'None' from keys with no values
|
||||
url_params = url_params.replace('=None', '')
|
||||
url_params = '?' + url_params
|
||||
if extra_parameters is not None:
|
||||
url_params = url_params + '&' + extra_parameters
|
||||
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
|
||||
string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
|
||||
if sign_parameters:
|
||||
string_to_sign += url_params
|
||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
|
||||
string_to_sign.encode('utf-8'),
|
||||
hashlib.sha1).digest()).decode('ascii')
|
||||
headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
|
||||
'Date': string_date,
|
||||
'Host': conn.host+':'+str(conn.port)}
|
||||
http_conn = http_client.HTTPConnection(conn.host, conn.port)
|
||||
if log.getEffectiveLevel() <= 10:
|
||||
http_conn.set_debuglevel(5)
|
||||
http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
|
||||
response = http_conn.getresponse()
|
||||
data = response.read()
|
||||
status = response.status
|
||||
http_conn.close()
|
||||
return data.decode('utf-8'), status
|
||||
|
||||
def delete_all_s3_topics(zone, region):
|
||||
try:
|
||||
conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
|
||||
protocol = 'https' if conn.is_secure else 'http'
|
||||
client = boto3.client('sns',
|
||||
endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
region_name=region,
|
||||
verify='./cert.pem',
|
||||
config=Config(signature_version='s3'))
|
||||
|
||||
topics = client.list_topics()['Topics']
|
||||
for topic in topics:
|
||||
print('topic cleanup, deleting: ' + topic['TopicArn'])
|
||||
assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||
except Exception as err:
|
||||
print('failed to do topic cleanup: ' + str(err))
|
||||
|
||||
def delete_all_objects(conn, bucket_name):
|
||||
client = boto3.client('s3',
|
||||
endpoint_url='http://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key)
|
||||
|
||||
objects = []
|
||||
for key in client.list_objects(Bucket=bucket_name)['Contents']:
|
||||
objects.append({'Key': key['Key']})
|
||||
# delete objects from the bucket
|
||||
response = client.delete_objects(Bucket=bucket_name,
|
||||
Delete={'Objects': objects})
|
||||
|
||||
|
||||
class PSTopicS3:
|
||||
"""class to set/list/get/delete a topic
|
||||
POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
|
||||
POST ?Action=ListTopics
|
||||
POST ?Action=GetTopic&TopicArn=<topic-arn>
|
||||
POST ?Action=DeleteTopic&TopicArn=<topic-arn>
|
||||
"""
|
||||
def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
|
||||
self.conn = conn
|
||||
self.topic_name = topic_name.strip()
|
||||
assert self.topic_name
|
||||
self.topic_arn = ''
|
||||
self.attributes = {}
|
||||
if endpoint_args is not None:
|
||||
self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
|
||||
if opaque_data is not None:
|
||||
self.attributes['OpaqueData'] = opaque_data
|
||||
protocol = 'https' if conn.is_secure else 'http'
|
||||
self.client = boto3.client('sns',
|
||||
endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
region_name=region,
|
||||
verify='./cert.pem',
|
||||
config=Config(signature_version='s3'))
|
||||
|
||||
def get_config(self):
|
||||
"""get topic info"""
|
||||
parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
|
||||
body = urlparse.urlencode(parameters)
|
||||
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
|
||||
content_type = 'application/x-www-form-urlencoded; charset=utf-8'
|
||||
resource = '/'
|
||||
method = 'POST'
|
||||
string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
|
||||
log.debug('StringTosign: %s', string_to_sign)
|
||||
signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
|
||||
string_to_sign.encode('utf-8'),
|
||||
hashlib.sha1).digest()).decode('ascii')
|
||||
headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
|
||||
'Date': string_date,
|
||||
'Host': self.conn.host+':'+str(self.conn.port),
|
||||
'Content-Type': content_type}
|
||||
if self.conn.is_secure:
|
||||
http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
|
||||
context=ssl.create_default_context(cafile='./cert.pem'))
|
||||
else:
|
||||
http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
|
||||
http_conn.request(method, resource, body, headers)
|
||||
response = http_conn.getresponse()
|
||||
data = response.read()
|
||||
status = response.status
|
||||
http_conn.close()
|
||||
dict_response = xmltodict.parse(data)
|
||||
return dict_response, status
|
||||
|
||||
def set_config(self):
|
||||
"""set topic"""
|
||||
result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
|
||||
self.topic_arn = result['TopicArn']
|
||||
return self.topic_arn
|
||||
|
||||
def del_config(self):
|
||||
"""delete topic"""
|
||||
result = self.client.delete_topic(TopicArn=self.topic_arn)
|
||||
return result['ResponseMetadata']['HTTPStatusCode']
|
||||
|
||||
def get_list(self):
|
||||
"""list all topics"""
|
||||
# note that boto3 supports list_topics(), however, the result only show ARNs
|
||||
parameters = {'Action': 'ListTopics'}
|
||||
body = urlparse.urlencode(parameters)
|
||||
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
|
||||
content_type = 'application/x-www-form-urlencoded; charset=utf-8'
|
||||
resource = '/'
|
||||
method = 'POST'
|
||||
string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
|
||||
log.debug('StringTosign: %s', string_to_sign)
|
||||
signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
|
||||
string_to_sign.encode('utf-8'),
|
||||
hashlib.sha1).digest()).decode('ascii')
|
||||
headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
|
||||
'Date': string_date,
|
||||
'Host': self.conn.host+':'+str(self.conn.port),
|
||||
'Content-Type': content_type}
|
||||
if self.conn.is_secure:
|
||||
http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
|
||||
context=ssl.create_default_context(cafile='./cert.pem'))
|
||||
else:
|
||||
http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
|
||||
http_conn.request(method, resource, body, headers)
|
||||
response = http_conn.getresponse()
|
||||
data = response.read()
|
||||
status = response.status
|
||||
http_conn.close()
|
||||
dict_response = xmltodict.parse(data)
|
||||
return dict_response, status
|
||||
|
||||
class PSNotificationS3:
|
||||
"""class to set/get/delete an S3 notification
|
||||
PUT /<bucket>?notification
|
||||
GET /<bucket>?notification[=<notification>]
|
||||
DELETE /<bucket>?notification[=<notification>]
|
||||
"""
|
||||
def __init__(self, conn, bucket_name, topic_conf_list):
|
||||
self.conn = conn
|
||||
assert bucket_name.strip()
|
||||
self.bucket_name = bucket_name
|
||||
self.resource = '/'+bucket_name
|
||||
self.topic_conf_list = topic_conf_list
|
||||
self.client = boto3.client('s3',
|
||||
endpoint_url='http://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
config=Config(signature_version='s3'))
|
||||
|
||||
def send_request(self, method, parameters=None):
|
||||
"""send request to radosgw"""
|
||||
return make_request(self.conn, method, self.resource,
|
||||
parameters=parameters, sign_parameters=True)
|
||||
|
||||
def get_config(self, notification=None):
|
||||
"""get notification info"""
|
||||
parameters = None
|
||||
if notification is None:
|
||||
response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
|
||||
status = response['ResponseMetadata']['HTTPStatusCode']
|
||||
return response, status
|
||||
parameters = {'notification': notification}
|
||||
response, status = self.send_request('GET', parameters=parameters)
|
||||
dict_response = xmltodict.parse(response)
|
||||
return dict_response, status
|
||||
|
||||
def set_config(self):
|
||||
"""set notification"""
|
||||
response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
|
||||
NotificationConfiguration={
|
||||
'TopicConfigurations': self.topic_conf_list
|
||||
})
|
||||
status = response['ResponseMetadata']['HTTPStatusCode']
|
||||
return response, status
|
||||
|
||||
def del_config(self, notification=None):
|
||||
"""delete notification"""
|
||||
parameters = {'notification': notification}
|
||||
|
||||
return self.send_request('DELETE', parameters)
|
10
src/test/rgw/bucket_notification/bntests.conf.SAMPLE
Normal file
10
src/test/rgw/bucket_notification/bntests.conf.SAMPLE
Normal file
@ -0,0 +1,10 @@
|
||||
[DEFAULT]
|
||||
port = 8000
|
||||
host = localhost
|
||||
|
||||
[s3 main]
|
||||
access_key = 0555b35654ad1656d804
|
||||
secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
|
||||
display_name = M. Tester
|
||||
user_id = testid
|
||||
email = tester@ceph.com
|
45
src/test/rgw/bucket_notification/bootstrap
Executable file
45
src/test/rgw/bucket_notification/bootstrap
Executable file
@ -0,0 +1,45 @@
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
if [ -f /etc/debian_version ]; then
|
||||
for package in python3-pip python3-virtualenv python3-dev python3-xmltodict python3-pika libevent-dev libxml2-dev libxslt-dev zlib1g-dev; do
|
||||
if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
|
||||
# add a space after old values
|
||||
missing="${missing:+$missing }$package"
|
||||
fi
|
||||
done
|
||||
if [ -n "$missing" ]; then
|
||||
echo "$0: missing required DEB packages. Installing via sudo." 1>&2
|
||||
sudo apt-get -y install $missing
|
||||
fi
|
||||
fi
|
||||
if [ -f /etc/redhat-release ]; then
|
||||
for package in python3-pip python3-virtualenv python3-devel python3-xmltodict python3-pika libevent-devel libxml2-devel libxslt-devel zlib-devel; do
|
||||
if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
|
||||
missing="${missing:+$missing }$package"
|
||||
fi
|
||||
done
|
||||
if [ -n "$missing" ]; then
|
||||
echo "$0: missing required RPM packages. Installing via sudo." 1>&2
|
||||
sudo yum -y install $missing
|
||||
fi
|
||||
fi
|
||||
|
||||
virtualenv -p python3 --system-site-packages --distribute virtualenv
|
||||
|
||||
# avoid pip bugs
|
||||
./virtualenv/bin/pip install --upgrade pip
|
||||
#pip3 install --upgrade setuptools cffi # address pip issue: https://github.com/pypa/pip/issues/6264
|
||||
|
||||
# work-around change in pip 1.5
|
||||
#./virtualenv/bin/pip install six
|
||||
#./virtualenv/bin/pip install -I nose
|
||||
#./virtualenv/bin/pip install setuptools
|
||||
|
||||
./virtualenv/bin/pip install -U -r requirements.txt
|
||||
|
||||
# forbid setuptools from using the network because it'll try to use
|
||||
# easy_install, and we really wanted pip; next line will fail if pip
|
||||
# requirements.txt does not match setup.py requirements -- sucky but
|
||||
# good enough for now
|
||||
./virtualenv/bin/python setup.py develop
|
5
src/test/rgw/bucket_notification/requirements.txt
Normal file
5
src/test/rgw/bucket_notification/requirements.txt
Normal file
@ -0,0 +1,5 @@
|
||||
nose >=1.0.0
|
||||
boto >=2.6.0
|
||||
boto3 >=1.0.0
|
||||
configparser >=5.0.0
|
||||
kafka-python >=2.0.0
|
19
src/test/rgw/bucket_notification/setup.py
Normal file
19
src/test/rgw/bucket_notification/setup.py
Normal file
@ -0,0 +1,19 @@
|
||||
#!/usr/bin/python
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
setup(
|
||||
name='bn_tests',
|
||||
version='0.0.1',
|
||||
packages=find_packages(),
|
||||
|
||||
author='Kalpesh Pandya',
|
||||
author_email='kapandya@redhat.com',
|
||||
description='Bucket Notification compatibility tests',
|
||||
license='MIT',
|
||||
keywords='bn web testing',
|
||||
|
||||
install_requires=[
|
||||
'boto >=2.0b4',
|
||||
'boto3 >=1.0.0'
|
||||
],
|
||||
)
|
2833
src/test/rgw/bucket_notification/test_bn.py
Normal file
2833
src/test/rgw/bucket_notification/test_bn.py
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -14,14 +14,6 @@ from botocore.client import Config
|
||||
|
||||
log = logging.getLogger('rgw_multi.tests')
|
||||
|
||||
def put_object_tagging(conn, bucket_name, key, tags):
|
||||
client = boto3.client('s3',
|
||||
endpoint_url='http://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
config=Config(signature_version='s3'))
|
||||
return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
|
||||
|
||||
|
||||
def get_object_tagging(conn, bucket, object_key):
|
||||
client = boto3.client('s3',
|
||||
@ -147,41 +139,6 @@ class PSTopic:
|
||||
return self.send_request('GET', get_list=True)
|
||||
|
||||
|
||||
def delete_all_s3_topics(zone, region):
|
||||
try:
|
||||
conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
|
||||
protocol = 'https' if conn.is_secure else 'http'
|
||||
client = boto3.client('sns',
|
||||
endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key,
|
||||
region_name=region,
|
||||
verify='./cert.pem',
|
||||
config=Config(signature_version='s3'))
|
||||
|
||||
topics = client.list_topics()['Topics']
|
||||
for topic in topics:
|
||||
print('topic cleanup, deleting: ' + topic['TopicArn'])
|
||||
assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||
except Exception as err:
|
||||
print('failed to do topic cleanup: ' + str(err))
|
||||
|
||||
|
||||
def delete_all_objects(conn, bucket_name):
|
||||
client = boto3.client('s3',
|
||||
endpoint_url='http://'+conn.host+':'+str(conn.port),
|
||||
aws_access_key_id=conn.aws_access_key_id,
|
||||
aws_secret_access_key=conn.aws_secret_access_key)
|
||||
|
||||
objects = []
|
||||
for key in client.list_objects(Bucket=bucket_name)['Contents']:
|
||||
objects.append({'Key': key['Key']})
|
||||
# delete objects from the bucket
|
||||
response = client.delete_objects(Bucket=bucket_name,
|
||||
Delete={'Objects': objects})
|
||||
return response
|
||||
|
||||
|
||||
class PSTopicS3:
|
||||
"""class to set/list/get/delete a topic
|
||||
POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
|
||||
|
Loading…
Reference in New Issue
Block a user