mirror of https://github.com/ceph/ceph synced 2025-01-01 08:32:24 +00:00

Merge pull request #39192 from linuxbox2/wip-lc-notify

rgwlc:  optionally support notifications on object expiration
This commit is contained in:
Matt Benjamin 2022-01-05 09:38:47 -05:00 committed by GitHub
commit 2a465f806a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 638 additions and 225 deletions

View File

@ -79,31 +79,43 @@ However, the following fields may be sent empty, under the different deployment
Event Types
-----------
+----------------------------------------------+-----------------+-------------------------------------------+
| Event | Notification | PubSub |
+==============================================+=================+===========================================+
| ``s3:ObjectCreated:*`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Put`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Post`` | Supported | Not Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:Delete`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Complete`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ReducedRedundancyLostObject`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+-------------------------------------------+
+------------------------------------------------+-----------------+-------------------------------------------+
| Event | Notification | PubSub |
+================================================+=================+===========================================+
| ``s3:ObjectCreated:*`` | Supported |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Put`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Post`` | Supported | Not Supported |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:Delete`` | Supported |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Expiration:Current`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Expiration:NonCurrent`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Expiration:DeleteMarker`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Defined, Ceph extension (not generated) |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Transition:Current`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Transition:NonCurrent`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Complete`` | Not applicable to Ceph |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ReducedRedundancyLostObject`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+---------------------------------------------+
.. note::

View File

