Merge pull request #48679 from cfsnyder/wip-rgw-multi-obj-del-perf

rgw: concurrency for multi object deletes

Reviewed-by: Yuval Lifshitz <ylifshit@redhat.com>
Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2022-12-07 14:09:10 -05:00 committed by GitHub
commit 8b0acb2c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 225 additions and 144 deletions

View File

@ -175,6 +175,14 @@ options:
services:
- rgw
with_legacy: true
- name: rgw_multi_obj_del_max_aio
type: uint
level: advanced
desc: Max number of concurrent RADOS requests per multi-object delete request.
default: 16
services:
- rgw
with_legacy: true
# whether or not the quota/gc threads should be started
- name: rgw_enable_quota_threads
type: bool

View File

@ -6838,12 +6838,172 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const {
entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
}
void RGWDeleteMultiObj::wait_flush(optional_yield y,
boost::asio::deadline_timer *formatter_flush_cond,
std::function<bool()> predicate)
{
if (y && formatter_flush_cond) {
auto yc = y.get_yield_context();
while (!predicate()) {
boost::system::error_code error;
formatter_flush_cond->async_wait(yc[error]);
rgw_flush_formatter(s, s->formatter);
}
}
}
void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y,
boost::asio::deadline_timer *formatter_flush_cond)
{
std::string version_id;
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(o);
if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
auto identity_policy_res = eval_identity_or_session_policies(this, s->iam_user_policies, s->env,
o.instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (identity_policy_res == Effect::Deny) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
rgw::IAM::Effect e = Effect::Pass;
rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
if (s->iam_policy) {
ARN obj_arn(obj->get_obj());
e = s->iam_policy->eval(s->env,
*s->auth.identity,
o.instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
obj_arn,
princ_type);
}
if (e == Effect::Deny) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
if (!s->session_policies.empty()) {
auto session_policy_res = eval_identity_or_session_policies(this, s->session_policies, s->env,
o.instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (session_policy_res == Effect::Deny) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
//Intersection of session policy and identity policy plus intersection of session policy and bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
(session_policy_res != Effect::Allow || e != Effect::Allow)) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
//Intersection of session policy and identity policy plus bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
}
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
return;
}
}
uint64_t obj_size = 0;
std::string etag;
if (!rgw::sal::Object::empty(obj.get())) {
RGWObjState* astate = nullptr;
bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
const auto ret = obj->get_obj_state(this, &astate, y, true);
if (ret < 0) {
if (ret == -ENOENT) {
// object maybe delete_marker, skip check_obj_lock
check_obj_lock = false;
} else {
// Something went wrong.
send_partial_response(o, false, "", ret, formatter_flush_cond);
return;
}
} else {
obj_size = astate->size;
etag = astate->attrset[RGW_ATTR_ETAG].to_str();
}
if (check_obj_lock) {
ceph_assert(astate);
int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
if (object_lock_response != 0) {
send_partial_response(o, false, "", object_lock_response, formatter_flush_cond);
return;
}
}
}
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
const auto event_type = versioned_object && obj->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated :
rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(obj.get(), s->src_object.get(), s, event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
send_partial_response(o, false, "", op_ret, formatter_flush_cond);
return;
}
obj->set_atomic();
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
del_op->params.obj_owner = s->owner;
del_op->params.bucket_owner = s->bucket_owner;
del_op->params.marker_version_id = version_id;
op_ret = del_op->delete_obj(this, y);
if (op_ret == -ENOENT) {
op_ret = 0;
}
send_partial_response(o, obj->get_delete_marker(), del_op->result.version_id, op_ret, formatter_flush_cond);
// send request to notification manager
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
}
}
void RGWDeleteMultiObj::execute(optional_yield y)
{
RGWMultiDelDelete *multi_delete;
vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
uint32_t aio_count = 0;
const uint32_t max_aio = s->cct->_conf->rgw_multi_obj_del_max_aio;
char* buf;
std::optional<boost::asio::deadline_timer> formatter_flush_cond;
if (y) {
formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(y.get_io_context());
}
buf = data.c_str();
if (!buf) {
@ -6904,142 +7064,23 @@ void RGWDeleteMultiObj::execute(optional_yield y)
for (iter = multi_delete->objects.begin();
iter != multi_delete->objects.end();
++iter) {
std::string version_id;
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(*iter);
if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
auto identity_policy_res = eval_identity_or_session_policies(this, s->iam_user_policies, s->env,
iter->instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (identity_policy_res == Effect::Deny) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
rgw::IAM::Effect e = Effect::Pass;
rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
if (s->iam_policy) {
ARN obj_arn(obj->get_obj());
e = s->iam_policy->eval(s->env,
*s->auth.identity,
iter->instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
obj_arn,
princ_type);
}
if (e == Effect::Deny) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
if (!s->session_policies.empty()) {
auto session_policy_res = eval_identity_or_session_policies(this, s->session_policies, s->env,
iter->instance.empty() ?
rgw::IAM::s3DeleteObject :
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (session_policy_res == Effect::Deny) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
//Intersection of session policy and identity policy plus intersection of session policy and bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
(session_policy_res != Effect::Allow || e != Effect::Allow)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
//Intersection of session policy and identity policy plus bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
}
send_partial_response(*iter, false, "", -EACCES);
continue;
}
if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
}
uint64_t obj_size = 0;
std::string etag;
if (!rgw::sal::Object::empty(obj.get())) {
RGWObjState* astate = nullptr;
bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
const auto ret = obj->get_obj_state(this, &astate, s->yield, true);
if (ret < 0) {
if (ret == -ENOENT) {
// object maybe delete_marker, skip check_obj_lock
check_obj_lock = false;
} else {
// Something went wrong.
send_partial_response(*iter, false, "", ret);
continue;
}
} else {
obj_size = astate->size;
etag = astate->attrset[RGW_ATTR_ETAG].to_str();
}
if (check_obj_lock) {
ceph_assert(astate);
int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
if (object_lock_response != 0) {
send_partial_response(*iter, false, "", object_lock_response);
continue;
}
}
}
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
const auto event_type = versioned_object && obj->get_instance().empty() ?
rgw::notify::ObjectRemovedDeleteMarkerCreated :
rgw::notify::ObjectRemovedDelete;
std::unique_ptr<rgw::sal::Notification> res
= store->get_notification(obj.get(), s->src_object.get(), s, event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
send_partial_response(*iter, false, "", op_ret);
continue;
}
obj->set_atomic();
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
del_op->params.obj_owner = s->owner;
del_op->params.bucket_owner = s->bucket_owner;
del_op->params.marker_version_id = version_id;
op_ret = del_op->delete_obj(this, y);
if (op_ret == -ENOENT) {
op_ret = 0;
}
send_partial_response(*iter, obj->get_delete_marker(), del_op->result.version_id, op_ret);
// send request to notification manager
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
rgw_obj_key obj_key = *iter;
if (y && max_aio > 1) {
wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] {
return aio_count < max_aio;
});
aio_count++;
spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (yield_context yield) {
handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond);
aio_count--;
});
} else {
handle_individual_object(obj_key, y, &*formatter_flush_cond);
}
}
wait_flush(y, &*formatter_flush_cond, [this, n=multi_delete->objects.size()] {
return n == ops_log_entries.size();
});
/* set the return code to zero, errors at this point will be
dumped to the response */

