rgw/pubsub: support deletion markers and multipart upload

Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
This commit is contained in:
Yuval Lifshitz 2019-09-02 19:24:46 +03:00
parent c345566831
commit 923799fd96
13 changed files with 499 additions and 276 deletions

View File

@ -205,83 +205,13 @@ Response will have the following format:
Notifications
~~~~~~~~~~~~~
Create a Notification
`````````````````````
This will create a publisher for a specific bucket into a topic.
::
PUT /<bucket name>?notification
Request parameters are encoded in XML in the body of the request, with the following format:
::
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
- Id: name of the notification
- Topic: topic ARN
- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
Delete Notification
```````````````````
Delete a specific, or all, notifications from a bucket.
::
DELETE /bucket?notification[=<notification-id>]
Request parameters:
- notification-id: name of the notification (if not provided, all notifications on the bucket are deleted)
Detailed under: `Bucket Operations`_.
.. note::
- Notification deletion is an extension to the S3 notification API
- When the bucket is deleted, any notification defined on it is also deleted
Get/List Notifications
``````````````````````
Get a specific notification, or list all notifications defined on a bucket.
::
GET /bucket?notification[=<notification-id>]
Request parameters:
- notification-id: name of the notification (if not provided, all notifications on the bucket are listed)
Response is XML formatted:
::
<NotificationConfiguration>
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
- Id: name of the notification
- Topic: topic ARN
- Event: for list of supported events see: `S3 Notification Compatibility`_ (to support multiple types, any combination of wildcard events and ``Event`` tags may be used)
.. note::
- Getting information on a specific notification is an extension to the S3 notification API
- When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
- "Abort Multipart Upload" request does not emit a notification
- "Delete Multiple Objects" request does not emit a notification
- Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
Events
@ -356,3 +286,4 @@ pushed or pulled using the pubsub sync module.
.. _PubSub Module : ../pubsub-module
.. _S3 Notification Compatibility: ../s3-notification-compatibility
.. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
.. _Bucket Operations: ../s3/bucketops

View File

@ -232,88 +232,17 @@ List all topics that user defined.
S3-Compliant Notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~
Create a Notification
`````````````````````
This will create a publisher for a specific bucket into a topic, and a subscription
for pushing/pulling events.
The subscription's name will have the same as the notification Id, and could be used later to fetch
and ack events with the subscription API.
::
PUT /<bucket name>?notification
Request parameters are encoded in XML in the body of the request, with the following format:
::
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
- Id: name of the notification
- Topic: topic ARN
- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
Delete Notification
```````````````````
Delete a specific, or all, S3-compliant notifications from a bucket. Associated subscription will also be deleted.
::
DELETE /bucket?notification[=<notification-id>]
Request parameters:
- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are deleted)
Detailed under: `Bucket Operations`_.
.. note::
- Notification deletion is an extension to the S3 notification API
- When the bucket is deleted, any notification defined on it is also deleted.
In this case, the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
- Notification creation will also create a subscription for pushing/pulling events
- The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API.
- Notification deletion will deletes all generated subscriptions
- In case that bucket deletion implicitly deletes the notification,
the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
and will have to be deleted explicitly with the subscription deletion API
Get/List Notifications
``````````````````````
Get a specific S3-compliant notification, or list all S3-compliant notifications defined on a bucket.
::
GET /bucket?notification[=<notification-id>]
Request parameters:
- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are listed)
Response is XML formatted:
::
<NotificationConfiguration>
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
- Id: name of the notification
- Topic: topic ARN
- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
.. note::
- Getting information on a specific notification is an extension to the S3 notification API
- When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
Non S3-Compliant Notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -330,7 +259,7 @@ This will create a publisher for a specific bucket into a topic.
Request parameters:
- topic-name: name of topic
- event: event type (string), one of: OBJECT_CREATE, OBJECT_DELETE
- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
Delete Notification Information
```````````````````````````````
@ -564,7 +493,7 @@ the events will have the following event format (JSON):
]}
- id: unique ID of the event, that could be used for acking
- event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE``
- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
- timestamp: timestamp indicating when the event was sent
- info.attrs.mtime: timestamp indicating when the event was triggered
- info.bucket.bucket_id: id of the bucket
@ -588,3 +517,4 @@ Request parameters:
.. _Multisite : ../multisite
.. _Bucket Notification : ../notifications
.. _Bucket Operations: ../s3/bucketops

View File

@ -79,14 +79,13 @@ Event Types
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Not Supported | Supported at ``s3:ObjectCreated:*`` level |
| (an extension to AWS) | | |
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:*`` | Supported |
| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:Delete`` | Supported | supported at ``s3:ObjectRemoved:*`` level |
| ``s3:ObjectRemoved:Delete`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported at ``s3:ObjectRemoved:*`` level |
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+-------------------------------------------+

