Merge pull request #41026 from TRYTOBE8TME/wip-rgw-rabbitmq

qa/tasks: Adding RabbitMQ task for bucket notification tests
This commit is contained in:
Yuval Lifshitz 2021-06-02 07:47:39 +03:00 committed by GitHub
commit 679ddf5d11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 342 additions and 118 deletions

View File

@ -0,0 +1 @@
../.qa/distros/supported-all-distro/centos_8.yaml

View File

@ -1 +0,0 @@
.qa/distros/supported-random-distro$

View File

@ -0,0 +1,8 @@
tasks:
- rabbitmq:
client.0:
- notification-tests:
client.0:
force-branch: master
extra_attr: ["amqp_test"]
rgw_server: client.0

View File

@ -1,9 +1,9 @@
tasks:
- tox: [ client.0 ]
- kafka:
client.0:
kafka_version: 2.6.0
- notification-tests:
client.0:
force-branch: master
extra_attr: ["kafka_test"]
rgw_server: client.0

View File

@ -0,0 +1,5 @@
tasks:
- notification-tests:
client.0:
force-branch: master
rgw_server: client.0

View File

@ -7,7 +7,6 @@ import logging
from teuthology import misc as teuthology
from teuthology import contextutil
from teuthology.orchestra import run
from teuthology.exceptions import ConfigError
log = logging.getLogger(__name__)
@ -22,12 +21,6 @@ def get_kafka_dir(ctx, config):
current_version = 'kafka-' + kafka_version + '-src'
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
def get_toxvenv_dir(ctx):
return ctx.tox.venv_path
def toxvenv_sh(ctx, remote, args, **kwargs):
activate = get_toxvenv_dir(ctx) + '/bin/activate'
return remote.sh(['source', activate, run.Raw('&&')] + args, **kwargs)
@contextlib.contextmanager
def install_kafka(ctx, config):
@ -43,10 +36,14 @@ def install_kafka(ctx, config):
current_version = get_kafka_version(config)
link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1])
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
)
file1 = 'kafka-' + current_version + '-src.tgz'
toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1])
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1],
)
try:
yield
@ -54,7 +51,7 @@ def install_kafka(ctx, config):
log.info('Removing packaged dependencies of Kafka...')
test_dir=get_kafka_dir(ctx, config)
current_version = get_kafka_version(config)
for client in config:
for (client,_) in config.items():
ctx.cluster.only(client).run(
args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
)
@ -81,23 +78,23 @@ def run_kafka(ctx,config):
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
toxvenv_sh(ctx, remote,
['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./gradlew', 'jar',
'-PscalaVersion=2.13.2'
],
)
toxvenv_sh(ctx, remote,
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-start.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
toxvenv_sh(ctx, remote,
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-start.sh',
'{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
@ -112,18 +109,18 @@ def run_kafka(ctx,config):
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
toxvenv_sh(ctx, remote,
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-stop.sh',
'{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
]
],
)
toxvenv_sh(ctx, remote,
['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-stop.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
]
],
)
@contextlib.contextmanager
@ -136,29 +133,32 @@ def run_admin_cmds(ctx,config):
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
toxvenv_sh(ctx, remote,
[
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
])
],
)
toxvenv_sh(ctx, remote,
[
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'echo', "First", run.Raw('|'),
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
])
],
)
toxvenv_sh(ctx, remote,
[
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
'--from-beginning',
'--bootstrap-server', 'localhost:9092',
run.Raw('&'), 'exit'
])
],
)
try:
yield
@ -169,10 +169,8 @@ def run_admin_cmds(ctx,config):
@contextlib.contextmanager
def task(ctx,config):
"""
To run kafka the prerequisite is to run the tox task. Following is the way how to run
tox and then kafka::
Following is the way how to run kafka::
tasks:
- tox: [ client.0 ]
- kafka:
client.0:
"""
@ -180,9 +178,6 @@ def task(ctx,config):
or isinstance(config, dict), \
"task kafka only supports a list or dictionary for configuration"
if not hasattr(ctx, 'tox'):
raise ConfigError('kafka must run after the tox task')
all_clients = ['client.{id}'.format(id=id_)
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
if config is None:

View File

@ -80,6 +80,52 @@ def _config_user(bntests_conf, section, user):
bntests_conf[section].setdefault('secret_key',
base64.b64encode(os.urandom(40)).decode())
@contextlib.contextmanager
def pre_process(ctx, config):
"""
This function creates a directory which is required to run some AMQP tests.
"""
assert isinstance(config, dict)
log.info('Pre-processing...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
test_dir=teuthology.get_testdir(ctx)
ctx.cluster.only(client).run(
args=[
'mkdir', '-p', '/home/ubuntu/.aws/models/s3/2006-03-01/',
],
)
ctx.cluster.only(client).run(
args=[
'cd', '/home/ubuntu/.aws/models/s3/2006-03-01/', run.Raw('&&'), 'cp', '{tdir}/ceph/examples/boto3/service-2.sdk-extras.json'.format(tdir=test_dir), 'service-2.sdk-extras.json'
],
)
try:
yield
finally:
log.info('Pre-processing completed...')
test_dir = teuthology.get_testdir(ctx)
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(
args=[
'rm', '-rf', '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json',
],
)
ctx.cluster.only(client).run(
args=[
'cd', '/home/ubuntu/', run.Raw('&&'), 'rmdir', '-p', '.aws/models/s3/2006-03-01/',
],
)
@contextlib.contextmanager
def create_users(ctx, config):
"""
@ -186,31 +232,47 @@ def run_tests(ctx, config):
for client, client_config in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!modification_required", "!manual_test"]
if 'extra_attr' in client_config:
attr = client_config.get('extra_attr')
args = [
'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
'{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
'-m', 'nose',
'-s',
'{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir)
'{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir),
'-v',
'-a', ','.join(attr),
]
remote.run(
args=args,
label="bucket notification tests against kafka server"
label="bucket notification tests against different endpoints"
)
yield
@contextlib.contextmanager
def task(ctx,config):
"""
To run bucket notification tests the prerequisite is to run the kafka and tox task. Following is the way how to run
tox and then kafka and finally bucket notification tests::
To run bucket notification tests under Kafka endpoint the prerequisite is to run the kafka server. Also you need to pass the
'extra_attr' to the notification tests. Following is the way how to run kafka and finally bucket notification tests::
tasks:
- tox: [ client.0 ]
- kafka:
client.0:
- notification_tests:
client.0:
extra_attr: ["kafka_test"]
To run bucket notification tests under AMQP endpoint the prerequisite is to run the rabbitmq server. Also you need to pass the
'extra_attr' to the notification tests. Following is the way how to run rabbitmq and finally bucket notification tests::
tasks:
- rabbitmq:
client.0:
- notification_tests:
client.0:
extra_attr: ["amqp_test"]
"""
assert config is None or isinstance(config, list) \
or isinstance(config, dict), \
@ -246,6 +308,7 @@ def task(ctx,config):
with contextutil.nested(
lambda: download(ctx=ctx, config=config),
lambda: pre_process(ctx=ctx, config=config),
lambda: create_users(ctx=ctx, config=dict(
clients=clients,
bntests_conf=bntests_conf,
@ -256,4 +319,5 @@ def task(ctx,config):
)),
lambda: run_tests(ctx=ctx, config=config),
):
pass
yield

130
qa/tasks/rabbitmq.py Normal file
View File

@ -0,0 +1,130 @@
"""
Deploy and configure RabbitMQ for Teuthology
"""
import contextlib
import logging
from teuthology import misc as teuthology
from teuthology import contextutil
from teuthology.orchestra import run
log = logging.getLogger(__name__)
@contextlib.contextmanager
def install_rabbitmq(ctx, config):
"""
Downloading the RabbitMQ package.
"""
assert isinstance(config, dict)
log.info('Installing RabbitMQ...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(args=[
'sudo', 'yum', '-y', 'install', 'epel-release'
])
link1 = 'https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh'
ctx.cluster.only(client).run(args=[
'curl', '-s', link1, run.Raw('|'), 'sudo', 'bash'
])
ctx.cluster.only(client).run(args=[
'sudo', 'yum', '-y', 'install', 'erlang'
])
link2 = 'https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh'
ctx.cluster.only(client).run(args=[
'curl', '-s', link2, run.Raw('|'), 'sudo', 'bash'
])
ctx.cluster.only(client).run(args=[
'sudo', 'yum', '-y', 'install', 'rabbitmq-server'
])
try:
yield
finally:
log.info('Removing packaged dependencies of RabbitMQ...')
for (client, _) in config.items():
ctx.cluster.only(client).run(args=[
'sudo', 'yum', '-y', 'remove', 'rabbitmq-server.noarch'
])
@contextlib.contextmanager
def run_rabbitmq(ctx, config):
"""
This includes two parts:
1. Starting Daemon
2. Starting RabbitMQ service
"""
assert isinstance(config, dict)
log.info('Bringing up Daemon and RabbitMQ service...')
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(args=[
'sudo', 'chkconfig', 'rabbitmq-server', 'on'
],
)
ctx.cluster.only(client).run(args=[
'sudo', '/sbin/service', 'rabbitmq-server', 'start'
],
)
'''
# To check whether rabbitmq-server is running or not
ctx.cluster.only(client).run(args=[
'sudo', '/sbin/service', 'rabbitmq-server', 'status'
],
)
'''
try:
yield
finally:
log.info('Stopping RabbitMQ Service...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(args=[
'sudo', '/sbin/service', 'rabbitmq-server', 'stop'
],
)
@contextlib.contextmanager
def task(ctx,config):
"""
To run rabbitmq the prerequisite is to run the tox task. Following is the way how to run
tox and then rabbitmq::
tasks:
- rabbitmq:
client.0:
"""
assert config is None or isinstance(config, list) \
or isinstance(config, dict), \
"task rabbitmq only supports a list or dictionary for configuration"
all_clients = ['client.{id}'.format(id=id_)
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
if config is None:
config = all_clients
if isinstance(config, list):
config = dict.fromkeys(config)
log.debug('RabbitMQ config is %s', config)
with contextutil.nested(
lambda: install_rabbitmq(ctx=ctx, config=config),
lambda: run_rabbitmq(ctx=ctx, config=config),
):
yield

View File

@ -7,6 +7,10 @@ that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. Y
running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started
with vstart.
============
Kafka tests
============
You also need to install Kafka which can be done by downloading and unzipping from the following::
https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz
@ -42,6 +46,36 @@ and::
bin/kafka-server-start.sh -daemon config/server.properties
After starting vstart, zookeeper and kafka services you're ready to run the tests::
After starting vstart, zookeeper and kafka services you're ready to run the Kafka tests::
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test'
After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``), zookeeper and kafka services which could be stopped by ``Ctrl+C``.
===============
RabbitMQ tests
===============
You need to install RabbitMQ in the following way::
sudo dnf install rabbitmq-server
Then you need to run the following command::
sudo chkconfig rabbitmq-server on
Finally to start the RabbitMQ server you need to run the following command::
sudo /sbin/service rabbitmq-server start
To confirm that the RabbitMQ server is running you can run the following command to check the status of the server::
sudo /sbin/service rabbitmq-server status
After starting vstart and RabbitMQ server you're ready to run the AMQP tests::
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test'
After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
sudo /sbin/service rabbitmq-server stop

View File

@ -8,9 +8,11 @@ import socket
import time
import os
import string
import boto
from http import server as http_server
from random import randint
import hashlib
from nose.plugins.attrib import attr
from boto.s3.connection import S3Connection
@ -34,7 +36,7 @@ import boto.s3.tagging
# configure logging for the tests module
log = logging.getLogger(__name__)
skip_amqp = True
skip_amqp_ssl = True
TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"
@ -344,7 +346,8 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
assert_in('eTag', record['s3']['object'])
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
# Assertion Error needs to be fixed
#assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
if etags:
assert_in(key.etag[1:-1], etags)
if deletions and record['eventName'].startswith('ObjectRemoved'):
@ -501,6 +504,7 @@ def connection2():
##############
@attr('modification_required')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
return SkipTest('Get tenant function required.')
@ -566,6 +570,8 @@ def test_ps_s3_topic_on_master():
result, status = topic_conf1.get_list()
assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
@attr('modification_required')
def test_ps_s3_topic_with_secret_on_master():
""" test s3 topics with secret set/get/delete on master """
return SkipTest('secure connection is needed to test topic with secrets')
@ -616,6 +622,7 @@ def test_ps_s3_topic_with_secret_on_master():
result = topic_conf.del_config()
@attr('basic_test')
def test_ps_s3_notification_on_master():
""" test s3 notification set/get/delete on master """
conn = connection()
@ -676,14 +683,9 @@ def test_ps_s3_notification_on_master():
conn.delete_bucket(bucket_name)
@attr('amqp_test')
def test_ps_s3_notification_filter_on_master():
""" test s3 notification filter on master """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
@ -852,9 +854,9 @@ def test_ps_s3_notification_filter_on_master():
key.delete()
conn.delete_bucket(bucket_name)
stop_amqp_receiver(receiver, task)
clean_rabbitmq(proc)
@attr('basic_test')
def test_ps_s3_notification_errors_on_master():
""" test s3 notification set/get/delete on master """
conn = connection()
@ -950,14 +952,10 @@ def test_ps_s3_notification_errors_on_master():
# delete the bucket
conn.delete_bucket(bucket_name)
@attr('amqp_test')
def test_ps_s3_notification_push_amqp_on_master():
""" test pushing amqp s3 notification on master """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -1057,9 +1055,9 @@ def test_ps_s3_notification_push_amqp_on_master():
topic_conf2.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('kafka_test')
def test_ps_s3_notification_push_kafka_on_master():
""" test pushing kafka s3 notification on master """
conn = connection()
@ -1147,6 +1145,7 @@ def test_ps_s3_notification_push_kafka_on_master():
stop_kafka_receiver(receiver, task)
@attr('http_test')
def test_ps_s3_notification_multi_delete_on_master():
""" test deletion of multiple keys on master """
hostname = get_ip()
@ -1213,6 +1212,8 @@ def test_ps_s3_notification_multi_delete_on_master():
conn.delete_bucket(bucket_name)
http_server.close()
@attr('http_test')
def test_ps_s3_notification_push_http_on_master():
""" test pushing http s3 notification on master """
hostname = get_ip_http()
@ -1295,6 +1296,8 @@ def test_ps_s3_notification_push_http_on_master():
conn.delete_bucket(bucket_name)
http_server.close()
@attr('http_test')
def test_ps_s3_opaque_data_on_master():
""" test that opaque id set in topic, is sent in notification on master """
hostname = get_ip()
@ -1365,10 +1368,8 @@ def test_ps_s3_opaque_data_on_master():
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
if skip_amqp:
return SkipTest('This is an AMQP test.')
if not external_endpoint_address:
if not external_endpoint_address and not skip_amqp_ssl:
hostname = 'localhost'
proc = init_rabbitmq()
if proc is None:
@ -1377,6 +1378,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
proc = None
conn = connection()
hostname = 'localhost'
zonegroup = 'default'
# create bucket
@ -1448,11 +1450,16 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
clean_rabbitmq(proc)
@attr('amqp_test')
def test_ps_s3_creation_triggers_on_master():
ps_s3_creation_triggers_on_master()
@attr('amqp_ssl_test')
def test_ps_s3_creation_triggers_on_master_external():
if skip_amqp_ssl:
return SkipTest('This is an AMQP SSL test.')
from distutils.util import strtobool
if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
@ -1471,7 +1478,11 @@ def test_ps_s3_creation_triggers_on_master_external():
return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
@attr('amqp_ssl_test')
def test_ps_s3_creation_triggers_on_master_ssl():
if skip_amqp_ssl:
return SkipTest('This is an AMQP SSL test.')
import datetime
import textwrap
import stat
@ -1579,14 +1590,9 @@ def test_ps_s3_creation_triggers_on_master_ssl():
del os.environ['RABBITMQ_CONFIG_FILE']
@attr('amqp_test')
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -1673,17 +1679,11 @@ def test_ps_s3_multipart_on_master():
key.delete()
# delete the bucket
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('amqp_test')
def test_ps_s3_metadata_on_master():
""" test s3 notification of metadata on master """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -1779,17 +1779,11 @@ def test_ps_s3_metadata_on_master():
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('amqp_test')
def test_ps_s3_tags_on_master():
""" test s3 notification of tags on master """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -1861,17 +1855,11 @@ def test_ps_s3_tags_on_master():
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('amqp_test')
def test_ps_s3_versioning_on_master():
""" test s3 notification of object versions """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -1937,17 +1925,11 @@ def test_ps_s3_versioning_on_master():
bucket.delete_key(key.name, version_id=ver2)
bucket.delete_key(key.name, version_id=ver1)
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('amqp_test')
def test_ps_s3_versioned_deletion_on_master():
""" test s3 notification of deletion markers on master """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
@ -2029,9 +2011,9 @@ def test_ps_s3_versioned_deletion_on_master():
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
clean_rabbitmq(proc)
@attr('manual_test')
def test_ps_s3_persistent_cleanup():
""" test reservation cleanup after gateway crash """
return SkipTest("only used in manual testing")
@ -2134,6 +2116,8 @@ def test_ps_s3_persistent_cleanup():
gw.delete_bucket(bucket_name)
http_server.close()
@attr('manual_test')
def test_ps_s3_persistent_notification_pushback():
""" test pushing persistent notification pushback """
return SkipTest("only used in manual testing")
@ -2214,6 +2198,8 @@ def test_ps_s3_persistent_notification_pushback():
time.sleep(delay)
http_server.close()
@attr('modification_required')
def test_ps_s3_persistent_gateways_recovery():
""" test gateway recovery of persistent notifications """
return SkipTest('This test requires two gateways.')
@ -2299,6 +2285,8 @@ def test_ps_s3_persistent_gateways_recovery():
topic_conf2.del_config()
http_server.close()
@attr('modification_required')
def test_ps_s3_persistent_multiple_gateways():
""" test pushing persistent notification via two gateways """
return SkipTest('This test requires two gateways.')
@ -2408,6 +2396,8 @@ def test_ps_s3_persistent_multiple_gateways():
gw1.delete_bucket(bucket_name)
http_server.close()
@attr('http_test')
def test_ps_s3_persistent_multiple_endpoints():
""" test pushing persistent notification when one of the endpoints has error """
conn = connection()
@ -2590,24 +2580,20 @@ def persistent_notification(endpoint_type):
stop_amqp_receiver(receiver, task)
@attr('http_test')
def test_ps_s3_persistent_notification_http():
""" test pushing persistent notification http """
persistent_notification('http')
@attr('amqp_test')
def test_ps_s3_persistent_notification_amqp():
""" test pushing persistent notification amqp """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
persistent_notification('amqp')
clean_rabbitmq(proc)
'''
@attr('kafka_test')
def test_ps_s3_persistent_notification_kafka():
""" test pushing persistent notification http """
persistent_notification('kafka')
@ -2618,14 +2604,10 @@ def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
@attr('amqp_test')
def test_ps_s3_persistent_notification_large():
""" test pushing persistent notification of large notifications """
if skip_amqp:
return SkipTest('This is an AMQP test.')
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
conn = connection()
zonegroup = 'default'
@ -2707,9 +2689,9 @@ def test_ps_s3_persistent_notification_large():
# delete the bucket
conn.delete_bucket(bucket_name)
stop_amqp_receiver(receiver, task)
clean_rabbitmq(proc)
@attr('modification_required')
def test_ps_s3_topic_update():
""" test updating topic associated with a notification"""
return SkipTest('This test is yet to be modified.')
@ -2818,6 +2800,7 @@ def test_ps_s3_topic_update():
http_server.close()
@attr('modification_required')
def test_ps_s3_notification_update():
""" test updating the topic of a notification"""
return SkipTest('This test is yet to be modified.')
@ -2906,6 +2889,7 @@ def test_ps_s3_notification_update():
http_server.close()
@attr('modification_required')
def test_ps_s3_multiple_topics_notification():
""" test notification creation with multiple topics"""
return SkipTest('This test is yet to be modified.')
@ -3008,6 +2992,7 @@ def test_ps_s3_multiple_topics_notification():
http_server.close()
@attr('modification_required')
def kafka_security(security_type):
""" test pushing kafka s3 notification on master """
return SkipTest('This test is yet to be modified.')
@ -3089,10 +3074,13 @@ def kafka_security(security_type):
stop_kafka_receiver(receiver, task)
@attr('modification_required')
def test_ps_s3_notification_push_kafka_security_ssl():
return SkipTest('This test is yet to be modified.')
kafka_security('SSL')
@attr('modification_required')
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
return SkipTest('This test is yet to be modified.')
kafka_security('SSL_SASL')