Merge pull request #41945 from yuvalif/wip-yuval-fix-51261

rgw/notifications: support metadata filter in CompleteMultipartUpload and Copy events
This commit is contained in:
Yuval Lifshitz 2021-07-01 19:19:03 +03:00 committed by GitHub
commit 1c13283f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 66 additions and 60 deletions

View File

@ -667,13 +667,14 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap
}
// populate event from request
void populate_event_from_request(const req_state *s,
void populate_event_from_request(const reservation_t& res,
rgw::sal::Object* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
rgw_pubsub_s3_event& event) {
rgw_pubsub_s3_event& event) {
const auto s = res.s;
event.eventTime = mtime;
event.eventName = to_event_string(event_type);
event.userIdentity = s->user->get_id().id; // user that triggered the change
@ -683,7 +684,7 @@ void populate_event_from_request(const req_state *s,
event.bucket_name = s->bucket_name;
event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
event.object_key = obj->get_name();
event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
event.object_etag = etag;
event.object_versionId = obj->get_instance();
@ -693,12 +694,14 @@ void populate_event_from_request(const req_state *s,
std::back_inserter(event.object_sequencer));
set_event_id(event.id, etag, ts);
event.bucket_id = s->bucket->get_bucket_id();
// pass meta data
if (s->info.x_meta_map.empty()) {
// try to fetch the metadata from the attributes
// pass metadata
if (res.cached_metadata.empty()) {
// no metadata cached:
// either no metadata exists or no metadata filter was used
event.x_meta_map = s->info.x_meta_map;
metadata_from_attributes(s, obj, event.x_meta_map);
} else {
event.x_meta_map = s->info.x_meta_map;
event.x_meta_map = std::move(res.cached_metadata);
}
// pass tags
if (s->tagset.get_tags().empty()) {
@ -710,29 +713,23 @@ void populate_event_from_request(const req_state *s,
// opaque data will be filled from topic configuration
}
bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::Object* obj,
EventType event, const RGWObjTags* req_tags) {
bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
if (!match(filter.events, event)) {
return false;
}
if (!match(filter.s3_filter.key_filter, obj->get_name())) {
const auto obj = res.object;
if (!match(filter.s3_filter.key_filter,
res.object_name ? *res.object_name : obj->get_name())) {
return false;
}
const auto s = res.s;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
if (!s->info.x_meta_map.empty()) {
// metadata was cached in req_state
if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
}
} else {
// try to fetch the metadata from the attributes
KeyValueMap metadata;
metadata_from_attributes(s, obj, metadata);
if (!match(filter.s3_filter.metadata_filter, metadata)) {
return false;
}
res.cached_metadata = s->info.x_meta_map;
metadata_from_attributes(s, obj, res.cached_metadata);
if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
return false;
}
}
@ -776,7 +773,7 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
if (!notification_match(res, topic_filter, event_type, req_tags)) {
// notification does not apply to req_state
continue;
}
@ -829,7 +826,7 @@ int publish_commit(rgw::sal::Object* obj,
continue;
}
event_entry_t event_entry;
populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {

View File

@ -58,9 +58,12 @@ struct reservation_t {
const req_state* const s;
size_t size;
rgw::sal::Object* const object;
const std::string* const object_name;
KeyValueMap cached_metadata;
reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) :
dpp(_dpp), store(_store), s(_s), object(_object) {}
reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s,
rgw::sal::Object* _object, const std::string* _object_name) :
dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
// dtor doing resource leak guarding
// aborting the reservation if not already committed or aborted

View File

@ -6145,13 +6145,6 @@ void RGWCompleteMultipart::execute(optional_yield y)
mp.init(s->object->get_name(), upload_id);
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
s, rgw::notify::ObjectCreatedCompleteMultipartUpload);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
}
meta_oid = mp.get_meta();
@ -6203,6 +6196,14 @@ void RGWCompleteMultipart::execute(optional_yield y)
return;
}
attrs = meta_obj->get_attrs();
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
}
do {
op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,

View File

@ -170,7 +170,8 @@ class Store {
virtual int cluster_stat(RGWClusterStat& stats) = 0;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
virtual std::unique_ptr<Completions> get_completions(void) = 0;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) = 0;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) = 0;
virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,

View File

@ -932,9 +932,10 @@ std::unique_ptr<Completions> RadosStore::get_completions(void)
std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
struct req_state* s,
rgw::notify::EventType event_type)
rgw::notify::EventType event_type,
const std::string* object_name)
{
return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type));
return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type, object_name));
}
std::unique_ptr<GCChain> RadosStore::get_gc_chain(rgw::sal::Object* obj)

View File

@ -402,7 +402,7 @@ class RadosStore : public Store {
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) override;
virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
@ -537,7 +537,9 @@ class RadosNotification : public Notification {
rgw::notify::reservation_t res;
public:
RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, rgw::notify::EventType _type) : Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj) { }
RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s,
rgw::notify::EventType _type, const std::string* object_name=nullptr) :
Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
~RadosNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;

View File

@ -1729,40 +1729,49 @@ def test_ps_s3_metadata_on_master():
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
expected_keys = []
# create objects in the bucket
key_name = 'foo'
key = bucket.new_key(key_name)
key.set_metadata(meta_key, meta_value)
key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
expected_keys.append(key_name)
# create objects in the bucket using COPY
bucket.copy_key('copy_of_foo', bucket.name, key.name)
key_name = 'copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name)
expected_keys.append(key_name)
# create another object in the bucket using COPY
# but override the metadata value
key_name = 'another_copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
object_size = 1024
chunk_size = 1024*1024*5 # 5MB
object_size = 10*chunk_size
content = bytearray(os.urandom(object_size))
fp.write(content)
fp.flush()
fp.seek(0)
uploader = bucket.initiate_multipart_upload('multipart_foo',
key_name = 'multipart_foo'
uploader = bucket.initiate_multipart_upload(key_name,
metadata={meta_key: meta_value})
uploader.upload_part_from_file(fp, 1)
for i in range(1,5):
uploader.upload_part_from_file(fp, i, size=chunk_size)
fp.seek(i*chunk_size)
uploader.complete_upload()
fp.close()
expected_keys.append(key_name)
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
event_count = 0
for event in receiver.get_and_reset_events():
s3_event = event['Records'][0]['s3']
assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
event_count +=1
# only PUT and POST has the metadata value
assert_equal(event_count, 2)
events = receiver.get_and_reset_events()
assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
# delete objects
for key in bucket.list():
@ -1770,15 +1779,7 @@ def test_ps_s3_metadata_on_master():
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
event_count = 0
for event in receiver.get_and_reset_events():
s3_event = event['Records'][0]['s3']
assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
event_count +=1
# all 3 object has metadata when deleted
assert_equal(event_count, 3)
#assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
# cleanup
stop_amqp_receiver(receiver, task)