View File

@ -477,3 +477,162 @@ Response Entities
| ``Years`` | Integer | The number of years specified for the default retention period. | No |
+-----------------------------+-------------+----------------------------------------------------------------------------------------+----------+
Create Notification
-------------------
Create a publisher for a specific bucket into a topic.
Syntax
~~~~~~
::
PUT /<bucket name>?notification HTTP/1.1
Request Entities
~~~~~~~~~~~~~~~~
Parameters are XML encoded in the body of the request, in the following format:
::
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| Name | Type | Description | Required |
+===============================+===========+======================================================================================+==========+
| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Id`` | String | Name of the notification | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Topic`` | String | Topic ARN. Topic must be created beforehand | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Event`` | String | List of supported events see: `S3 Notification Compatibility`_. Multiple ``Event`` | No |
| | | entities can be used. If omitted, all events are handled | |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
HTTP Response
~~~~~~~~~~~~~
+---------------+-----------------------+----------------------------------------------------------+
| HTTP Status | Status Code | Description |
+===============+=======================+==========================================================+
| ``400`` | MalformedXML | The XML is not well-formed |
+---------------+-----------------------+----------------------------------------------------------+
| ``400`` | InvalidArgument | Missing Id; Missing/Invalid Topic ARN; Invalid Event |
+---------------+-----------------------+----------------------------------------------------------+
| ``404`` | NoSuchBucket | The bucket does not exist |
+---------------+-----------------------+----------------------------------------------------------+
| ``404`` | NoSuchKey | The topic does not exist |
+---------------+-----------------------+----------------------------------------------------------+
Delete Notification
-------------------
Delete a specific, or all, notifications from a bucket.
.. note::
- Notification deletion is an extension to the S3 notification API
- When the bucket is deleted, any notification defined on it is also deleted
- Deleting an unkown notification (e.g. double delete) is not considered an error
Syntax
~~~~~~
::
DELETE /bucket?notification[=<notification-id>] HTTP/1.1
Parameters
~~~~~~~~~~
+------------------------+-----------+----------------------------------------------------------------------------------------+
| Name | Type | Description |
+========================+===========+========================================================================================+
| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are deleted |
+------------------------+-----------+----------------------------------------------------------------------------------------+
HTTP Response
~~~~~~~~~~~~~
+---------------+-----------------------+----------------------------------------------------------+
| HTTP Status | Status Code | Description |
+===============+=======================+==========================================================+
| ``404`` | NoSuchBucket | The bucket does not exist |
+---------------+-----------------------+----------------------------------------------------------+
Get/List Notification
---------------------
Get a specific notification, or list all notifications configured on a bucket.
Syntax
~~~~~~
::
GET /bucket?notification[=<notification-id>] HTTP/1.1
Parameters
~~~~~~~~~~
+------------------------+-----------+----------------------------------------------------------------------------------------+
| Name | Type | Description |
+========================+===========+========================================================================================+
| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are listed |
+------------------------+-----------+----------------------------------------------------------------------------------------+
Response Entities
~~~~~~~~~~~~~~~~~
Response is XML encoded in the body of the request, in the following format:
::
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TopicConfiguration>
<Id></Id>
<Topic></Topic>
<Event></Event>
</TopicConfiguration>
</NotificationConfiguration>
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| Name | Type | Description | Required |
+===============================+===========+======================================================================================+==========+
| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Id`` | String | Name of the notification | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Topic`` | String | Topic ARN | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Event`` | String | Handled event. Multiple ``Event`` entities may exist | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
HTTP Response
~~~~~~~~~~~~~
+---------------+-----------------------+----------------------------------------------------------+
| HTTP Status | Status Code | Description |
+===============+=======================+==========================================================+
| ``404`` | NoSuchBucket | The bucket does not exist |
+---------------+-----------------------+----------------------------------------------------------+
| ``404`` | NoSuchKey | The notification does not exist (if provided) |
+---------------+-----------------------+----------------------------------------------------------+
.. _S3 Notification Compatibility: ../s3-notification-compatibility

