rgw/multisite-notification: Add integration test for bucket notifications in multisite config.

Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
This commit is contained in:
kchheda3 2023-12-11 15:47:30 -05:00 committed by Casey Bodley
parent df668e9f65
commit 1ed8df24ae
6 changed files with 238 additions and 2 deletions

View File

@ -1,6 +1,7 @@
import boto
import boto.s3.connection
import boto.iam.connection
import boto3
def get_gateway_connection(gateway, credentials):
""" connect to the given gateway """
@ -39,3 +40,25 @@ def get_gateway_iam_connection(gateway, credentials):
port = gateway.port,
is_secure = False)
return gateway.iam_connection
def get_gateway_s3_client(gateway, credentials, region):
""" connect to boto3 s3 client api of the given gateway """
if gateway.s3_client is None:
gateway.s3_client = boto3.client('s3',
endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
aws_access_key_id=credentials.access_key,
aws_secret_access_key=credentials.secret,
region_name=region)
return gateway.s3_client
def get_gateway_sns_client(gateway, credentials, region):
""" connect to boto3 s3 client api of the given gateway """
if gateway.sns_client is None:
gateway.sns_client = boto3.client('sns',
endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
aws_access_key_id=credentials.access_key,
aws_secret_access_key=credentials.secret,
region_name=region)
return gateway.sns_client

View File

@ -3,7 +3,7 @@ from io import StringIO
import json
from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection
from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_s3_client, get_gateway_sns_client
class Cluster:
""" interface to run commands against a distinct ceph cluster """
@ -27,6 +27,8 @@ class Gateway:
self.secure_connection = None
self.ssl_port = ssl_port
self.iam_connection = None
self.s3_client = None
self.sns_client = None
@abstractmethod
def start(self, args = []):
@ -190,6 +192,9 @@ class ZoneConn(object):
self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials)
self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials)
region = "" if self.zone.zonegroup is None else self.zone.zonegroup.name
self.s3_client = get_gateway_s3_client(self.zone.gateways[0], self.credentials, region)
self.sns_client = get_gateway_sns_client(self.zone.gateways[0], self.credentials,region)
# create connections for the rest of the gateways (if exist)
for gw in list(self.zone.gateways):

View File

@ -17,7 +17,7 @@ from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration
from nose.tools import eq_ as eq
from nose.tools import assert_not_equal, assert_equal
from nose.tools import assert_not_equal, assert_equal, assert_true, assert_false
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
@ -66,6 +66,7 @@ num_buckets = 0
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
num_roles = 0
num_topic = 0
def get_zone_connection(zone, credentials):
""" connect to the zone's first gateway """
@ -455,6 +456,13 @@ def gen_role_name():
num_roles += 1
return "roles" + '-' + run_prefix + '-' + str(num_roles)
def gen_topic_name():
global num_topic
num_topic += 1
return "topic" + '-' + run_prefix + '-' + str(num_topic)
class ZonegroupConns:
def __init__(self, zonegroup):
self.zonegroup = zonegroup
@ -502,6 +510,34 @@ def check_all_buckets_dont_exist(zone_conn, buckets):
return True
def get_topics(zone):
"""
Get list of topics in cluster.
"""
cmd = ['topic', 'list'] + zone.zone_args()
topics_json, _ = zone.cluster.admin(cmd, read_only=True)
topics = json.loads(topics_json)
return topics['topics']
def create_topic_per_zone(zonegroup_conns, topics_per_zone=1):
topics = []
zone_topic = []
for zone in zonegroup_conns.rw_zones:
for _ in range(topics_per_zone):
topic_name = gen_topic_name()
log.info('create topic zone=%s name=%s', zone.name, topic_name)
attributes = {
"push-endpoint": "http://kaboom:9999",
"persistent": "true",
}
topic_arn = zone.create_topic(topic_name, attributes)
topics.append(topic_arn)
zone_topic.append((zone, topic_arn))
return topics, zone_topic
def create_role_per_zone(zonegroup_conns, roles_per_zone = 1):
roles = []
zone_role = []
@ -3110,3 +3146,105 @@ def test_sync_flow_symmetrical_zonegroup_all_rgw_down():
test_sync_flow_symmetrical_zonegroup_all()
finally:
start_2nd_rgw(zonegroup)
def test_topic_notification_sync():
zonegroup = realm.master_zonegroup()
zonegroup_meta_checkpoint(zonegroup)
# let wait for users and other settings to sync across all zones.
time.sleep(config.checkpoint_delay)
# create topics in each zone.
zonegroup_conns = ZonegroupConns(zonegroup)
topic_arns, zone_topic = create_topic_per_zone(zonegroup_conns)
log.debug("topic_arns: %s", topic_arns)
zonegroup_meta_checkpoint(zonegroup)
# verify topics exists in all zones
for conn in zonegroup_conns.zones:
topic_list = conn.list_topics()
log.debug("topics for zone=%s = %s", conn.name, topic_list)
assert_equal(len(topic_list), len(topic_arns))
for topic_arn_map in topic_list:
assert_true(topic_arn_map['TopicArn'] in topic_arns)
# create a bucket
bucket = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
log.debug('created bucket=%s', bucket.name)
zonegroup_meta_checkpoint(zonegroup)
# create bucket_notification in each zone.
notification_ids = []
num = 1
for zone_conn, topic_arn in zone_topic:
noti_id = "bn" + '-' + run_prefix + '-' + str(num)
notification_ids.append(noti_id)
topic_conf = {'Id': noti_id,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}
num += 1
log.info('creating bucket notification for zone=%s name=%s', zone_conn.name, noti_id)
zone_conn.create_notification(bucket.name, [topic_conf])
zonegroup_meta_checkpoint(zonegroup)
# verify notifications exists in all zones
for conn in zonegroup_conns.zones:
notification_list = conn.list_notifications(bucket.name)
log.debug("notifications for zone=%s = %s", conn.name, notification_list)
assert_equal(len(notification_list), len(topic_arns))
for notification in notification_list:
assert_true(notification['Id'] in notification_ids)
# verify bucket_topic mapping
# create a new bucket and subcribe it to first topic.
bucket_2 = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
notif_id = "bn-2" + '-' + run_prefix
topic_conf = {'Id': notif_id,
'TopicArn': topic_arns[0],
'Events': ['s3:ObjectCreated:*']
}
zonegroup_conns.rw_zones[0].create_notification(bucket_2.name, [topic_conf])
zonegroup_meta_checkpoint(zonegroup)
for conn in zonegroup_conns.zones:
topics = get_topics(conn.zone)
for topic in topics:
if topic['arn'] == topic_arns[0]:
assert_equal(len(topic['subscribed_buckets']), 2)
assert_true(bucket_2.name in topic['subscribed_buckets'])
else:
assert_equal(len(topic['subscribed_buckets']), 1)
assert_true(bucket.name in topic['subscribed_buckets'])
# delete the 2nd bucket and verify the mapping is removed.
zonegroup_conns.rw_zones[0].delete_bucket(bucket_2.name)
zonegroup_meta_checkpoint(zonegroup)
for conn in zonegroup_conns.zones:
topics = get_topics(conn.zone)
for topic in topics:
assert_equal(len(topic['subscribed_buckets']), 1)
'''TODO(Remove the break once the https://tracker.ceph.com/issues/20802
is fixed, as the secondary site bucket instance info is currently not
getting deleted coz of the bug hence the bucket-topic mapping
deletion is not invoked on secondary sites.)'''
break
# delete notifications
zonegroup_conns.rw_zones[0].delete_notifications(bucket.name)
log.debug('Deleting all notifications for bucket=%s', bucket.name)
zonegroup_meta_checkpoint(zonegroup)
# verify notification deleted in all zones
for conn in zonegroup_conns.zones:
notification_list = conn.list_notifications(bucket.name)
assert_equal(len(notification_list), 0)
# delete topics
for zone_conn, topic_arn in zone_topic:
log.debug('deleting topic zone=%s arn=%s', zone_conn.name, topic_arn)
zone_conn.delete_topic(topic_arn)
zonegroup_meta_checkpoint(zonegroup)
# verify topics deleted in all zones
for conn in zonegroup_conns.zones:
topic_list = conn.list_topics()
assert_equal(len(topic_list), 0)