@ -6,17 +6,18 @@ root=`dirname $0`
run_name=$1
command=$2
CEPH_BIN=$root
CEPH_CONF_PATH=$root/run/$run_name
[[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$root || CEPH_CONF_PATH=$root/run/$run_name
[ -z "$BUILD_DIR" ] && BUILD_DIR=build
if [ -e CMakeCache.txt ]; then
CEPH_BIN=$PWD/bin
CEPH_CONF_PATH=$PWD/run/$run_name
[[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name
elif [ -e $root/../${BUILD_DIR}/CMakeCache.txt ]; then
cd $root/../${BUILD_DIR}
CEPH_BIN=$PWD/bin
CEPH_CONF_PATH=$PWD/run/$run_name
[[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name
fi
shift 2

View File

@ -26,9 +26,10 @@
#include "rgw_zone.h"
#include "rgw_string.h"
#include "rgw_multi.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
#include "rgw_rados.h"
#include "rgw_lc_tier.h"
#include "rgw_notify.h"
// this seems safe to use, at least for now--arguably, we should
// prefer header-only fmt, in general
@ -565,7 +566,13 @@ struct lc_op_ctx {
}; /* lc_op_ctx */
static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed)
static std::string lc_id = "rgw lifecycle";
static std::string lc_req_id = "0";
static int remove_expired_obj(
const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
rgw::notify::EventType event_type)
{
auto& store = oc.store;
auto& bucket_info = oc.bucket->get_info();
@ -589,17 +596,60 @@ static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool
return ret;
}
obj = bucket->get_object(obj_key);
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op(&oc.rctx);
// XXXX currently, rgw::sal::Bucket.owner is always null here
std::unique_ptr<rgw::sal::User> user;
if (! bucket->get_owner()) {
auto& bucket_info = bucket->get_info();
user = store->get_user(bucket_info.owner);
// forgive me, lord
if (user) {
bucket->set_owner(user.get());
}
}
del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
obj = bucket->get_object(obj_key);
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
= obj->get_delete_op(&oc.rctx);
del_op->params.versioning_status
= obj->get_bucket()->get_info().versioning_status();
del_op->params.obj_owner.set_id(rgw_user {meta.owner});
del_op->params.obj_owner.set_name(meta.owner_display_name);
del_op->params.bucket_owner.set_id(bucket_info.owner);
del_op->params.unmod_since = meta.mtime;
del_op->params.marker_version_id = version_id;
return del_op->delete_obj(dpp, null_yield);
std::unique_ptr<rgw::sal::Notification> notify
= store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type,
bucket.get(), lc_id,
const_cast<std::string&>(oc.bucket->get_tenant()),
lc_req_id, null_yield);
/* can eliminate cast when reservation is lifted into Notification */
auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: notify reservation failed, deferring delete of object k="
<< o.key
<< dendl;
return ret;
}
ret = del_op->delete_obj(dpp, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 1) <<
"ERROR: publishing notification failed, with error: " << ret << dendl;
} else {
// send request to notification manager
(void) rgw::notify::publish_commit(
obj.get(), obj->get_obj_size(), ceph::real_clock::now(),
obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type,
notify_res, dpp);
}
return ret;
} /* remove_expired_obj */
class LCOpAction {
@ -1077,7 +1127,8 @@ public:
auto& o = oc.o;
int r;
if (o.is_delete_marker()) {
r = remove_expired_obj(oc.dpp, oc, true);
r = remove_expired_obj(oc.dpp, oc, true,
rgw::notify::ObjectExpirationDeleteMarker);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
<< oc.bucket << ":" << o.key
@ -1090,7 +1141,8 @@ public:
<< " " << oc.wq->thr_name() << dendl;
} else {
/* ! o.is_delete_marker() */
r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned());
r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
rgw::notify::ObjectExpirationCurrent);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
<< oc.bucket << ":" << o.key
@ -1137,7 +1189,8 @@ public:
int process(lc_op_ctx& oc) {
auto& o = oc.o;
int r = remove_expired_obj(oc.dpp, oc, true);
int r = remove_expired_obj(oc.dpp, oc, true,
rgw::notify::ObjectExpirationNoncurrent);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
<< oc.bucket << ":" << o.key
@ -1181,7 +1234,8 @@ public:
int process(lc_op_ctx& oc) {
auto& o = oc.o;
int r = remove_expired_obj(oc.dpp, oc, true);
int r = remove_expired_obj(oc.dpp, oc, true,
rgw::notify::ObjectExpirationDeleteMarker);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
<< oc.bucket << ":" << o.key
@ -1279,11 +1333,11 @@ public:
/* If bucket is versioned, create delete_marker for current version
*/
if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
ret = remove_expired_obj(oc.dpp, oc, false);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
} else {
ret = remove_expired_obj(oc.dpp, oc, true);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
}
return ret;
}

View File

@ -616,22 +616,25 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
return s_manager->remove_persistent_topic(topic_name, y);
}
rgw::sal::Object* get_object_with_atttributes(const req_state* s, rgw::sal::Object* obj) {
rgw::sal::Object* get_object_with_atttributes(
const reservation_t& res, rgw::sal::Object* obj) {
// in case of copy obj, the tags and metadata are taken from source
const auto src_obj = s->src_object ? s->src_object.get() : obj;
const auto src_obj = res.src_object ? res.src_object : obj;
if (src_obj->get_attrs().empty()) {
if (!src_obj->get_bucket()) {
src_obj->set_bucket(s->bucket.get());
src_obj->set_bucket(res.bucket);
}
if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) {
return nullptr;
}
}
return src_obj;
}
void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap& metadata) {
const auto src_obj = get_object_with_atttributes(s, obj);
static inline void metadata_from_attributes(
reservation_t& res, rgw::sal::Object* obj) {
auto& metadata = res.x_meta_map;
const auto src_obj = get_object_with_atttributes(res, obj);
if (!src_obj) {
return;
}
@ -646,8 +649,9 @@ void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValu
}
}
void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
const auto src_obj = get_object_with_atttributes(s, obj);
static inline void tags_from_attributes(
const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
const auto src_obj = get_object_with_atttributes(res, obj);
if (!src_obj) {
return;
}
@ -667,7 +671,7 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiVal
}
// populate event from request
void populate_event_from_request(const reservation_t& res,
static inline void populate_event(reservation_t& res,
rgw::sal::Object* obj,
uint64_t size,
const ceph::real_time& mtime,
@ -675,16 +679,15 @@ void populate_event_from_request(const reservation_t& res,
const std::string& version,
EventType event_type,
rgw_pubsub_s3_event& event) {
const auto s = res.s;
event.eventTime = mtime;
event.eventName = to_event_string(event_type);
event.userIdentity = s->user->get_id().id; // user that triggered the change
event.x_amz_request_id = s->req_id; // request ID of the original change
event.x_amz_id_2 = s->host_id; // RGW on which the change was made
event.userIdentity = res.user_id; // user that triggered the change
event.x_amz_request_id = res.req_id; // request ID of the original change
event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
// configurationId is filled from notification configuration
event.bucket_name = s->bucket_name;
event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
event.bucket_name = res.bucket->get_name();
event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
event.object_etag = etag;
@ -695,27 +698,30 @@ void populate_event_from_request(const reservation_t& res,
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
std::back_inserter(event.object_sequencer));
set_event_id(event.id, etag, ts);
event.bucket_id = s->bucket->get_bucket_id();
// pass metadata
if (res.cached_metadata.empty()) {
event.bucket_id = res.bucket->get_bucket_id();
// pass meta data
if (res.x_meta_map.empty()) {
// no metadata cached:
// either no metadata exist or no metadata filter was used
event.x_meta_map = s->info.x_meta_map;
metadata_from_attributes(s, obj, event.x_meta_map);
metadata_from_attributes(res, obj);
} else {
event.x_meta_map = std::move(res.cached_metadata);
event.x_meta_map = res.x_meta_map;
}
// pass tags
if (s->tagset.get_tags().empty()) {
if (!res.tagset ||
(*res.tagset).get_tags().empty()) {
// try to fetch the tags from the attributes
tags_from_attributes(s, obj, event.tags);
tags_from_attributes(res, obj, event.tags);
} else {
event.tags = s->tagset.get_tags();
event.tags = (*res.tagset).get_tags();
}
// opaque data will be filled from topic configuration
}
bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
static inline bool notification_match(reservation_t& res,
const rgw_pubsub_topic_filter& filter,
EventType event,
const RGWObjTags* req_tags) {
if (!match(filter.events, event)) {
return false;
}
@ -725,12 +731,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
return false;
}
const auto s = res.s;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
res.cached_metadata = s->info.x_meta_map;
metadata_from_attributes(s, obj, res.cached_metadata);
if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
if (res.s) {
res.x_meta_map = res.s->info.x_meta_map;
}
metadata_from_attributes(res, obj);
if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
return false;
}
}
@ -742,15 +749,15 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
return false;
}
} else if (!s->tagset.get_tags().empty()) {
} else if (res.tagset && !(*res.tagset).get_tags().empty()) {
// tags were cached in req_state
if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
return false;
}
} else {
// try to fetch tags from the attributes
KeyMultiValueMap tags;
tags_from_attributes(s, obj, tags);
tags_from_attributes(res, obj, tags);
if (!match(filter.s3_filter.tag_filter, tags)) {
return false;
}
@ -760,12 +767,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
return true;
}
int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
reservation_t& res,
const RGWObjTags* req_tags)
int publish_reserve(const DoutPrefixProvider* dpp,
EventType event_type,
reservation_t& res,
const RGWObjTags* req_tags)
{
RGWPubSub ps(res.store, res.s->user->get_id().tenant);
RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
RGWPubSub ps(res.store, res.user_tenant);
RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
rgw_pubsub_bucket_topics bucket_topics;
auto rc = ps_bucket.get_topics(&bucket_topics);
if (rc < 0) {
@ -779,9 +787,9 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
// notification does not apply to req_state
continue;
}
ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
"' on topic: '" << topic_cfg.dest.arn_topic <<
"' and bucket: '" << res.s->bucket->get_name() <<
"' and bucket: '" << res.bucket->get_name() <<
"' (unique topic: '" << topic_cfg.name <<
"') apply to event of type: '" << to_string(event_type) << "'" << dendl;
@ -795,17 +803,19 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
int rval;
const auto& queue_name = topic_cfg.dest.arn_topic;
cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
auto ret = rgw_rados_operate(
res.dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
<< ". error: " << ret << dendl;
ldpp_dout(res.dpp, 1) <<
"ERROR: failed to reserve notification on queue: "
<< queue_name << ". error: " << ret << dendl;
// if no space is left in queue we ask client to slow down
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, res_id);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
return ret;
}
}
@ -815,86 +825,102 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
}
int publish_commit(rgw::sal::Object* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
const std::string& version,
EventType event_type,
reservation_t& res,
const DoutPrefixProvider *dpp)
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
const std::string& version,
EventType event_type,
reservation_t& res,
const DoutPrefixProvider* dpp)
{
for (auto& topic : res.topics) {
if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
if (topic.cfg.dest.persistent &&
topic.res_id == cls_2pc_reservation::NO_ID) {
// nothing to commit or already committed/aborted
continue;
}
event_entry_t event_entry;
populate_event_from_request(res, obj, size, mtime, etag, version, event_type, event_entry.event);
populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {
event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
event_entry.push_endpoint_args =
std::move(topic.cfg.dest.push_endpoint_args);
event_entry.arn_topic = topic.cfg.dest.arn_topic;
bufferlist bl;
encode(event_entry, bl);
const auto& queue_name = topic.cfg.dest.arn_topic;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
" . trying to make a larger reservation on queue:" << queue_name << dendl;
ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
<< " exceeded reserved size: " << res.size
<<
" . trying to make a larger reservation on queue:" << queue_name
<< dendl;
// first cancel the existing reservation
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
topic.cfg.dest.arn_topic, &op,
res.s->yield);
auto ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
topic.cfg.dest.arn_topic, &op,
res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
<< topic.res_id <<
" when trying to make a larger reservation on queue: " << queue_name
<< ". error: " << ret << dendl;
<< ". error: " << ret << dendl;
return ret;
}
// now try to make a bigger one
bufferlist obl;
buffer::list obl;
int rval;
cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
<< ". error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
<< queue_name
<< ". error: " << ret << dendl;
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
"extra space. error: " << ret << dendl;
return ret;
}
}
std::vector<bufferlist> bl_data_vec{std::move(bl)};
std::vector<buffer::list> bl_data_vec{std::move(bl)};
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op,
res.s->yield);
const auto ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.yield);
topic.res_id = cls_2pc_reservation::NO_ID;
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
<< ". error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
<< queue_name << ". error: " << ret
<< dendl;
return ret;
}
} else {
try {
// TODO add endpoint LRU cache
const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint,
topic.cfg.dest.arn_topic,
RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
res.s->cct);
ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
const auto push_endpoint = RGWPubSubEndpoint::create(
topic.cfg.dest.push_endpoint,
topic.cfg.dest.arn_topic,
RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
dpp->get_cct());
ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
<< topic.cfg.dest.push_endpoint << dendl;
const auto ret = push_endpoint->send_to_completion_async(
dpp->get_cct(), event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
<< topic.cfg.dest.push_endpoint
<< " failed. error: " << ret << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
@ -910,20 +936,22 @@ int publish_commit(rgw::sal::Object* obj,
return 0;
}
int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
extern int publish_abort(reservation_t& res) {
for (auto& topic : res.topics) {
if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
if (!topic.cfg.dest.persistent ||
topic.res_id == cls_2pc_reservation::NO_ID) {
// nothing to abort or already committed/aborted
continue;
}
const auto& queue_name = topic.cfg.dest.arn_topic;
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op,
res.s->yield);
const auto ret = rgw_rados_operate(
res.dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
<< topic.res_id <<
" from queue: " << queue_name << ". error: " << ret << dendl;
return ret;
}
@ -932,9 +960,45 @@ int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
return 0;
}
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
req_state* _s,
rgw::sal::Object* _object,
rgw::sal::Object* _src_object,
const std::string* _object_name) :
dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx),
object(_object), src_object(_src_object), bucket(_s->bucket.get()),
object_name(_object_name),
tagset(_s->tagset),
x_meta_map(_s->info.x_meta_map),
user_id(_s->user->get_id().id),
user_tenant(_s->user->get_id().tenant),
req_id(_s->req_id),
yield(_s->yield)
{}
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
RGWObjectCtx* _obj_ctx,
rgw::sal::Object* _object,
rgw::sal::Object* _src_object,
rgw::sal::Bucket* _bucket,
std::string& _user_id,
std::string& _user_tenant,
std::string& _req_id,
optional_yield y) :
dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
obj_ctx(_obj_ctx),
object(_object), src_object(_src_object), bucket(_bucket),
object_name(nullptr),
user_id(_user_id),
user_tenant(_user_tenant),
req_id(_req_id),
yield(y)
{}
reservation_t::~reservation_t() {
publish_abort(dpp, *this);
}
publish_abort(*this);
}
} // namespace rgw::notify

