mirror of
https://github.com/ceph/ceph
synced 2025-02-23 02:57:21 +00:00
Merge pull request #18954 from adamemerson/wip-hole-in-the-bucket-dear-liza
rgw: Add try_refresh_bucket_info function rgw: Add retry_raced_bucket_write rgw: Handle stale bucket info in RGWPutMetadataBucket rgw: Handle stale bucket info in RGWSetBucketVersioning rgw: Handle stale bucket info in RGWSetBucketWebsite rgw: Handle stale bucket info in RGWDeleteBucketWebsite rgw: Handle stale bucket info in RGWPutBucketPolicy rgw: Handle stale bucket info in RGWDeleteBucketPolicy rgw: Expire entries in bucket info cache Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
commit
d07588a1aa
@ -4300,7 +4300,7 @@ std::vector<Option> get_global_options() {
|
||||
Option("debug_deliberately_leak_memory", Option::TYPE_BOOL, Option::LEVEL_DEV)
|
||||
.set_default(false)
|
||||
.set_description(""),
|
||||
|
||||
|
||||
Option("debug_asserts_on_shutdown", Option::TYPE_BOOL,Option::LEVEL_DEV)
|
||||
.set_default(false)
|
||||
.set_description("Enable certain asserts to check for refcounting bugs on shutdown; see http://tracker.ceph.com/issues/21738"),
|
||||
@ -5540,6 +5540,26 @@ std::vector<Option> get_rgw_options() {
|
||||
Option("rgw_reshard_thread_interval", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(10_min)
|
||||
.set_description(""),
|
||||
|
||||
Option("rgw_bucket_info_cache_expiry_interval", Option::TYPE_UINT,
|
||||
Option::LEVEL_ADVANCED)
|
||||
.set_default(15_min)
|
||||
.set_description("Number of seconds before entries in the bucket info "
|
||||
"cache are assumed stale and re-fetched. Zero is never.")
|
||||
.add_tag("performance")
|
||||
.add_service("rgw")
|
||||
.set_long_description("The Rados Gateway stores metadata about buckets in "
|
||||
"an internal cache. This should be kept consistent "
|
||||
"by the OSD's relaying notify events between "
|
||||
"multiple watching RGW processes. In the event "
|
||||
"that this notification protocol fails, bounding "
|
||||
"the length of time that any data in the cache will "
|
||||
"be assumed valid will ensure that any RGW instance "
|
||||
"that falls out of sync will eventually recover. "
|
||||
"This seems to be an issue mostly for large numbers "
|
||||
"of RGW instances under heavy use. If you would like "
|
||||
"to turn off cache expiry, set this value to zero."),
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -634,6 +634,37 @@ void rgw_bucket_object_pre_exec(struct req_state *s)
|
||||
dump_bucket_from_state(s);
|
||||
}
|
||||
|
||||
// So! Now and then when we try to update bucket information, the
|
||||
// bucket has changed during the course of the operation. (Or we have
|
||||
// a cache consistency problem that Watch/Notify isn't ruling out
|
||||
// completely.)
|
||||
//
|
||||
// When this happens, we need to update the bucket info and try
|
||||
// again. We have, however, to try the right *part* again. We can't
|
||||
// simply re-send, since that will obliterate the previous update.
|
||||
//
|
||||
// Thus, callers of this function should include everything that
|
||||
// merges information to be changed into the bucket information as
|
||||
// well as the call to set it.
|
||||
//
|
||||
// The called function must return an integer, negative on error. In
|
||||
// general, they should just return op_ret.
|
||||
namespace {
|
||||
template<typename F>
|
||||
int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
|
||||
auto r = f();
|
||||
for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
|
||||
r = g->try_refresh_bucket_info(s->bucket_info, nullptr,
|
||||
&s->bucket_attrs);
|
||||
if (r >= 0) {
|
||||
r = f();
|
||||
}
|
||||
}
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int RGWGetObj::verify_permission()
|
||||
{
|
||||
obj = rgw_obj(s->bucket, s->object);
|
||||
@ -2063,17 +2094,20 @@ void RGWSetBucketVersioning::execute()
|
||||
}
|
||||
}
|
||||
|
||||
if (versioning_status == VersioningEnabled) {
|
||||
s->bucket_info.flags |= BUCKET_VERSIONED;
|
||||
s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
|
||||
} else if (versioning_status == VersioningSuspended) {
|
||||
s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
op_ret = retry_raced_bucket_write(store, s, [this] {
|
||||
if (versioning_status == VersioningEnabled) {
|
||||
s->bucket_info.flags |= BUCKET_VERSIONED;
|
||||
s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
|
||||
} else if (versioning_status == VersioningSuspended) {
|
||||
s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
|
||||
} else {
|
||||
return op_ret;
|
||||
}
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
|
||||
&s->bucket_attrs);
|
||||
return op_ret;
|
||||
});
|
||||
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
|
||||
&s->bucket_attrs);
|
||||
if (op_ret < 0) {
|
||||
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
|
||||
<< " returned err=" << op_ret << dendl;
|
||||
@ -2123,10 +2157,14 @@ void RGWSetBucketWebsite::execute()
|
||||
}
|
||||
}
|
||||
|
||||
s->bucket_info.has_website = true;
|
||||
s->bucket_info.website_conf = website_conf;
|
||||
op_ret = retry_raced_bucket_write(store, s, [this] {
|
||||
s->bucket_info.has_website = true;
|
||||
s->bucket_info.website_conf = website_conf;
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false,
|
||||
real_time(), &s->bucket_attrs);
|
||||
return op_ret;
|
||||
});
|
||||
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
|
||||
if (op_ret < 0) {
|
||||
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
|
||||
return;
|
||||
@ -2145,10 +2183,13 @@ void RGWDeleteBucketWebsite::pre_exec()
|
||||
|
||||
void RGWDeleteBucketWebsite::execute()
|
||||
{
|
||||
s->bucket_info.has_website = false;
|
||||
s->bucket_info.website_conf = RGWBucketWebsiteConf();
|
||||
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
|
||||
op_ret = retry_raced_bucket_write(store, s, [this] {
|
||||
s->bucket_info.has_website = false;
|
||||
s->bucket_info.website_conf = RGWBucketWebsiteConf();
|
||||
op_ret = store->put_bucket_instance_info(s->bucket_info, false,
|
||||
real_time(), &s->bucket_attrs);
|
||||
return op_ret;
|
||||
});
|
||||
if (op_ret < 0) {
|
||||
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
|
||||
return;
|
||||
@ -3990,55 +4031,61 @@ void RGWPutMetadataBucket::execute()
|
||||
return;
|
||||
}
|
||||
|
||||
/* Encode special metadata first as we're using std::map::emplace under
|
||||
* the hood. This method will add the new items only if the map doesn't
|
||||
* contain such keys yet. */
|
||||
if (has_policy) {
|
||||
if (s->dialect.compare("swift") == 0) {
|
||||
auto old_policy = \
|
||||
static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
|
||||
auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
|
||||
new_policy->filter_merge(policy_rw_mask, old_policy);
|
||||
policy = *new_policy;
|
||||
}
|
||||
buffer::list bl;
|
||||
policy.encode(bl);
|
||||
emplace_attr(RGW_ATTR_ACL, std::move(bl));
|
||||
}
|
||||
op_ret = retry_raced_bucket_write(store, s, [this] {
|
||||
/* Encode special metadata first as we're using std::map::emplace under
|
||||
* the hood. This method will add the new items only if the map doesn't
|
||||
* contain such keys yet. */
|
||||
if (has_policy) {
|
||||
if (s->dialect.compare("swift") == 0) {
|
||||
auto old_policy = \
|
||||
static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
|
||||
auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
|
||||
new_policy->filter_merge(policy_rw_mask, old_policy);
|
||||
policy = *new_policy;
|
||||
}
|
||||
buffer::list bl;
|
||||
policy.encode(bl);
|
||||
emplace_attr(RGW_ATTR_ACL, std::move(bl));
|
||||
}
|
||||
|
||||
if (has_cors) {
|
||||
buffer::list bl;
|
||||
cors_config.encode(bl);
|
||||
emplace_attr(RGW_ATTR_CORS, std::move(bl));
|
||||
}
|
||||
if (has_cors) {
|
||||
buffer::list bl;
|
||||
cors_config.encode(bl);
|
||||
emplace_attr(RGW_ATTR_CORS, std::move(bl));
|
||||
}
|
||||
|
||||
/* It's supposed that following functions WILL NOT change any special
|
||||
* attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
|
||||
prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
|
||||
populate_with_generic_attrs(s, attrs);
|
||||
/* It's supposed that following functions WILL NOT change any
|
||||
* special attributes (like RGW_ATTR_ACL) if they are already
|
||||
* present in attrs. */
|
||||
prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
|
||||
populate_with_generic_attrs(s, attrs);
|
||||
|
||||
/* According to the Swift's behaviour and its container_quota WSGI middleware
|
||||
* implementation: anyone with write permissions is able to set the bucket
|
||||
* quota. This stays in contrast to account quotas that can be set only by
|
||||
* clients holding reseller admin privileges. */
|
||||
op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
/* According to the Swift's behaviour and its container_quota
|
||||
* WSGI middleware implementation: anyone with write permissions
|
||||
* is able to set the bucket quota. This stays in contrast to
|
||||
* account quotas that can be set only by clients holding
|
||||
* reseller admin privileges. */
|
||||
op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
|
||||
if (op_ret < 0) {
|
||||
return op_ret;
|
||||
}
|
||||
|
||||
if (swift_ver_location) {
|
||||
s->bucket_info.swift_ver_location = *swift_ver_location;
|
||||
s->bucket_info.swift_versioning = (! swift_ver_location->empty());
|
||||
}
|
||||
if (swift_ver_location) {
|
||||
s->bucket_info.swift_ver_location = *swift_ver_location;
|
||||
s->bucket_info.swift_versioning = (!swift_ver_location->empty());
|
||||
}
|
||||
|
||||
/* Web site of Swift API. */
|
||||
filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
|
||||
s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
|
||||
/* Web site of Swift API. */
|
||||
filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
|
||||
s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
|
||||
|
||||
/* Setting attributes also stores the provided bucket info. Due to this
|
||||
* fact, the new quota settings can be serialized with the same call. */
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
/* Setting attributes also stores the provided bucket info. Due
|
||||
* to this fact, the new quota settings can be serialized with
|
||||
* the same call. */
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
return op_ret;
|
||||
});
|
||||
}
|
||||
|
||||
int RGWPutMetadataObject::verify_permission()
|
||||
@ -6836,15 +6883,15 @@ void RGWPutBucketPolicy::execute()
|
||||
}
|
||||
|
||||
try {
|
||||
Policy p(s->cct, s->bucket_tenant, in_data);
|
||||
auto attrs = s->bucket_attrs;
|
||||
attrs[RGW_ATTR_IAM_POLICY].clear();
|
||||
attrs[RGW_ATTR_IAM_POLICY].append(p.text);
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
if (op_ret == -ECANCELED) {
|
||||
op_ret = 0; /* lost a race, but it's ok because policies are immutable */
|
||||
}
|
||||
const Policy p(s->cct, s->bucket_tenant, in_data);
|
||||
op_ret = retry_raced_bucket_write(store, s, [&p, this] {
|
||||
auto attrs = s->bucket_attrs;
|
||||
attrs[RGW_ATTR_IAM_POLICY].clear();
|
||||
attrs[RGW_ATTR_IAM_POLICY].append(p.text);
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
return op_ret;
|
||||
});
|
||||
} catch (rgw::IAM::PolicyParseException& e) {
|
||||
ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
|
||||
op_ret = -EINVAL;
|
||||
@ -6912,11 +6959,11 @@ int RGWDeleteBucketPolicy::verify_permission()
|
||||
|
||||
void RGWDeleteBucketPolicy::execute()
|
||||
{
|
||||
auto attrs = s->bucket_attrs;
|
||||
attrs.erase(RGW_ATTR_IAM_POLICY);
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
if (op_ret == -ECANCELED) {
|
||||
op_ret = 0; /* lost a race, but it's ok because policies are immutable */
|
||||
}
|
||||
op_ret = retry_raced_bucket_write(store, s, [this] {
|
||||
auto attrs = s->bucket_attrs;
|
||||
attrs.erase(RGW_ATTR_IAM_POLICY);
|
||||
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
|
||||
&s->bucket_info.objv_tracker);
|
||||
return op_ret;
|
||||
});
|
||||
}
|
||||
|
@ -120,6 +120,7 @@ protected:
|
||||
int do_aws4_auth_completion();
|
||||
|
||||
virtual int init_quota();
|
||||
|
||||
public:
|
||||
RGWOp()
|
||||
: s(nullptr),
|
||||
|
@ -11901,15 +11901,27 @@ int RGWRados::convert_old_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
const string& tenant, const string& bucket_name, RGWBucketInfo& info,
|
||||
real_time *pmtime, map<string, bufferlist> *pattrs)
|
||||
int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
const string& tenant,
|
||||
const string& bucket_name,
|
||||
RGWBucketInfo& info,
|
||||
real_time *pmtime,
|
||||
map<string, bufferlist> *pattrs,
|
||||
boost::optional<obj_version> refresh_version)
|
||||
{
|
||||
bucket_info_entry e;
|
||||
string bucket_entry;
|
||||
rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
|
||||
|
||||
|
||||
if (binfo_cache->find(bucket_entry, &e)) {
|
||||
if (refresh_version &&
|
||||
e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
|
||||
lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
|
||||
<< "a failure that should be debugged. I am a nice machine, "
|
||||
<< "so I will try to recover." << dendl;
|
||||
binfo_cache->invalidate(bucket_entry);
|
||||
}
|
||||
info = e.info;
|
||||
if (pattrs)
|
||||
*pattrs = e.attrs;
|
||||
@ -11960,6 +11972,7 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
e.info.ep_objv = ot.read_version;
|
||||
info = e.info;
|
||||
if (ret < 0) {
|
||||
lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
|
||||
info.bucket.tenant = tenant;
|
||||
info.bucket.name = bucket_name;
|
||||
// XXX and why return anything in case of an error anyway?
|
||||
@ -11981,9 +11994,35 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
|
||||
}
|
||||
|
||||
if (refresh_version &&
|
||||
refresh_version->compare(&info.objv_tracker.read_version)) {
|
||||
lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
|
||||
<< "have gone squirrelly. An administrator may have forced a "
|
||||
<< "change; otherwise there is a problem somewhere." << dendl;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
const string& tenant, const string& bucket_name,
|
||||
RGWBucketInfo& info,
|
||||
real_time *pmtime, map<string, bufferlist> *pattrs)
|
||||
{
|
||||
return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
|
||||
pattrs, boost::none);
|
||||
}
|
||||
|
||||
int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
|
||||
ceph::real_time *pmtime,
|
||||
map<string, bufferlist> *pattrs)
|
||||
{
|
||||
RGWObjectCtx obj_ctx(this);
|
||||
|
||||
return _get_bucket_info(obj_ctx, info.bucket.tenant, info.bucket.name,
|
||||
info, pmtime, pattrs, info.objv_tracker.read_version);
|
||||
}
|
||||
|
||||
int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
|
||||
bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
|
||||
map<string, bufferlist> *pattrs)
|
||||
|
@ -3416,12 +3416,32 @@ public:
|
||||
|
||||
int convert_old_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name);
|
||||
static void make_bucket_entry_name(const string& tenant_name, const string& bucket_name, string& bucket_entry);
|
||||
|
||||
|
||||
private:
|
||||
int _get_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant,
|
||||
const string& bucket_name, RGWBucketInfo& info,
|
||||
real_time *pmtime,
|
||||
map<string, bufferlist> *pattrs,
|
||||
boost::optional<obj_version> refresh_version);
|
||||
public:
|
||||
|
||||
|
||||
int get_bucket_info(RGWObjectCtx& obj_ctx,
|
||||
const string& tenant_name, const string& bucket_name,
|
||||
RGWBucketInfo& info,
|
||||
ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
|
||||
const string& tenant_name, const string& bucket_name,
|
||||
RGWBucketInfo& info,
|
||||
ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
|
||||
|
||||
// Returns true on successful refresh. Returns false if there was an
|
||||
// error or the version stored on the OSD is the same as that
|
||||
// presented in the BucketInfo structure.
|
||||
//
|
||||
int try_refresh_bucket_info(RGWBucketInfo& info,
|
||||
ceph::real_time *pmtime,
|
||||
map<string, bufferlist> *pattrs = nullptr);
|
||||
|
||||
int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv,
|
||||
map<string, bufferlist> *pattrs, bool create_entry_point);
|
||||
map<string, bufferlist> *pattrs, bool create_entry_point);
|
||||
|
||||
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
|
||||
int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
|
||||
@ -3736,25 +3756,32 @@ public:
|
||||
|
||||
template <class T>
|
||||
class RGWChainedCacheImpl : public RGWChainedCache {
|
||||
ceph::timespan expiry;
|
||||
RWLock lock;
|
||||
|
||||
map<string, T> entries;
|
||||
map<string, std::pair<T, ceph::coarse_mono_time>> entries;
|
||||
|
||||
public:
|
||||
RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
|
||||
|
||||
void init(RGWRados *store) {
|
||||
store->register_chained_cache(this);
|
||||
expiry = std::chrono::seconds(store->ctx()->_conf->get_val<uint64_t>(
|
||||
"rgw_bucket_info_cache_expiry_interval"));
|
||||
}
|
||||
|
||||
bool find(const string& key, T *entry) {
|
||||
RWLock::RLocker rl(lock);
|
||||
typename map<string, T>::iterator iter = entries.find(key);
|
||||
auto iter = entries.find(key);
|
||||
if (iter == entries.end()) {
|
||||
return false;
|
||||
}
|
||||
if (expiry.count() &&
|
||||
(ceph::coarse_mono_clock::now() - iter->second.second) > expiry) {
|
||||
return false;
|
||||
}
|
||||
|
||||
*entry = iter->second;
|
||||
*entry = iter->second.first;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -3768,7 +3795,10 @@ public:
|
||||
void chain_cb(const string& key, void *data) override {
|
||||
T *entry = static_cast<T *>(data);
|
||||
RWLock::WLocker wl(lock);
|
||||
entries[key] = *entry;
|
||||
entries[key].first = *entry;
|
||||
if (expiry.count() > 0) {
|
||||
entries[key].second = ceph::coarse_mono_clock::now();
|
||||
}
|
||||
}
|
||||
|
||||
void invalidate(const string& key) override {
|
||||
|
Loading…
Reference in New Issue
Block a user