View File

@ -310,6 +310,27 @@ class CloudZone(Zone):
def has_role(self, role_name):
assert False
def create_topic(self, topicname, attributes):
assert False
def delete_topic(self, topic_arn):
assert False
def get_topic(self, topic_arn):
assert False
def list_topics(self):
assert False
def create_notification(self, bucket_name, config):
assert False
def delete_notifications(self, bucket_name):
assert False
def list_notifications(self, bucket_name):
assert False
def get_conn(self, credentials):
return self.Conn(self, credentials)

View File

@ -252,6 +252,27 @@ class ESZone(Zone):
def has_role(self, role_name):
assert False
def create_topic(self, topicname, attributes):
assert False
def delete_topic(self, topic_arn):
assert False
def list_topics(self):
assert False
def get_topic(self, topic_arn):
assert False
def create_notification(self, bucket_name, config):
assert False
def delete_notification(self, bucket_name):
assert False
def list_notifications(self, bucket_name):
assert False
def get_conn(self, credentials):
return self.Conn(self, credentials)

View File

@ -142,6 +142,34 @@ class RadosZone(Zone):
return False
return True
def create_topic(self, topicname, attributes):
result = self.sns_client.create_topic(Name=topicname, Attributes=attributes)
self.topic_arn = result['TopicArn']
return self.topic_arn
def delete_topic(self, topic_arn):
return self.sns_client.delete_topic(TopicArn=topic_arn)
def get_topic(self, topic_arn):
return self.sns_client.get_topic_attributes(TopicArn=topic_arn)
def list_topics(self):
return self.sns_client.list_topics()['Topics']
def create_notification(self, bucket_name, topic_conf_list):
return self.s3_client.put_bucket_notification_configuration(
Bucket=bucket_name, NotificationConfiguration={'TopicConfigurations': topic_conf_list})
def delete_notifications(self, bucket_name):
return self.s3_client.put_bucket_notification_configuration(Bucket=bucket_name,
NotificationConfiguration={})
def list_notifications(self, bucket_name):
out = self.s3_client.get_bucket_notification_configuration(Bucket=bucket_name)
if 'TopicConfigurations' in out:
return out['TopicConfigurations']
return []
def get_conn(self, credentials):
return self.Conn(self, credentials)