View File

@ -43,27 +43,52 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y);
// then used to commit or abort the reservation
struct reservation_t {
struct topic_t {
topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
cls_2pc_reservation::id_t _res_id) :
configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
const std::string configurationId;
const rgw_pubsub_topic cfg;
std::string configurationId;
rgw_pubsub_topic cfg;
// res_id is reset after topic is committed/aborted
cls_2pc_reservation::id_t res_id;
};
const DoutPrefixProvider *dpp;
const DoutPrefixProvider* dpp;
std::vector<topic_t> topics;
rgw::sal::RadosStore* const store;
const req_state* const s;
size_t size;
RGWObjectCtx* obj_ctx;
rgw::sal::Object* const object;
rgw::sal::Object* const src_object; // may differ from object
rgw::sal::Bucket* const bucket;
const std::string* const object_name;
KeyValueMap cached_metadata;
boost::optional<RGWObjTags&> tagset;
meta_map_t x_meta_map; // metadata cached by value
std::string user_id;
std::string user_tenant;
std::string req_id;
optional_yield yield;
reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s,
rgw::sal::Object* _object, const std::string* _object_name) :
dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
/* ctor for rgw_op callers */
reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
req_state* _s,
rgw::sal::Object* _object,
rgw::sal::Object* _src_object,
const std::string* _object_name);
/* ctor for non-request caller (e.g., lifecycle) */
reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
RGWObjectCtx* _obj_ctx,
rgw::sal::Object* _object,
rgw::sal::Object* _src_object,
rgw::sal::Bucket* _bucket,
std::string& _user_id,
std::string& _user_tenant,
std::string& _req_id,
optional_yield y);
// dtor doing resource leak guarding
// aborting the reservation if not already committed or aborted
@ -71,10 +96,10 @@ struct reservation_t {
};
// create a reservation on the 2-phase-commit queue
int publish_reserve(const DoutPrefixProvider *dpp,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
int publish_reserve(const DoutPrefixProvider *dpp,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
// commit the reservation to the queue
int publish_commit(rgw::sal::Object* obj,

View File

@ -8,43 +8,73 @@ namespace rgw::notify {
std::string to_string(EventType t) {
switch (t) {
case ObjectCreated:
return "s3:ObjectCreated:*";
case ObjectCreatedPut:
return "s3:ObjectCreated:Put";
case ObjectCreatedPost:
return "s3:ObjectCreated:Post";
case ObjectCreatedCopy:
return "s3:ObjectCreated:Copy";
case ObjectCreatedCompleteMultipartUpload:
return "s3:ObjectCreated:CompleteMultipartUpload";
case ObjectRemoved:
return "s3:ObjectRemoved:*";
case ObjectRemovedDelete:
return "s3:ObjectRemoved:Delete";
case ObjectRemovedDeleteMarkerCreated:
return "s3:ObjectRemoved:DeleteMarkerCreated";
case UnknownEvent:
return "s3:UnknownEvet";
case ObjectCreated:
return "s3:ObjectCreated:*";
case ObjectCreatedPut:
return "s3:ObjectCreated:Put";
case ObjectCreatedPost:
return "s3:ObjectCreated:Post";
case ObjectCreatedCopy:
return "s3:ObjectCreated:Copy";
case ObjectCreatedCompleteMultipartUpload:
return "s3:ObjectCreated:CompleteMultipartUpload";
case ObjectRemoved:
return "s3:ObjectRemoved:*";
case ObjectRemovedDelete:
return "s3:ObjectRemoved:Delete";
case ObjectRemovedDeleteMarkerCreated:
return "s3:ObjectRemoved:DeleteMarkerCreated";
case ObjectLifecycle:
return "s3:ObjectLifecycle:*";
case ObjectExpiration:
return "s3:ObjectLifecycle:Expiration:*";
case ObjectExpirationCurrent:
return "s3:ObjectLifecycle:Expiration:Current";
case ObjectExpirationNoncurrent:
return "s3:ObjectLifecycle:Expiration:Noncurrent";
case ObjectExpirationDeleteMarker:
return "s3:ObjectLifecycle:Expiration:DeleteMarker";
case ObjectExpirationAbortMPU:
return "s3:ObjectLifecycle:Expiration:AbortMPU";
case ObjectTransition:
return "s3:ObjectLifecycle:Transition:*";
case ObjectTransitionCurrent:
return "s3:ObjectLifecycle:Transition:Current";
case ObjectTransitionNoncurrent:
return "s3:ObjectLifecycle:Transition:Noncurrent";
case UnknownEvent:
return "s3:UnknownEvent";
}
return "s3:UnknownEvent";
}
std::string to_ceph_string(EventType t) {
switch (t) {
case ObjectCreated:
case ObjectCreatedPut:
case ObjectCreatedPost:
case ObjectCreatedCopy:
case ObjectCreatedCompleteMultipartUpload:
return "OBJECT_CREATE";
case ObjectRemovedDelete:
return "OBJECT_DELETE";
case ObjectRemovedDeleteMarkerCreated:
return "DELETE_MARKER_CREATE";
case ObjectRemoved:
case UnknownEvent:
return "UNKNOWN_EVENT";
case ObjectCreated:
case ObjectCreatedPut:
case ObjectCreatedPost:
case ObjectCreatedCopy:
case ObjectCreatedCompleteMultipartUpload:
return "OBJECT_CREATE";
case ObjectRemovedDelete:
return "OBJECT_DELETE";
case ObjectRemovedDeleteMarkerCreated:
return "DELETE_MARKER_CREATE";
case ObjectLifecycle:
return "OBJECT_LIFECYCLE";
case ObjectExpiration:
case ObjectExpirationCurrent:
case ObjectExpirationNoncurrent:
case ObjectExpirationDeleteMarker:
case ObjectExpirationAbortMPU:
return "OBJECT_EXPIRATION";
case ObjectTransition:
case ObjectTransitionCurrent:
case ObjectTransitionNoncurrent:
return "OBJECT_TRANSITION";
case ObjectRemoved:
case UnknownEvent:
return "UNKNOWN_EVENT";
}
return "UNKNOWN_EVENT";
}
@ -70,6 +100,24 @@ namespace rgw::notify {
return ObjectRemovedDelete;
if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
return ObjectRemovedDeleteMarkerCreated;
if (s == "s3:ObjectLifecycle:*")
return ObjectLifecycle;
if (s == "s3:ObjectLifecycle:Expiration:*" || s == "OBJECT_EXPIRATION")
return ObjectExpiration;
if (s == "s3:ObjectLifecycle:Expiration:Current")
return ObjectExpirationCurrent;
if (s == "s3:ObjectLifecycle:Expiration:Noncurrent")
return ObjectExpirationNoncurrent;
if (s == "s3:ObjectLifecycle:Expiration:DeleteMarker")
return ObjectExpirationDeleteMarker;
if (s == "s3:ObjectLifecycle:Expiration:AbortMultipartUpload")
return ObjectExpirationAbortMPU;
if (s == "s3:ObjectLifecycle:Transition:*" || s == "OBJECT_TRANSITION")
return ObjectTransition;
if (s == "s3:ObjectLifecycle:Transition:Current")
return ObjectTransitionCurrent;
if (s == "s3:ObjectLifecycle:Transition:Noncurrent")
return ObjectTransitionNoncurrent;
return UnknownEvent;
}

View File

@ -15,7 +15,17 @@ namespace rgw::notify {
ObjectRemoved = 0xF0,
ObjectRemovedDelete = 0x10,
ObjectRemovedDeleteMarkerCreated = 0x20,
UnknownEvent = 0x100
// lifecycle events (RGW extension)
ObjectLifecycle = 0xFF00,
ObjectExpiration = 0xF00,
ObjectExpirationCurrent = 0x100,
ObjectExpirationNoncurrent = 0x200,
ObjectExpirationDeleteMarker = 0x400,
ObjectExpirationAbortMPU = 0x800,
ObjectTransition = 0xF000,
ObjectTransitionCurrent = 0x1000,
ObjectTransitionNoncurrent = 0x2000,
UnknownEvent = 0x10000
};
using EventTypeList = std::vector<EventType>;

View File

@ -3904,8 +3904,10 @@ void RGWPutObj::execute(optional_yield y)
}
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
s, rgw::notify::ObjectCreatedPut);
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(
s->object.get(), s->src_object.get(), s,
rgw::notify::ObjectCreatedPut);
if(!multipart) {
op_ret = res->publish_reserve(this, obj_tags.get());
if (op_ret < 0) {
@ -4304,7 +4306,8 @@ void RGWPostObj::execute(optional_yield y)
}
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(), s, rgw::notify::ObjectCreatedPost);
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(s->object.get(), s->src_object.get(), s, rgw::notify::ObjectCreatedPost);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
@ -4985,10 +4988,13 @@ void RGWDeleteObj::execute(optional_yield y)
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
const auto event_type = versioned_object && s->object->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
s, event_type);
const auto event_type = versioned_object &&
s->object->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated :
rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(s->object.get(), s->src_object.get(), s,
event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
@ -5388,8 +5394,10 @@ void RGWCopyObj::execute(optional_yield y)
return;
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
s, rgw::notify::ObjectCreatedCopy);
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(
s->object.get(), s->src_object.get(),
s, rgw::notify::ObjectCreatedCopy);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
@ -6349,8 +6357,8 @@ void RGWCompleteMultipart::execute(optional_yield y)
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
@ -6915,10 +6923,11 @@ void RGWDeleteMultiObj::execute(optional_yield y)
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
const auto event_type = versioned_object && obj->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(obj.get(),
s, event_type);
const auto event_type = versioned_object && obj->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated :
rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(obj.get(), s->src_object.get(), s, event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
send_partial_response(*iter, false, "", op_ret);

View File

@ -284,10 +284,18 @@ class Store {
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
/** Get a @a Completions object. Used for Async I/O tracking */
virtual std::unique_ptr<Completions> get_completions(void) = 0;
/** Get a @a Notification object. Used to communicate with non-RGW daemons, such as
* management/tracking software */
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
/** Get a @a Notification object. Used to communicate with non-RGW daemons, such as
* management/tracking software */
/** RGWOp variant */
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
/** No-req_state variant (e.g., rgwlc) */
virtual std::unique_ptr<Notification> get_notification(
const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
std::string& _req_id, optional_yield y) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
@ -630,6 +638,12 @@ class Bucket {
virtual RGWAccessControlPolicy& get_acl(void) = 0;
/** Set the ACL for this bucket */
virtual int set_acl(const DoutPrefixProvider* dpp, RGWAccessControlPolicy& acl, optional_yield y) = 0;
// XXXX hack
void set_owner(rgw::sal::User* _owner) {
owner = _owner;
}
/** Load this bucket from the backing store. Requires the key to be set, fills other fields */
virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y) = 0;
/** Read the bucket stats from the backing Store, synchronous */
@ -1319,10 +1333,14 @@ public:
class Notification {
protected:
Object* obj;
Object* src_obj;
rgw::notify::EventType event_type;
public:
Notification(Object* _obj, rgw::notify::EventType _type) : obj(_obj), event_type(_type) {}
Notification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
: obj(_obj), src_obj(_src_obj), event_type(_type)
{}
virtual ~Notification() = default;
/** Indicate the start of the event associated with this notification */

View File

@ -1716,11 +1716,21 @@ namespace rgw::sal {
return new LCDBSerializer(store, oid, lock_name, cookie);
}
std::unique_ptr<Notification> DBStore::get_notification(rgw::sal::Object* obj,
struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name)
std::unique_ptr<Notification> DBStore::get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name)
{
return std::make_unique<DBNotification>(obj, event_type);
return std::make_unique<DBNotification>(obj, src_obj, event_type);
}
std::unique_ptr<Notification> DBStore::get_notification(
const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
optional_yield y)
{
return std::make_unique<DBNotification>(obj, src_obj, event_type);
}
RGWLC* DBStore::get_rgwlc(void) {

View File

@ -59,11 +59,9 @@ public:
class DBNotification : public Notification {
protected:
Object* obj;
rgw::notify::EventType event_type;
public:
DBNotification(Object* _obj, rgw::notify::EventType _type) : Notification(_obj, _type), obj(_obj), event_type(_type) {}
DBNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
: Notification(_obj, _src_obj, _type) {}
~DBNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
@ -715,11 +713,20 @@ public:
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name) override;
virtual std::unique_ptr<Notification> get_notification(
const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,

View File

@ -1141,12 +1141,15 @@ std::unique_ptr<Completions> RadosStore::get_completions(void)
return std::make_unique<RadosCompletions>();
}
std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
struct req_state* s,
rgw::notify::EventType event_type,
const std::string* object_name)
std::unique_ptr<Notification> RadosStore::get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name)
{
return std::make_unique<RadosNotification>(s, this, obj, s, event_type, object_name);
return std::make_unique<RadosNotification>(s, this, obj, src_obj, s, event_type, object_name);
}
std::unique_ptr<Notification> RadosStore::get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
{
return std::make_unique<RadosNotification>(dpp, this, obj, src_obj, rctx, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
}
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)

View File

@ -396,7 +396,12 @@ class RadosStore : public Store {
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
// op variant
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
// non-op variant (e.g., rgwlc)
virtual std::unique_ptr<Notification> get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
@ -608,14 +613,26 @@ public:
class RadosNotification : public Notification {
RadosStore* store;
/* XXX it feels incorrect to me that rgw::notify::reservation_t is
* currently RADOS-specific; instead, I think notification types such as
* reservation_t should be generally visible, whereas the internal
* notification behavior should be made portable (e.g., notification
* to non-RADOS message sinks) */
rgw::notify::reservation_t res;
public:
RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s,
rgw::notify::EventType _type, const std::string* object_name=nullptr) :
Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, req_state* _s, rgw::notify::EventType _type, const std::string* object_name=nullptr) :
Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _s, _obj, _src_obj, object_name) { }
RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, RGWObjectCtx* rctx, rgw::notify::EventType _type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) :
Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, rctx, _obj, _src_obj, _bucket, _user_id, _user_tenant, _req_id, y) {}
~RadosNotification() = default;
rgw::notify::reservation_t& get_reservation(void) {
return res;
}
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;
virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version) override;