View File

@ -84,6 +84,12 @@ int publish(const req_state* s,
continue;
}
event_should_be_handled = true;
record.configurationId = topic_filter.s3_id;
ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id <<
"' on topic: '" << topic_cfg.dest.arn_topic <<
"' and bucket: '" << s->bucket.name <<
"' (unique topic: '" << topic_cfg.name <<
"') apply to event of type: '" << to_string(event_type) << "'" << dendl;
try {
// TODO add endpoint LRU cache
const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint,

View File

@ -16,6 +16,8 @@ namespace rgw::notify {
return "s3:ObjectCreated:Post";
case ObjectCreatedCopy:
return "s3:ObjectCreated:Copy";
case ObjectCreatedCompleteMultipartUpload:
return "s3:ObjectCreated:CompleteMultipartUpload";
case ObjectRemoved:
return "s3:ObjectRemoved:*";
case ObjectRemovedDelete:
@ -34,12 +36,13 @@ namespace rgw::notify {
case ObjectCreatedPut:
case ObjectCreatedPost:
case ObjectCreatedCopy:
case ObjectCreatedCompleteMultipartUpload:
return "OBJECT_CREATE";
case ObjectRemoved:
case ObjectRemovedDelete:
return "OBJECT_DELETE";
case ObjectRemovedDeleteMarkerCreated:
return "DELETE_MARKER_CREATE";
case ObjectRemoved:
case UnknownEvent:
return "UNKNOWN_EVENT";
}
@ -55,9 +58,11 @@ namespace rgw::notify {
return ObjectCreatedPost;
if (s == "s3:ObjectCreated:Copy")
return ObjectCreatedCopy;
if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE")
if (s == "s3:ObjectCreated:CompleteMultipartUpload")
return ObjectCreatedCompleteMultipartUpload;
if (s == "s3:ObjectRemoved:*")
return ObjectRemoved;
if (s == "s3:ObjectRemoved:Delete")
if (s == "s3:ObjectRemoved:Delete" || s == "OBJECT_DELETE")
return ObjectRemovedDelete;
if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
return ObjectRemovedDeleteMarkerCreated;

View File

@ -7,14 +7,15 @@
namespace rgw::notify {
enum EventType {
ObjectCreated = 0xF,
ObjectCreatedPut = 0x1,
ObjectCreatedPost = 0x2,
ObjectCreatedCopy = 0x4,
ObjectRemoved = 0xF0,
ObjectRemovedDelete = 0x10,
ObjectRemovedDeleteMarkerCreated = 0x20,
UnknownEvent = 0x100
ObjectCreated = 0xF,
ObjectCreatedPut = 0x1,
ObjectCreatedPost = 0x2,
ObjectCreatedCopy = 0x4,
ObjectCreatedCompleteMultipartUpload = 0x8,
ObjectRemoved = 0xF0,
ObjectRemovedDelete = 0x10,
ObjectRemovedDeleteMarkerCreated = 0x20,
UnknownEvent = 0x100
};
using EventTypeList = std::vector<EventType>;

View File

@ -4278,8 +4278,8 @@ void RGWPostObj::execute()
return;
}
} while (is_next_file_to_upload());
// send request to notification manager
const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store);
const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@ -4705,9 +4705,10 @@ void RGWDeleteObj::execute()
bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
if (!s->object.empty()) {
op_ret = get_obj_attrs(store, s, obj, attrs);
if (need_object_expiration() || multipart_delete) {
/* check if obj exists, read orig attrs */
op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
return;
}
@ -4715,7 +4716,6 @@ void RGWDeleteObj::execute()
if (check_obj_lock) {
/* check if obj exists, read orig attrs */
op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
if (op_ret == -ENOENT) {
/* object maybe delete_marker, skip check_obj_lock*/
@ -4726,6 +4726,9 @@ void RGWDeleteObj::execute()
}
}
// ignore return value from get_obj_attrs in all other cases
op_ret = 0;
if (check_obj_lock) {
auto aiter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
if (aiter != attrs.end()) {
@ -4828,9 +4831,10 @@ void RGWDeleteObj::execute()
} else {
op_ret = -EINVAL;
}
// TODO: add etag calculation
std::string etag;
const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store);
const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@ -5142,7 +5146,6 @@ void RGWCopyObj::execute()
this,
s->yield);
// TODO: use s3:ObjectCreated:Copy
const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
@ -5820,6 +5823,14 @@ void RGWInitMultipart::execute()
op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield);
} while (op_ret == -EEXIST);
const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}
}
int RGWCompleteMultipart::verify_permission()
@ -6126,6 +6137,14 @@ void RGWCompleteMultipart::execute()
} else {
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}
const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}
}
int RGWCompleteMultipart::MPSerializer::try_lock(

View File

@ -160,6 +160,7 @@ private:
CephContext* const cct;
const std::string endpoint;
const std::string topic;
const std::string exchange;
amqp::connection_ptr_t conn;
ack_level_t ack_level;
std::string str_ack_level;
@ -271,8 +272,9 @@ public:
CephContext* _cct) :
cct(_cct),
endpoint(_endpoint),
topic(_topic),
conn(amqp::connect(endpoint, get_exchange(args))) {
topic(_topic),
exchange(get_exchange(args)),
conn(amqp::connect(endpoint, exchange)) {
if (!conn) {
throw configuration_error("AMQP: failed to create connection to: " + endpoint);
}
@ -393,6 +395,7 @@ public:
std::string str("AMQP(0.9.1) Endpoint");
str += "\nURI: " + endpoint;
str += "\nTopic: " + topic;
str += "\nExchange: " + exchange;
str += "\nAck Level: " + str_ack_level;
return str;
}

View File

@ -509,7 +509,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
ups->remove_topic(unique_topic_name);
return;
}
ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl;
ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
if (!push_only) {
// generate the subscription with destination information from the original topic

View File

@ -346,16 +346,14 @@ private:
}
std::string events_str = s->info.args.get("events", &exists);
if (exists) {
rgw::notify::from_string_list(events_str, events);
if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
return -EINVAL;
}
} else {
// if no events are provided, we assume all events
events.push_back(rgw::notify::ObjectCreated);
events.push_back(rgw::notify::ObjectRemoved);
if (!exists) {
// if no events are provided, we notify on all of them
events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
}
rgw::notify::from_string_list(events_str, events);
if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
return -EINVAL;
}
return notif_bucket_path(s->object.name, bucket_name);
}