View File

@ -26,6 +26,7 @@
#include <boost/utility/in_place_factory.hpp>
#include <boost/function.hpp>
#include <boost/container/flat_map.hpp>
#include <boost/asio/deadline_timer.hpp>
#include "common/armor.h"
#include "common/mime.h"
@ -2029,6 +2030,29 @@ public:
class RGWDeleteMultiObj : public RGWOp {
/**
* Handles the deletion of an individual object and uses
* set_partial_response to record the outcome.
*/
void handle_individual_object(const rgw_obj_key& o,
optional_yield y,
boost::asio::deadline_timer *formatter_flush_cond);
/**
* When the request is being executed in a coroutine, performs
* the actual formatter flushing and is responsible for the
* termination condition (when when all partial object responses
* have been sent). Note that the formatter flushing must be handled
* on the coroutine that invokes the execute method vs. the
* coroutines that are spawned to handle individual objects because
* the flush logic uses a yield context that was captured
* and saved on the req_state vs. one that is passed on the stack.
* This is a no-op in the case where we're not executing as a coroutine.
*/
void wait_flush(optional_yield y,
boost::asio::deadline_timer *formatter_flush_cond,
std::function<bool()> predicate);
protected:
std::vector<delete_multi_obj_entry> ops_log_entries;
bufferlist data;
@ -2039,7 +2063,6 @@ protected:
bool bypass_perm;
bool bypass_governance_mode;
public:
RGWDeleteMultiObj() {
quiet = false;
@ -2047,6 +2070,7 @@ public:
bypass_perm = true;
bypass_governance_mode = false;
}
int verify_permission(optional_yield y) override;
void pre_exec() override;
void execute(optional_yield y) override;
@ -2054,8 +2078,9 @@ public:
virtual int get_params(optional_yield y) = 0;
virtual void send_status() = 0;
virtual void begin_response() = 0;
virtual void send_partial_response(rgw_obj_key& key, bool delete_marker,
const std::string& marker_version_id, int ret) = 0;
virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker,
const std::string& marker_version_id, int ret,
boost::asio::deadline_timer *formatter_flush_cond) = 0;
virtual void end_response() = 0;
const char* name() const override { return "multi_object_delete"; }
RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; }

View File

@ -5261,7 +5261,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
store->remove_rgw_head_obj(op);
auto& ioctx = ref.pool.ioctx();
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, null_yield);
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
/* raced with another operation, object state is indeterminate */
const bool need_invalidate = (r == -ECANCELED);
@ -7773,7 +7773,7 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL);
}
bufferlist outbl;
r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, null_yield);
r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, y);
if (epoch) {
*epoch = ref.pool.ioctx().get_last_version();

View File

@ -4138,9 +4138,11 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response()
rgw_flush_formatter(s, s->formatter);
}
void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key,
bool delete_marker,
const string& marker_version_id, int ret)
const string& marker_version_id,
int ret,
boost::asio::deadline_timer *formatter_flush_cond)
{
if (!key.empty()) {
delete_multi_obj_entry ops_log_entry;
@ -4186,7 +4188,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
}
ops_log_entries.push_back(std::move(ops_log_entry));
rgw_flush_formatter(s, s->formatter);
if (formatter_flush_cond) {
formatter_flush_cond->cancel();
} else {
rgw_flush_formatter(s, s->formatter);
}
}
}

View File

@ -516,8 +516,9 @@ public:
int get_params(optional_yield y) override;
void send_status() override;
void begin_response() override;
void send_partial_response(rgw_obj_key& key, bool delete_marker,
const std::string& marker_version_id, int ret) override;
void send_partial_response(const rgw_obj_key& key, bool delete_marker,
const std::string& marker_version_id, int ret,
boost::asio::deadline_timer *formatter_flush_cond) override;
void end_response() override;
};