View File

@ -359,7 +359,8 @@ private:
std::string events_str = s->info.args.get("events", &exists);
if (!exists) {
// if no events are provided, we notify on all of them
events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
events_str =
"OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
}
rgw::notify::from_string_list(events_str, events);
if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {

View File

@ -10,6 +10,8 @@ from urllib import parse as urlparse
from time import gmtime, strftime
import boto3
from botocore.client import Config
import os
import subprocess
log = logging.getLogger('bucket_notification.tests')
@ -231,3 +233,19 @@ class PSNotificationS3:
parameters = {'notification': notification}
return self.send_request('DELETE', parameters)
test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../'
def bash(cmd, **kwargs):
log.debug('running command: %s', ' '.join(cmd))
kwargs['stdout'] = subprocess.PIPE
process = subprocess.Popen(cmd, **kwargs)
s = process.communicate()[0].decode('utf-8')
return (s, process.returncode)
def admin(args, **kwargs):
""" radosgw-admin command """
cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
return bash(cmd, **kwargs)

View File

@ -13,6 +13,8 @@ from http import server as http_server
from random import randint
import hashlib
from nose.plugins.attrib import attr
import boto3
import datetime
from boto.s3.connection import S3Connection
@ -27,7 +29,8 @@ from .api import PSTopicS3, \
PSNotificationS3, \
delete_all_s3_topics, \
delete_all_objects, \
put_object_tagging
put_object_tagging, \
admin
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal, assert_in
@ -1259,8 +1262,12 @@ def test_ps_s3_notification_push_kafka_on_master():
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
try:
s3_notification_conf = None
topic_conf1 = None
topic_conf2 = None
receiver = None
task, receiver = create_kafka_receiver_thread(topic_name+'_1')
task.start()
@ -1324,15 +1331,21 @@ def test_ps_s3_notification_push_kafka_on_master():
print('wait for 5sec for the messages...')
time.sleep(5)
receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
except Exception as e:
print(e)
assert False
finally:
# cleanup
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
if s3_notification_conf is not None:
s3_notification_conf.del_config()
if topic_conf1 is not None:
topic_conf1.del_config()
if topic_conf2 is not None:
topic_conf2.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
stop_kafka_receiver(receiver, task)
if receiver is not None:
stop_kafka_receiver(receiver, task)
@attr('http_test')
@ -1556,6 +1569,109 @@ def test_ps_s3_opaque_data_on_master():
conn.delete_bucket(bucket_name)
http_server.close()
@attr('http_test')
def test_ps_s3_lifecycle_on_master():
""" test that when object is deleted due to lifecycle policy, notification is sent on master """
hostname = get_ip()
conn = connection()
zonegroup = 'default'
# create random port for the http server
host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
opaque_data = 'http://1.2.3.4:8888'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectLifecycle:Expiration:*']
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
obj_prefix = 'ooo'
client_threads = []
start_time = time.time()
content = 'bar'
for i in range(number_of_objects):
key = bucket.new_key(obj_prefix + str(i))
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# create lifecycle policy
client = boto3.client('s3',
endpoint_url='http://'+conn.host+':'+str(conn.port),
aws_access_key_id=conn.aws_access_key_id,
aws_secret_access_key=conn.aws_secret_access_key)
yesterday = datetime.date.today() - datetime.timedelta(days=1)
response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
LifecycleConfiguration={'Rules': [
{
'ID': 'rule1',
'Expiration': {'Date': yesterday.isoformat()},
'Filter': {'Prefix': obj_prefix},
'Status': 'Enabled',
}
]
}
)
# start lifecycle processing
admin(['lc', 'process'])
print('wait for 5sec for the messages...')
time.sleep(5)
# check http receiver does not have messages
keys = list(bucket.list())
print('total number of objects: ' + str(len(keys)))
event_keys = []
events = http_server.get_and_reset_events()
for event in events:
assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
event_keys.append(event['Records'][0]['s3']['object']['key'])
for key in keys:
key_found = False
for event_key in event_keys:
if event_key == key:
key_found = True
break
if not key_found:
err = 'no lifecycle event found for key: ' + str(key)
log.error(events)
assert False, err
# cleanup
for key in keys:
key.delete()
[thr.join() for thr in client_threads]
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
conn.delete_bucket(bucket_name)
http_server.close()
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""