View File

@ -20,7 +20,7 @@ from .tests import get_realm, \
gen_bucket_name, \
get_user, \
get_tenant
from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
@ -152,22 +152,8 @@ class AMQPReceiver(object):
def __init__(self, exchange, topic):
import pika
hostname = get_ip()
retries = 20
connect_ok = False
while retries > 0:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
connect_ok = True
break
except Exception as error:
retries -= 1
print 'AMQP receiver failed to connect (try %d): %s' % (10 - retries, str(error))
log.info('AMQP receiver failed to connect (try %d): %s', 10 - retries, str(error))
time.sleep(2)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
if connect_ok == False:
raise error
self.channel = connection.channel()
self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
result = self.channel.queue_declare('', exclusive=True)
@ -177,10 +163,11 @@ class AMQPReceiver(object):
on_message_callback=self.on_message,
auto_ack=True)
self.events = []
self.topic = topic
def on_message(self, ch, method, properties, body):
"""callback invoked when a new message arrive on the topic"""
log.info('AMQP received event: %s', body)
log.info('AMQP received event for topic %s:\n %s', self.topic, body)
self.events.append(json.loads(body))
# TODO create a base class for the AMQP and HTTP cases
@ -194,6 +181,11 @@ class AMQPReceiver(object):
verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
self.events = []
def get_and_reset_events(self):
tmp = self.events
self.events = []
return tmp
def amqp_receiver_thread_runner(receiver):
"""main thread function for the amqp receiver"""
@ -613,6 +605,9 @@ def test_ps_s3_topic_on_master():
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# clean all topics
delete_all_s3_topics(zones[0].conn, zonegroup.name)
# create s3 topics
endpoint_address = 'amqp://127.0.0.1:7001'
@ -635,44 +630,36 @@ def test_ps_s3_topic_on_master():
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
try:
# get topic 3
result, status = topic_conf3.get_config()
assert_equal(status, 200)
assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
# Note that endpoint args may be ordered differently in the result
# get topic 3
result, status = topic_conf3.get_config()
assert_equal(status, 200)
assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
# Note that endpoint args may be ordered differently in the result
# delete topic 1
result = topic_conf1.del_config()
assert_equal(status, 200)
# delete topic 1
result = topic_conf1.del_config()
assert_equal(status, 200)
# try to get a deleted topic (1)
_, status = topic_conf1.get_config()
assert_equal(status, 404)
# try to get a deleted topic
_, status = topic_conf1.get_config()
assert_equal(status, 404)
# get the remaining 2 topics
result = topic_conf1.get_list()
assert_equal(len(result['Topics']), 2)
# delete topics
result = topic_conf2.del_config()
# TODO: should be 200OK
# assert_equal(status, 200)
result = topic_conf3.del_config()
# TODO: should be 200OK
# assert_equal(status, 200)
# get the remaining 2 topics
result = topic_conf1.get_list()
assert_equal(len(result['Topics']), 2)
# delete topics
result = topic_conf2.del_config()
# TODO: should be 200OK
# assert_equal(status, 200)
result = topic_conf3.del_config()
# TODO: should be 200OK
# assert_equal(status, 200)
# get topic list, make sure it is empty
result = topic_conf1.get_list()
assert_equal(len(result['Topics']), 0)
except AssertionError as e:
# topics are stored at user level, so cleanup is needed
# to prevent failures in consequent runs
topic_conf1.del_config()
topic_conf2.del_config()
topic_conf3.del_config()
raise e
# get topic list, make sure it is empty
result = topic_conf1.get_list()
assert_equal(len(result['Topics']), 0)
def test_ps_s3_notification_on_master():
@ -779,8 +766,9 @@ def test_ps_s3_notification_errors_on_master():
assert False, 'missing notification name is expected to fail'
# create s3 notification with invalid topic ARN
invalid_topic_arn = 'kaboom'
topic_conf_list = [{'Id': notification_name,
'TopicArn': 'kaboom',
'TopicArn': invalid_topic_arn,
'Events': ['s3:ObjectCreated:Put']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
@ -791,6 +779,20 @@ def test_ps_s3_notification_errors_on_master():
else:
assert False, 'invalid ARN is expected to fail'
# create s3 notification with unknown topic ARN
invalid_topic_arn = 'arn:aws:sns:a::kaboom'
topic_conf_list = [{'Id': notification_name,
'TopicArn': invalid_topic_arn ,
'Events': ['s3:ObjectCreated:Put']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
try:
_, _ = s3_notification_conf.set_config()
except Exception as error:
print str(error) + ' - is expected'
else:
assert False, 'unknown topic is expected to fail'
# create s3 notification with wrong bucket
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
@ -804,8 +806,16 @@ def test_ps_s3_notification_errors_on_master():
else:
assert False, 'unknown bucket is expected to fail'
# cleanup
topic_conf.del_config()
status = topic_conf.del_config()
# deleting an unknown notification is not considered an error
assert_equal(status, 200)
_, status = topic_conf.get_config()
assert_equal(status, 404)
# cleanup
# delete the bucket
zones[0].delete_bucket(bucket_name)
@ -905,7 +915,7 @@ def test_ps_s3_notification_push_amqp_on_master():
start_time = time.time()
for i in range(number_of_objects):
key = bucket.new_key(str(i))
content = 'bar'
content = str(os.urandom(1024*1024))
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
@ -1596,7 +1606,7 @@ def test_ps_s3_creation_triggers_on_master():
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post']
'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
@ -1636,55 +1646,178 @@ def test_ps_s3_creation_triggers_on_master():
clean_rabbitmq(proc)
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
zones, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
bucket = zones[0].create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receivers
exchange = 'ex1'
task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
task1.start()
task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
task2.start()
task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
task3.start()
# create s3 topics
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
topic_arn3 = topic_conf3.set_config()
# create s3 notifications
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:*']
},
{'Id': notification_name+'_2', 'TopicArn': topic_arn2,
'Events': ['s3:ObjectCreated:Post']
},
{'Id': notification_name+'_3', 'TopicArn': topic_arn3,
'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket using multi-part upload
fp = tempfile.TemporaryFile(mode='w+b')
content = bytearray(os.urandom(1024*1024))
fp.write(content)
fp.flush()
fp.seek(0)
uploader = bucket.initiate_multipart_upload('multipart')
uploader.upload_part_from_file(fp, 1)
uploader.complete_upload()
fp.close()
print 'wait for 5sec for the messages...'
time.sleep(5)
# check amqp receiver
events = receiver1.get_and_reset_events()
assert_equal(len(events), 3)
events = receiver2.get_and_reset_events()
assert_equal(len(events), 1)
assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post')
assert_equal(events[0]['s3']['configurationId'], notification_name+'_2')
events = receiver3.get_and_reset_events()
assert_equal(len(events), 1)
assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
assert_equal(events[0]['s3']['configurationId'], notification_name+'_3')
# cleanup
stop_amqp_receiver(receiver1, task1)
stop_amqp_receiver(receiver2, task2)
stop_amqp_receiver(receiver3, task3)
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
topic_conf3.del_config()
for key in bucket.list():
key.delete()
# delete the bucket
zones[0].delete_bucket(bucket_name)
clean_rabbitmq(proc)
def test_ps_versioned_deletion():
""" test notification of deletion markers """
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
topic_conf.set_config()
# create topics
topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
_, status = topic_conf1.set_config()
assert_equal(status/100, 2)
topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
_, status = topic_conf2.set_config()
assert_equal(status/100, 2)
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
bucket.configure_versioning(True)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create notifications
# TODO use 'DELETE_MARKER_CREATE'
event_type = 'OBJECT_DELETE'
notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
topic_name, event_type)
_, status = notification_conf.set_config()
event_type1 = 'OBJECT_DELETE'
notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
topic_name+'_1',
event_type1)
_, status = notification_conf1.set_config()
assert_equal(status/100, 2)
# create subscription
sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
topic_name)
_, status = sub_conf.set_config()
event_type2 = 'DELETE_MARKER_CREATE'
notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
topic_name+'_2',
event_type2)
_, status = notification_conf2.set_config()
assert_equal(status/100, 2)
# create subscriptions
sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
topic_name+'_1')
_, status = sub_conf1.set_config()
assert_equal(status/100, 2)
sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
topic_name+'_2')
_, status = sub_conf2.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key = bucket.new_key('foo')
key.set_contents_from_string('bar')
v1 = key.version_id
key.set_contents_from_string('kaboom')
v2 = key.version_id
# create deletion marker
delete_marker_key = bucket.delete_key(key.name)
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# set delete markers
# delete the deletion marker
delete_marker_key.delete()
# delete versions
bucket.delete_key(key.name, version_id=v2)
bucket.delete_key(key.name, version_id=v1)
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the delete events from the subscription
result, _ = sub_conf.get_events()
result, _ = sub_conf1.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
assert_equal(str(event['event']), event_type)
assert_equal(str(event['event']), event_type1)
# TODO: verify we have exactly 2 events
assert len(parsed_result['events']) >= 2
result, _ = sub_conf2.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
assert_equal(str(event['event']), event_type2)
# cleanup
# follwing is needed for the cleanup in the case of 3-zones
@ -1697,9 +1830,12 @@ def test_ps_versioned_deletion():
zones[0].delete_bucket(bucket_name)
except:
log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
sub_conf.del_config()
notification_conf.del_config()
topic_conf.del_config()
sub_conf1.del_config()
sub_conf2.del_config()
notification_conf1.del_config()
notification_conf2.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
def test_ps_s3_metadata_on_master():
@ -1791,8 +1927,14 @@ def test_ps_s3_versioned_deletion_on_master():
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
# TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put']
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:*']
},
{'Id': notification_name+'_2', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
},
{'Id': notification_name+'_3', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:Delete']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
@ -1804,24 +1946,37 @@ def test_ps_s3_versioned_deletion_on_master():
v1 = key.version_id
key.set_contents_from_string('kaboom')
v2 = key.version_id
keys = list(bucket.list())
# create delete marker (non versioned deletion)
delete_marker_key = bucket.delete_key(key.name)
print 'wait for 5sec for the messages...'
time.sleep(5)
time.sleep(1)
# check amqp receiver
# Note: should not do exact match in case of versioned objects
receiver.verify_s3_events(keys, exact_match=False)
# set delete markers
# versioned deletion
bucket.delete_key(key.name, version_id=v2)
bucket.delete_key(key.name, version_id=v1)
delete_marker_key.delete()
print 'wait for 5sec for the messages...'
time.sleep(5)
# check amqp receiver
# Note: should not do exact match in case of versioned objects
receiver.verify_s3_events(keys, exact_match=False, deletions=True)
events = receiver.get_and_reset_events()
delete_events = 0
delete_marker_create_events = 0
for event in events:
if event['eventName'] == 's3:ObjectRemoved:Delete':
delete_events += 1
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
delete_marker_create_events += 1
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
# 3 key versions were deleted (v1, v2 and the deletion marker)
# notified over the same topic via 2 notifications (1,3)
assert_equal(delete_events, 3*2)
# 1 deletion marker was created
# notified over the same topic over 2 notifications (1,2)
assert_equal(delete_marker_create_events, 1*2)
# cleanup
stop_amqp_receiver(receiver, task)

View File

@ -130,6 +130,23 @@ class PSTopic:
return self.send_request('GET', get_list=True)
def delete_all_s3_topics(conn, region):
try:
client = boto3.client('sns',
endpoint_url='http://'+conn.host+':'+str(conn.port),
aws_access_key_id=conn.aws_access_key_id,
aws_secret_access_key=conn.aws_secret_access_key,
region_name=region,
config=Config(signature_version='s3'))
topics = client.list_topics()['Topics']
for topic in topics:
print 'topic cleanup, deleting: ' + topic['TopicArn']
assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
except:
print 'failed to do topic cleanup. if there are topics they may need to be manually deleted'
class PSTopicS3:
"""class to set/list/get/delete a topic
POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]