mirror of
https://github.com/ceph/ceph
synced 2025-02-07 19:03:18 +00:00
rgw/notifications: send correct size in case of delete marker creation
Fixes: https://tracker.ceph.com/issues/51681 Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
This commit is contained in:
parent
c79942df42
commit
d81e27faa1
@ -105,6 +105,10 @@ Event Types
|
||||
| ``s3:ReducedRedundancyLostObject`` | Not applicable to Ceph |
|
||||
+----------------------------------------------+-----------------+-------------------------------------------+
|
||||
|
||||
.. note::
|
||||
|
||||
The ``s3:ObjectRemoved:DeleteMarkerCreated`` event presents information on the latest version of the object
|
||||
|
||||
Topic Configuration
|
||||
-------------------
|
||||
In the case of bucket notifications, the topics management API will be derived from `AWS Simple Notification Service API`_.
|
||||
|
@ -4810,59 +4810,71 @@ void RGWDeleteObj::execute(optional_yield y)
|
||||
}
|
||||
s->object->set_bucket(s->bucket.get());
|
||||
|
||||
rgw::sal::Attrs attrs;
|
||||
|
||||
bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
|
||||
|
||||
if (!rgw::sal::Object::empty(s->object.get())) {
|
||||
op_ret = s->object->get_obj_attrs(s->obj_ctx, s->yield, this);
|
||||
if (op_ret < 0) {
|
||||
if (need_object_expiration() || multipart_delete) {
|
||||
return;
|
||||
uint64_t obj_size = 0;
|
||||
std::string etag;
|
||||
RGWObjectCtx* obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
|
||||
{
|
||||
RGWObjState* astate = nullptr;
|
||||
bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
|
||||
|
||||
op_ret = s->object->get_obj_state(this, obj_ctx, &astate, s->yield, true);
|
||||
if (op_ret < 0) {
|
||||
if (need_object_expiration() || multipart_delete) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (check_obj_lock) {
|
||||
/* check if obj exists, read orig attrs */
|
||||
if (op_ret == -ENOENT) {
|
||||
/* object maybe delete_marker, skip check_obj_lock*/
|
||||
check_obj_lock = false;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
obj_size = astate->size;
|
||||
etag = astate->attrset[RGW_ATTR_ETAG].to_str();
|
||||
}
|
||||
|
||||
// ignore return value from get_obj_attrs in all other cases
|
||||
op_ret = 0;
|
||||
|
||||
if (check_obj_lock) {
|
||||
/* check if obj exists, read orig attrs */
|
||||
if (op_ret == -ENOENT) {
|
||||
/* object maybe delete_marker, skip check_obj_lock*/
|
||||
check_obj_lock = false;
|
||||
} else {
|
||||
ceph_assert(astate);
|
||||
int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
|
||||
if (object_lock_response != 0) {
|
||||
op_ret = object_lock_response;
|
||||
if (op_ret == -EACCES) {
|
||||
s->err.message = "forbidden by object lock";
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
attrs = s->object->get_attrs();
|
||||
}
|
||||
|
||||
// ignore return value from get_obj_attrs in all other cases
|
||||
op_ret = 0;
|
||||
if (multipart_delete) {
|
||||
if (!astate) {
|
||||
op_ret = -ERR_NOT_SLO_MANIFEST;
|
||||
return;
|
||||
}
|
||||
|
||||
const auto slo_attr = astate->attrset.find(RGW_ATTR_SLO_MANIFEST);
|
||||
|
||||
if (slo_attr != astate->attrset.end()) {
|
||||
op_ret = handle_slo_manifest(slo_attr->second, y);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
|
||||
}
|
||||
} else {
|
||||
op_ret = -ERR_NOT_SLO_MANIFEST;
|
||||
}
|
||||
|
||||
if (check_obj_lock) {
|
||||
int object_lock_response = verify_object_lock(this, attrs, bypass_perm, bypass_governance_mode);
|
||||
if (object_lock_response != 0) {
|
||||
op_ret = object_lock_response;
|
||||
if (op_ret == -EACCES) {
|
||||
s->err.message = "forbidden by object lock";
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (multipart_delete) {
|
||||
const auto slo_attr = attrs.find(RGW_ATTR_SLO_MANIFEST);
|
||||
|
||||
if (slo_attr != attrs.end()) {
|
||||
op_ret = handle_slo_manifest(slo_attr->second, y);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
|
||||
}
|
||||
} else {
|
||||
op_ret = -ERR_NOT_SLO_MANIFEST;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// make reservation for notification if needed
|
||||
const auto versioned_object = s->bucket->versioning_enabled();
|
||||
const auto event_type = versioned_object && s->object->get_instance().empty() ?
|
||||
@ -4874,9 +4886,8 @@ void RGWDeleteObj::execute(optional_yield y)
|
||||
return;
|
||||
}
|
||||
|
||||
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
|
||||
s->object->set_atomic(s->obj_ctx);
|
||||
|
||||
|
||||
bool ver_restored = false;
|
||||
op_ret = s->object->swift_versioning_restore(s->obj_ctx, ver_restored, this);
|
||||
if (op_ret < 0) {
|
||||
@ -4924,10 +4935,8 @@ void RGWDeleteObj::execute(optional_yield y)
|
||||
op_ret = 0;
|
||||
}
|
||||
|
||||
const auto obj_state = obj_ctx->get_state(s->object->get_obj());
|
||||
|
||||
// send request to notification manager
|
||||
int ret = res->publish_commit(this, obj_state->size, obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(), version_id);
|
||||
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
|
||||
if (ret < 0) {
|
||||
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
|
||||
// too late to rollback operation, hence op_ret is not set here
|
||||
@ -6879,29 +6888,38 @@ void RGWDeleteMultiObj::execute(optional_yield y)
|
||||
}
|
||||
}
|
||||
|
||||
// verify_object_lock
|
||||
bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
|
||||
if (check_obj_lock) {
|
||||
int get_attrs_response = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
|
||||
if (get_attrs_response < 0) {
|
||||
if (get_attrs_response == -ENOENT) {
|
||||
uint64_t obj_size = 0;
|
||||
std::string etag;
|
||||
|
||||
if (!rgw::sal::Object::empty(obj.get())) {
|
||||
RGWObjState* astate = nullptr;
|
||||
bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
|
||||
const auto ret = obj->get_obj_state(this, obj_ctx, &astate, s->yield, true);
|
||||
|
||||
if (ret < 0) {
|
||||
if (ret == -ENOENT) {
|
||||
// object maybe delete_marker, skip check_obj_lock
|
||||
check_obj_lock = false;
|
||||
} else {
|
||||
// Something went wrong.
|
||||
send_partial_response(*iter, false, "", get_attrs_response);
|
||||
send_partial_response(*iter, false, "", ret);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
obj_size = astate->size;
|
||||
etag = astate->attrset[RGW_ATTR_ETAG].to_str();
|
||||
}
|
||||
|
||||
if (check_obj_lock) {
|
||||
ceph_assert(astate);
|
||||
int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
|
||||
if (object_lock_response != 0) {
|
||||
send_partial_response(*iter, false, "", object_lock_response);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (check_obj_lock) {
|
||||
int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode);
|
||||
if (object_lock_response != 0) {
|
||||
send_partial_response(*iter, false, "", object_lock_response);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// make reservation for notification if needed
|
||||
const auto versioned_object = s->bucket->versioning_enabled();
|
||||
const auto event_type = versioned_object && obj->get_instance().empty() ?
|
||||
@ -6929,12 +6947,8 @@ void RGWDeleteMultiObj::execute(optional_yield y)
|
||||
|
||||
send_partial_response(*iter, obj->get_delete_marker(), del_op->result.version_id, op_ret);
|
||||
|
||||
const auto obj_state = obj_ctx->get_state(obj->get_obj());
|
||||
bufferlist etag_bl;
|
||||
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
|
||||
|
||||
// send request to notification manager
|
||||
int ret = res->publish_commit(this, obj_state->size, obj_state->mtime, etag, version_id);
|
||||
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
|
||||
if (ret < 0) {
|
||||
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
|
||||
// too late to rollback operation, hence op_ret is not set here
|
||||
|
@ -1981,9 +1981,13 @@ def test_ps_s3_versioned_deletion_on_master():
|
||||
|
||||
# create objects in the bucket
|
||||
key = bucket.new_key('foo')
|
||||
key.set_contents_from_string('bar')
|
||||
content = str(os.urandom(512))
|
||||
size1 = len(content)
|
||||
key.set_contents_from_string(content)
|
||||
ver1 = key.version_id
|
||||
key.set_contents_from_string('kaboom')
|
||||
content = str(os.urandom(511))
|
||||
size2 = len(content)
|
||||
key.set_contents_from_string(content)
|
||||
ver2 = key.version_id
|
||||
# create delete marker (non versioned deletion)
|
||||
delete_marker_key = bucket.delete_key(key.name)
|
||||
@ -1994,7 +1998,6 @@ def test_ps_s3_versioned_deletion_on_master():
|
||||
# versioned deletion
|
||||
bucket.delete_key(key.name, version_id=ver2)
|
||||
bucket.delete_key(key.name, version_id=ver1)
|
||||
delete_marker_key.delete()
|
||||
|
||||
print('wait for 5sec for the messages...')
|
||||
time.sleep(5)
|
||||
@ -2006,6 +2009,7 @@ def test_ps_s3_versioned_deletion_on_master():
|
||||
for event_list in events:
|
||||
for event in event_list['Records']:
|
||||
version = event['s3']['object']['versionId']
|
||||
size = event['s3']['object']['size']
|
||||
if version not in versions:
|
||||
print('version mismatch: '+version+' not in: '+str(versions))
|
||||
assert False
|
||||
@ -2013,19 +2017,22 @@ def test_ps_s3_versioned_deletion_on_master():
|
||||
print('version ok: '+version+' in: '+str(versions))
|
||||
if event['eventName'] == 'ObjectRemoved:Delete':
|
||||
delete_events += 1
|
||||
assert size in [size1, size2]
|
||||
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
|
||||
if event['eventName'] == 'ObjectRemoved:DeleteMarkerCreated':
|
||||
delete_marker_create_events += 1
|
||||
assert size == size2
|
||||
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
|
||||
|
||||
# 3 key versions were deleted (v1, v2 and the deletion marker)
|
||||
# 2 key versions were deleted
|
||||
# notified over the same topic via 2 notifications (1,3)
|
||||
assert_equal(delete_events, 3*2)
|
||||
assert_equal(delete_events, 2*2)
|
||||
# 1 deletion marker was created
|
||||
# notified over the same topic over 2 notifications (1,2)
|
||||
assert_equal(delete_marker_create_events, 1*2)
|
||||
|
||||
# cleanup
|
||||
delete_marker_key.delete()
|
||||
stop_amqp_receiver(receiver, task)
|
||||
s3_notification_conf.del_config()
|
||||
topic_conf.del_config()
|
||||
|
Loading…
Reference in New Issue
Block a user