rgw: svc_cache: more work

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2018-09-06 01:16:01 -07:00
parent 074936b5c2
commit 771eafc973
9 changed files with 373 additions and 654 deletions

View File

@ -195,460 +195,4 @@ public:
void invalidate_all();
};
// Caching layer mixed in on top of a store type `T`.
//
// System-object reads, writes, stats and deletes are overridden so that they
// consult / update a local ObjectCache; the remaining members forward to `T`.
template <class T>
class RGWCache : public T
{
  // Local cache of system-object data, attrs and metadata.
  ObjectCache cache;

  // Raw listing is not cached: both calls go straight to the base class.
  int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
    return T::list_objects_raw_init(pool, handle);
  }
  int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
    return T::list_objects_raw_next(obj, handle);
  }

  // Build the cache key "<pool name>+<pool namespace>+<oid>".
  string normal_name(rgw_pool& pool, const std::string& oid) {
    std::string key;
    key.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
    key.append(pool.name);
    key.append("+");
    key.append(pool.ns);
    key.append("+");
    key.append(oid);
    return key;
  }

  void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);

  // Hand the CephContext to the cache, then run the base-class init.
  // Any non-negative result from the base class is reported as 0.
  int init_rados() override {
    cache.set_ctx(T::cct);
    const int ret = T::init_rados();
    return (ret < 0) ? ret : 0;
  }

  // The cache always asks for watch/notify.
  bool need_watch_notify() override {
    return true;
  }

  int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
  int watch_cb(uint64_t notify_id,
               uint64_t cookie,
               uint64_t notifier_id,
               bufferlist& bl) override;

  void set_cache_enabled(bool state) override {
    cache.set_enabled(state);
  }
public:
  RGWCache() {}

  // Attach a chained cache to the local ObjectCache.
  void register_chained_cache(RGWChainedCache *cc) override {
    cache.chain_cache(cc);
  }

  int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
                           map<string, bufferlist>& attrs,
                           map<string, bufferlist>* rmattrs,
                           RGWObjVersionTracker *objv_tracker);
  int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
                          map<std::string, bufferlist>& attrs, int flags,
                          const bufferlist& data,
                          RGWObjVersionTracker *objv_tracker,
                          real_time set_mtime) override;
  int put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& bl, off_t ofs, bool exclusive,
                          RGWObjVersionTracker *objv_tracker = nullptr) override;

  int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
                     RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
                     bufferlist& bl, off_t ofs, off_t end,
                     map<string, bufferlist> *attrs,
                     rgw_cache_entry_info *cache_info,
                     boost::optional<obj_version> refresh_version = boost::none) override;

  int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
                   bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;

  int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;

  // Forward chained-entry registration to the local ObjectCache.
  bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
    return cache.chain_cache_entry(cache_info_entries, chained_entry);
  }

  // Cache inspection / maintenance commands.
  void call_list(const std::optional<std::string>& filter,
                 Formatter* format) override;
  bool call_inspect(const std::string& target, Formatter* format) override;
  bool call_erase(const std::string& target) override;
  void call_zap() override;
};
// Resolve the (pool, oid) pair used for cache naming.
//
// A non-empty `src_obj` is passed through unchanged together with its pool.
// An empty `src_obj` is mapped to the zone's domain_root pool, with the
// source pool's name used as the object id.
template <class T>
void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
{
  if (src_obj.empty()) {
    dst_pool = T::get_zone_params().domain_root;
    dst_obj = src_pool.name;
    return;
  }

  dst_pool = src_pool;
  dst_obj = src_obj;
}
// Delete a system object, invalidating it in the cache first.
//
// The entry is removed from the local cache and a REMOVE_OBJ notification is
// distributed before the delete is delegated to the base class. A failed
// distribution is logged (as the other mutating methods do) but does not
// prevent the delete.
//
// Returns the result of T::delete_system_obj().
template <class T>
int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  string name = normal_name(pool, oid);
  cache.remove(name);

  ObjectCacheInfo info;
  int r = distribute_cache(name, obj, info, REMOVE_OBJ);
  if (r < 0) {
    mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
  }

  return T::delete_system_obj(obj, objv_tracker);
}
// Read a system object, serving reads that start at offset 0 from the cache
// when possible.
//
// - Reads with ofs != 0 bypass the cache and go to the base class.
// - On a cache hit (and, when `refresh_version` is set, only if
//   info.version.compare() against it returns false) the cached data,
//   version and attrs are copied out and the cached data length is returned;
//   a cached negative status is returned as-is.
// - On a miss the base class is called. -ENOENT is cached as a negative
//   entry; other errors are returned without touching the cache. A result
//   of exactly `end + 1` bytes is not cached.
//
// Returns the number of bytes read, or a negative error code.
template <class T>
int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
                                RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
                                bufferlist& obl, off_t ofs, off_t end,
                                map<string, bufferlist> *attrs,
                                rgw_cache_entry_info *cache_info,
                                boost::optional<obj_version> refresh_version)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
  if (ofs != 0)
    return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);

  string name = normal_name(pool, oid);

  ObjectCacheInfo info;

  // Ask the cache only for the pieces the caller actually wants.
  uint32_t flags = CACHE_FLAG_DATA;
  if (objv_tracker)
    flags |= CACHE_FLAG_OBJV;
  if (attrs)
    flags |= CACHE_FLAG_XATTRS;

  if ((cache.get(name, info, flags, cache_info) == 0) &&
      (!refresh_version || !info.version.compare(&(*refresh_version)))) {
    if (info.status < 0)
      return info.status;

    bufferlist& bl = info.data;

    bufferlist::iterator i = bl.begin();

    obl.clear();

    i.copy_all(obl);
    if (objv_tracker)
      objv_tracker->read_version = info.version;
    if (attrs)
      *attrs = info.xattrs;
    return bl.length();
  }

  int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
  if (r < 0) {
    if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
      info.status = r;
      cache.put(name, info, cache_info);
    }
    return r;
  }

  if (obl.length() == end + 1) {
    /* in this case, most likely object contains more data, we can't cache it */
    return r;
  }

  // Copy what was read into the cache entry (no scratch buffer is needed).
  bufferlist& bl = info.data;
  bl.clear();
  bufferlist::iterator o = obl.begin();
  o.copy_all(bl);
  info.status = 0;
  info.flags = flags;
  if (objv_tracker) {
    info.version = objv_tracker->read_version;
  }
  if (attrs) {
    info.xattrs = *attrs;
  }
  cache.put(name, info, cache_info);
  return r;
}
// Set (and optionally remove) attrs on a system object and mirror the change
// into the cache.
//
// On success the attr delta is stored in the local cache and an UPDATE_OBJ
// notification is distributed; a distribution failure is only logged.
// On failure the cache entry is dropped.
//
// Returns the result of T::system_obj_set_attrs().
template <class T>
int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
                                      map<string, bufferlist>& attrs,
                                      map<string, bufferlist>* rmattrs,
                                      RGWObjVersionTracker *objv_tracker)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  // Describe the modification before the write is issued.
  ObjectCacheInfo info;
  info.status = 0;
  info.flags = CACHE_FLAG_MODIFY_XATTRS;
  info.xattrs = attrs;
  if (rmattrs) {
    info.rm_xattrs = *rmattrs;
  }
  if (objv_tracker) {
    info.flags |= CACHE_FLAG_OBJV;
    info.version = objv_tracker->write_version;
  }

  int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
  string name = normal_name(pool, oid);

  if (ret < 0) {
    cache.remove(name);
    return ret;
  }

  cache.put(name, info, NULL);
  if (distribute_cache(name, obj, info, UPDATE_OBJ) < 0) {
    mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
  }
  return ret;
}
// Write a whole system object (data + attrs) and update the cache.
//
// On success the new data, attrs, size and resulting mtime are cached. The
// update is distributed to peers unless PUT_OBJ_EXCL was requested. On
// failure the cache entry is dropped.
//
// `mtime`, when non-null, receives the mtime reported by the base class.
// Returns the result of T::put_system_obj_impl().
template <class T>
int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
                                     map<std::string, bufferlist>& attrs, int flags,
                                     const bufferlist& data,
                                     RGWObjVersionTracker *objv_tracker,
                                     real_time set_mtime)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  ObjectCacheInfo info;
  info.status = 0;
  info.xattrs = attrs;
  info.data = data;
  info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
  if (objv_tracker) {
    info.flags |= CACHE_FLAG_OBJV;
    info.version = objv_tracker->write_version;
  }

  ceph::real_time result_mtime;
  int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
                                   objv_tracker, set_mtime);
  if (mtime) {
    *mtime = result_mtime;
  }
  info.meta.size = size;
  info.meta.mtime = result_mtime;

  string name = normal_name(pool, oid);
  if (ret < 0) {
    cache.remove(name);
    return ret;
  }

  cache.put(name, info, NULL);

  // An exclusive create (PUT_OBJ_EXCL implies PUT_OBJ_CREATE) is not
  // broadcast: it is unclear whether other RGWs will need the new object
  // soon, and the notification would only add network traffic.
  const bool exclusive_create = (flags & PUT_OBJ_EXCL) != 0;
  if (!exclusive_create) {
    if (distribute_cache(name, obj, info, UPDATE_OBJ) < 0) {
      mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
    }
  }
  return ret;
}
// Write object data at `ofs` and, for writes at offset 0 or -1, update the
// cache.
//
// Writes at any other offset are passed to the base class without touching
// the cache. For cacheable writes, success stores the data in the cache and
// distributes an UPDATE_OBJ notification (failure to distribute is logged);
// failure drops the cache entry.
//
// Returns the result of T::put_system_obj_data().
template <class T>
int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& data, off_t ofs, bool exclusive,
                                     RGWObjVersionTracker *objv_tracker)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  ObjectCacheInfo info;
  const bool cacheable = (ofs == 0) || (ofs == -1);
  if (cacheable) {
    info.data = data;
    info.meta.size = data.length();
    info.status = 0;
    info.flags = CACHE_FLAG_DATA;
  }
  if (objv_tracker) {
    info.version = objv_tracker->write_version;
    info.flags |= CACHE_FLAG_OBJV;
  }

  int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
  if (!cacheable) {
    return ret;
  }

  string name = normal_name(pool, oid);
  if (ret < 0) {
    cache.remove(name);
    return ret;
  }

  cache.put(name, info, NULL);
  if (distribute_cache(name, obj, info, UPDATE_OBJ) < 0) {
    mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
  }
  return ret;
}
// Stat a system object, answering from the cache when an entry with
// metadata and attrs is present.
//
// On a cache hit, a cached negative status is returned directly; otherwise
// size, mtime, epoch (and version, if tracked) come from the cached entry.
// On a miss the base class is queried; -ENOENT is cached as a negative
// entry, other errors are returned uncached, and a successful result is
// stored in the cache.
//
// Each non-null output pointer is filled in. Returns 0 on success or a
// negative error code.
template <class T>
int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
                              uint64_t *pepoch, map<string, bufferlist> *attrs,
                              bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  string name = normal_name(pool, oid);

  uint64_t size;
  real_time mtime;
  uint64_t epoch;

  ObjectCacheInfo info;
  uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
  if (objv_tracker)
    flags |= CACHE_FLAG_OBJV;

  int r = cache.get(name, info, flags, NULL);
  if (r == 0) {
    // Cache hit: either a remembered error or usable metadata.
    if (info.status < 0)
      return info.status;

    size = info.meta.size;
    mtime = info.meta.mtime;
    epoch = info.epoch;
    if (objv_tracker)
      objv_tracker->read_version = info.version;
  } else {
    // Cache miss: ask the backing store and remember the answer.
    r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
    if (r < 0) {
      if (r == -ENOENT) {
        info.status = r;
        cache.put(name, info, NULL);
      }
      return r;
    }

    info.status = 0;
    info.epoch = epoch;
    info.meta.mtime = mtime;
    info.meta.size = size;
    info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
    if (objv_tracker) {
      info.flags |= CACHE_FLAG_OBJV;
      info.version = objv_tracker->read_version;
    }
    cache.put(name, info, NULL);
  }

  if (psize)
    *psize = size;
  if (pmtime)
    *pmtime = mtime;
  if (pepoch)
    *pepoch = epoch;
  if (attrs)
    *attrs = info.xattrs;
  return 0;
}
// Encode a cache notification (operation, object and cache info) and hand it
// to T::distribute() under the key `normal_name`.
//
// Returns whatever T::distribute() returns.
template <class T>
int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
{
  RGWCacheNotifyInfo notification;
  notification.op = op;
  notification.obj = obj;
  notification.obj_info = obj_info;

  bufferlist payload;
  encode(notification, payload);
  return T::distribute(normal_name, payload);
}
// Apply a cache notification carried in `bl` to the local cache.
//
// UPDATE_OBJ stores the carried cache info, REMOVE_OBJ drops the entry.
// Returns 0 on success, -EIO when the payload cannot be decoded and -EINVAL
// for an unknown operation.
template <class T>
int RGWCache<T>::watch_cb(uint64_t notify_id,
                          uint64_t cookie,
                          uint64_t notifier_id,
                          bufferlist& bl)
{
  RGWCacheNotifyInfo info;

  try {
    auto iter = bl.cbegin();
    decode(info, iter);
  } catch (buffer::end_of_buffer& err) {
    mydout(0) << "ERROR: got bad notification" << dendl;
    return -EIO;
  } catch (buffer::error& err) {
    mydout(0) << "ERROR: buffer::error" << dendl;
    return -EIO;
  }

  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
  string name = normal_name(pool, oid);

  if (info.op == UPDATE_OBJ) {
    cache.put(name, info.obj_info, NULL);
    return 0;
  }
  if (info.op == REMOVE_OBJ) {
    cache.remove(name);
    return 0;
  }

  mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
  return -EINVAL;
}
// Dump name, mtime and size of every cache entry whose name contains
// `filter` as a substring; with no filter, every entry is dumped.
template<typename T>
void RGWCache<T>::call_list(const std::optional<std::string>& filter,
                            Formatter* f)
{
  auto dump_if_matching =
    [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
      const bool matches = !filter || name.find(*filter) != name.npos;
      if (!matches) {
        return;
      }
      T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
                                entry.info.meta.size);
    };

  cache.for_each(dump_if_matching);
}
// Dump the cache entry named `target` as a "cache_entry" section.
//
// Returns true when the entry exists and was dumped, false otherwise.
template<typename T>
bool RGWCache<T>::call_inspect(const std::string& target, Formatter* f)
{
  const auto entry = cache.get(target);
  if (!entry) {
    return false;
  }

  f->open_object_section("cache_entry");
  f->dump_string("name", target.c_str());
  entry->dump(f);
  f->close_section();
  return true;
}
// Remove the cache entry named `target`; the result of ObjectCache::remove()
// is returned to the caller.
template<typename T>
bool RGWCache<T>::call_erase(const std::string& target)
{
  const bool removed = cache.remove(target);
  return removed;
}
// Invalidate every entry in the local cache.
template<typename T>
void RGWCache<T>::call_zap()
{
  ObjectCache& local_cache = cache;
  local_cache.invalidate_all();
}
#endif

View File

@ -1361,9 +1361,6 @@ void RGWRados::finalize()
if (finisher) {
finisher->stop();
}
if (need_watch_notify()) {
finalize_watch();
}
if (finisher) {
/* delete finisher only after cleaning up watches, as watch error path might call
* into finisher. We stop finisher before finalizing watch to make sure we don't
@ -8998,113 +8995,6 @@ int RGWRados::append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl)
return r;
}
int RGWRados::distribute(const string& key, bufferlist& bl)
{
RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
return robust_notify(notify_oid, bl);
}
// Send `bl` as a notification on `notify_obj`, retrying when the notify
// call fails.
//
// The reply buffer of a failed notify is parsed as: a count of acks, each
// being an id pair followed by a length-prefixed payload (skipped), then a
// count of timeouts, each being an id pair. Ids that acked in any attempt
// are accumulated. A retry is treated as successful when its reply parses
// cleanly and every id that timed out in it had already acked an earlier
// attempt. At most max_notify_retries attempts are made in total.
//
// Returns 0 on success, otherwise the error of the last notify attempt.
int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
{
  // The reply of every machine that acks goes in here.
  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
  bufferlist rbl;

  // First, try to send, without being fancy about it.
  auto r = notify_obj.notify(bl, 0, &rbl);

  // If that doesn't work, get serious.
  if (r < 0) {
    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
                  << cpp_strerror(-r) << dendl;

    auto p = rbl.cbegin();
    // Gather up the replies to the first attempt.
    try {
      uint32_t num_acks;
      decode(num_acks, p);
      // Doing this ourselves since we don't care about the payload;
      for (auto i = 0u; i < num_acks; ++i) {
        std::pair<uint64_t, uint64_t> id;
        decode(id, p);
        acks.insert(id);
        ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
        uint32_t blen;
        decode(blen, p);
        p.advance(blen);
      }
    } catch (const buffer::error& e) {
      ldout(cct, 0) << "robust_notify: notify response parse failed: "
                    << e.what() << dendl;
      acks.clear(); // Throw away junk on failed parse.
    }

    // Every machine that fails to reply and hasn't acked a previous
    // attempt goes in here.
    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;

    auto tries = 1u;
    while (r < 0 && tries < max_notify_retries) {
      ++tries;
      rbl.clear();
      // Reset the timeouts, we're only concerned with new ones.
      timeouts.clear();
      r = notify_obj.notify(bl, 0, &rbl);
      if (r < 0) {
        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
                      << cpp_strerror(-r) << dendl;
        p = rbl.cbegin();
        try {
          uint32_t num_acks;
          decode(num_acks, p);
          // Not only do we not care about the payload, but we don't
          // want to empty the container; we just want to augment it
          // with any new members.
          for (auto i = 0u; i < num_acks; ++i) {
            std::pair<uint64_t, uint64_t> id;
            decode(id, p);
            auto ir = acks.insert(id);
            if (ir.second) {
              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
            }
            uint32_t blen;
            decode(blen, p);
            p.advance(blen);
          }

          uint32_t num_timeouts;
          decode(num_timeouts, p);
          for (auto i = 0u; i < num_timeouts; ++i) {
            std::pair<uint64_t, uint64_t> id;
            decode(id, p);
            // Only track timeouts from hosts that haven't acked previously.
            // (The lookup must miss: an id found in `acks` has already
            // received the update and must not count as outstanding.)
            if (acks.find(id) == acks.cend()) {
              ldout(cct, 20) << "robust_notify: " << id << " timed out."
                             << dendl;
              timeouts.insert(id);
            }
          }
        } catch (const buffer::error& e) {
          ldout(cct, 0) << "robust_notify: notify response parse failed: "
                        << e.what() << dendl;
          continue;
        }
        // If we got a good parse and timeouts is empty, that means
        // everyone who timed out in one call received the update in a
        // previous one.
        if (timeouts.empty()) {
          r = 0;
        }
      }
    }
  }
  return r;
}
int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
{
librados::IoCtx& io_ctx = ctx.io_ctx;
@ -10751,23 +10641,35 @@ bool RGWRados::call(std::string_view command, const cmdmap_t& cmdmap,
return false;
}
void RGWRados::call_list(const std::optional<std::string>&,
ceph::Formatter*)
void RGWRados::call_list(const std::optional<std::string>& s,
ceph::Formatter *f)
{
return;
if (!svc.cache) {
return;
}
svc.cache->call_list(s, f);
}
bool RGWRados::call_inspect(const std::string&, Formatter*)
bool RGWRados::call_inspect(const std::string& s, Formatter *f)
{
return false;
if (!svc.cache) {
return false;
}
return svc.cache->call_inspec(s, f);
}
bool RGWRados::call_erase(const std::string&) {
return false;
bool RGWRados::call_erase(const std::string& s) {
if (!svc.cache) {
return false;
}
return svc.cache->call_erase(s);
}
// Invalidate the whole cache through the cache service; a no-op when no
// cache service is configured.
void RGWRados::call_zap() {
  if (!svc.cache) {
    return;
  }
  svc.cache->call_zap();
}
string RGWRados::get_mfa_oid(const rgw_user& user)

View File

@ -2212,26 +2212,7 @@ public:
int update_containers_stats(map<string, RGWBucketEnt>& m);
int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
int unwatch(uint64_t watch_handle);
void add_watcher(int i);
void remove_watcher(int i);
virtual bool need_watch_notify() { return false; }
int init_watch();
void finalize_watch();
int distribute(const string& key, bufferlist& bl);
private:
int robust_notify(const string& notify_oid, bufferlist& bl);
public:
virtual int watch_cb(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) { return 0; }
void pick_control_oid(const string& key, string& notify_oid);
virtual void set_cache_enabled(bool state) {}
void set_atomic(void *ctx, rgw_obj& obj) {
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
rctx->obj.set_atomic(obj);
@ -2285,15 +2266,6 @@ public:
bufferlist& out) override final;
protected:
// Emit one cache-listing record: the entry name, its mtime rendered via
// ceph::to_iso_8601(), and its size.
void cache_list_dump_helper(Formatter* f,
                            const std::string& name,
                            const ceph::real_time mtime,
                            const std::uint64_t size) {
  const auto mtime_str = ceph::to_iso_8601(mtime);

  f->dump_string("name", name);
  f->dump_string("mtime", mtime_str);
  f->dump_unsigned("size", size);
}
// `call_list` must iterate over all cache entries and call
// `cache_list_dump_helper` with the supplied Formatter on any that
// include `filter` as a substring.

View File

@ -192,7 +192,7 @@ int RGWSI_Notify::init_watch()
notify_oid = notify_oid_prefix;
}
notify_objs[i] = rados_svc->obj({control_pool, notify_oid});
notify_objs[i] = rados_svc->handle(0).obj({control_pool, notify_oid});
auto& notify_obj = notify_objs[i];
librados::ObjectWriteOperation op;
@ -257,22 +257,6 @@ void RGWSI_Notify::shutdown()
finalize_watch();
}
// Register a watch on `obj`. A negative result from Obj::watch() is returned
// unchanged; any other result is reported as 0.
int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx)
{
  const int r = obj.watch(watch_handle, ctx);
  return (r < 0) ? r : 0;
}
// Register a watch on `obj` asynchronously, completing through `c`.
//
// RGWSI_RADOS::Obj::aio_watch() takes exactly (completion, handle, ctx), so
// no extra argument is passed. A negative result is returned unchanged; any
// other result is reported as 0.
int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c)
{
  int r = obj.aio_watch(c, watch_handle, ctx);
  if (r < 0)
    return r;
  return 0;
}
int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
{
int r = obj.unwatch(watch_handle);
@ -280,7 +264,7 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
return r;
}
r = rados[0].watch_flush();
r = rados_svc->handle(0).watch_flush();
if (r < 0) {
ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
return r;
@ -291,30 +275,156 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
void RGWSI_Notify::add_watcher(int i)
{
ldout(cct, 20) << "add_watcher() i=" << i << dendl;
Mutex::Locker l(watchers_lock);
RWLock::WLocker l(watchers_lock);
watchers_set.insert(i);
if (watchers_set.size() == (size_t)num_watchers) {
ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
#warning fixme
#if 0
set_cache_enabled(true);
#endif
set_enabled(true);
}
}
void RGWSI_Notify::remove_watcher(int i)
{
ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
Mutex::Locker l(watchers_lock);
RWLock::WLocker l(watchers_lock);
size_t orig_size = watchers_set.size();
watchers_set.erase(i);
if (orig_size == (size_t)num_watchers &&
watchers_set.size() < orig_size) { /* actually removed */
ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
#warning fixme
#if 0
set_cache_enabled(false);
#endif
set_enabled(false);
}
}
// Forward an incoming notification to the registered callback, holding the
// read side of watchers_lock. Returns 0 when no callback is registered.
int RGWSI_Notify::watch_cb(uint64_t notify_id,
                           uint64_t cookie,
                           uint64_t notifier_id,
                           bufferlist& bl)
{
  RWLock::RLocker l(watchers_lock);
  if (!cb) {
    return 0;
  }
  return cb->watch_cb(notify_id, cookie, notifier_id, bl);
}
// Pass the enabled/disabled state to the registered callback, if any, while
// holding the read side of watchers_lock.
void RGWSI_Notify::set_enabled(bool status)
{
  RWLock::RLocker l(watchers_lock);
  if (!cb) {
    return;
  }
  cb->set_enabled(status);
}
// Send `bl` as a notification on the control object picked for `key`;
// returns the result of robust_notify().
int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
{
  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);

  ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().oid
                 << " bl.length()=" << bl.length() << dendl;

  const int r = robust_notify(notify_obj, bl);
  return r;
}
// Send `bl` as a notification on `notify_obj`, retrying when the notify
// call fails.
//
// The reply buffer of a failed notify is parsed as: a count of acks, each
// being an id pair followed by a length-prefixed payload (skipped), then a
// count of timeouts, each being an id pair. Ids that acked in any attempt
// are accumulated. A retry is treated as successful when its reply parses
// cleanly and every id that timed out in it had already acked an earlier
// attempt. At most max_notify_retries attempts are made in total.
//
// Returns 0 on success, otherwise the error of the last notify attempt.
int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
{
  // The reply of every machine that acks goes in here.
  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
  bufferlist rbl;

  // First, try to send, without being fancy about it.
  auto r = notify_obj.notify(bl, 0, &rbl);

  // If that doesn't work, get serious.
  if (r < 0) {
    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
                  << cpp_strerror(-r) << dendl;

    auto p = rbl.cbegin();
    // Gather up the replies to the first attempt.
    try {
      uint32_t num_acks;
      decode(num_acks, p);
      // Doing this ourselves since we don't care about the payload;
      for (auto i = 0u; i < num_acks; ++i) {
        std::pair<uint64_t, uint64_t> id;
        decode(id, p);
        acks.insert(id);
        ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
        uint32_t blen;
        decode(blen, p);
        p.advance(blen);
      }
    } catch (const buffer::error& e) {
      ldout(cct, 0) << "robust_notify: notify response parse failed: "
                    << e.what() << dendl;
      acks.clear(); // Throw away junk on failed parse.
    }

    // Every machine that fails to reply and hasn't acked a previous
    // attempt goes in here.
    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;

    auto tries = 1u;
    while (r < 0 && tries < max_notify_retries) {
      ++tries;
      rbl.clear();
      // Reset the timeouts, we're only concerned with new ones.
      timeouts.clear();
      r = notify_obj.notify(bl, 0, &rbl);
      if (r < 0) {
        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
                      << cpp_strerror(-r) << dendl;
        p = rbl.cbegin();
        try {
          uint32_t num_acks;
          decode(num_acks, p);
          // Not only do we not care about the payload, but we don't
          // want to empty the container; we just want to augment it
          // with any new members.
          for (auto i = 0u; i < num_acks; ++i) {
            std::pair<uint64_t, uint64_t> id;
            decode(id, p);
            auto ir = acks.insert(id);
            if (ir.second) {
              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
            }
            uint32_t blen;
            decode(blen, p);
            p.advance(blen);
          }

          uint32_t num_timeouts;
          decode(num_timeouts, p);
          for (auto i = 0u; i < num_timeouts; ++i) {
            std::pair<uint64_t, uint64_t> id;
            decode(id, p);
            // Only track timeouts from hosts that haven't acked previously.
            // (The lookup must miss: an id found in `acks` has already
            // received the update and must not count as outstanding.)
            if (acks.find(id) == acks.cend()) {
              ldout(cct, 20) << "robust_notify: " << id << " timed out."
                             << dendl;
              timeouts.insert(id);
            }
          }
        } catch (const buffer::error& e) {
          ldout(cct, 0) << "robust_notify: notify response parse failed: "
                        << e.what() << dendl;
          continue;
        }
        // If we got a good parse and timeouts is empty, that means
        // everyone who timed out in one call received the update in a
        // previous one.
        if (timeouts.empty()) {
          r = 0;
        }
      }
    }
  }
  return r;
}
// Install `_cb` as the callback that receives notifications and
// enable/disable events. The pointer is stored as-is under the write side
// of watchers_lock.
void RGWSI_Notify::register_watch_cb(CB *_cb)
{
  RWLock::WLocker l(watchers_lock);
  this->cb = _cb;
}

View File

@ -21,13 +21,16 @@ public:
class RGWSI_Notify : public RGWServiceInstance
{
public:
class CB;
private:
std::shared_ptr<RGWSI_Zone> zone_svc;
std::shared_ptr<RGWSI_RADOS> rados_svc;
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
Mutex watchers_lock{"watchers_lock"};
RWLock watchers_lock{"watchers_lock"};
rgw_pool control_pool;
int num_watchers{0};
@ -43,20 +46,41 @@ class RGWSI_Notify : public RGWServiceInstance
string get_control_oid(int i);
RGWSI_RADOS::Obj pick_control_obj(const string& key);
CB *cb{nullptr};
int init_watch();
void finalize_watch();
int init() override;
void shutdown() override;
int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle);
void add_watcher(int i);
void remove_watcher(int i);
int watch_cb(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl);
void set_enabled(bool status);
int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl);
public:
RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
// Interface implemented by a consumer registered via register_watch_cb().
class CB {
public:
  virtual ~CB() = default;

  // Receives the notification arguments passed to RGWSI_Notify::watch_cb().
  virtual int watch_cb(uint64_t notify_id,
                       uint64_t cookie,
                       uint64_t notifier_id,
                       bufferlist& bl) = 0;

  // Receives the status passed to RGWSI_Notify::set_enabled().
  virtual void set_enabled(bool status) = 0;
};
int distribute(const string& key, bufferlist& bl);
void register_watch_cb(CB *cb);
};
#endif

View File

@ -64,11 +64,19 @@ int RGWSI_RADOS::load(const string& conf, map<string, RGWServiceInstanceRef>& de
return 0;
}
librados::Rados* RGWSI_RADOS::get_rados_handle()
librados::Rados* RGWSI_RADOS::get_rados_handle(int rados_handle)
{
if (rados.size() == 1) {
return &rados[0];
}
if (rados_handle >= 0) {
if (rados_handle >= (int)rados.size()) {
rados_handle = 0;
}
return &rados[rados_handle];
}
handle_lock.get_read();
pthread_t id = pthread_self();
std::map<pthread_t, int>:: iterator it = rados_map.find(id);
@ -90,13 +98,13 @@ librados::Rados* RGWSI_RADOS::get_rados_handle()
uint64_t RGWSI_RADOS::instance_id()
{
return get_rados_handle()->get_instance_id();
return get_rados_handle(-1)->get_instance_id();
}
int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle)
{
constexpr bool create = true; // create the pool if it doesn't exist
return init_ioctx(cct, get_rados_handle(), pool, io_ctx, create);
return init_ioctx(cct, get_rados_handle(rados_handle), pool, io_ctx, create);
}
int RGWSI_RADOS::pool_iterate(librados::IoCtx& io_ctx,
@ -138,7 +146,7 @@ void RGWSI_RADOS::Obj::init(const rgw_raw_obj& obj)
int RGWSI_RADOS::Obj::open()
{
int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx);
int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx, rados_handle);
if (r < 0) {
return r;
}
@ -168,7 +176,7 @@ int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx)
return ref.ioctx.watch2(ref.oid, handle, ctx);
}
int RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
{
return ref.ioctx.aio_watch(ref.oid, c, handle, ctx);
}
@ -178,6 +186,20 @@ int RGWSI_RADOS::Obj::unwatch(uint64_t handle)
return ref.ioctx.unwatch2(handle);
}
// Send `bl` as a notification on this object via IoCtx::notify2(); the
// reply buffer, if any, is stored through `pbl`. Returns notify2()'s result.
int RGWSI_RADOS::Obj::notify(bufferlist& bl,
                             uint64_t timeout_ms,
                             bufferlist *pbl)
{
  auto& ioctx = ref.ioctx;
  return ioctx.notify2(ref.oid, bl, timeout_ms, pbl);
}
// Acknowledge notification `notify_id` on this object via
// IoCtx::notify_ack(), passing `bl` along as the reply payload argument.
void RGWSI_RADOS::Obj::notify_ack(uint64_t notify_id,
                                  uint64_t cookie,
                                  bufferlist& bl)
{
  auto& ioctx = ref.ioctx;
  ioctx.notify_ack(ref.oid, notify_id, cookie, bl);
}
uint64_t RGWSI_RADOS::Obj::get_last_version()
{
return ref.ioctx.get_last_version();
@ -188,7 +210,7 @@ int RGWSI_RADOS::Pool::create(const vector<rgw_pool>& pools, vector<int> *retcod
vector<librados::PoolAsyncCompletion *> completions;
vector<int> rets;
librados::Rados *rad = rados_svc->get_rados_handle();
librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
for (auto iter = pools.begin(); iter != pools.end(); ++iter) {
librados::PoolAsyncCompletion *c = librados::Rados::pool_async_create_completion();
completions.push_back(c);
@ -264,7 +286,7 @@ int RGWSI_RADOS::Pool::create(const vector<rgw_pool>& pools, vector<int> *retcod
int RGWSI_RADOS::Pool::lookup(const rgw_pool& pool)
{
librados::Rados *rad = rados_svc->get_rados_handle();
librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
int ret = rad->pool_lookup(pool.name.c_str());
if (ret < 0) {
return ret;
@ -279,7 +301,7 @@ int RGWSI_RADOS::Pool::List::init(const string& marker, RGWAccessListFilter *fil
return -EINVAL;
}
int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx);
int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx, pool.rados_handle);
if (r < 0) {
return r;
}
@ -321,3 +343,9 @@ int RGWSI_RADOS::Pool::List::get_next(int max,
return oids->size();
}
// Call watch_flush() on the librados handle selected by `rados_handle`
// and return its result.
int RGWSI_RADOS::Handle::watch_flush()
{
  return rados_svc->get_rados_handle(rados_handle)->watch_flush();
}

View File

@ -46,8 +46,8 @@ class RGWSI_RADOS : public RGWServiceInstance
int load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& deps) override;
librados::Rados* get_rados_handle();
int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx);
librados::Rados* get_rados_handle(int rados_handle);
int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle);
int pool_iterate(librados::IoCtx& ioctx,
librados::NObjectIterator& iter,
uint32_t num, vector<rgw_bucket_dir_entry>& objs,
@ -60,28 +60,35 @@ public:
uint64_t instance_id();
class Handle;
class Obj {
friend class RGWSI_RADOS;
friend class Handle;
RGWSI_RADOS *rados_svc{nullptr};
int rados_handle{-1};
rgw_rados_ref ref;
void init(const rgw_raw_obj& obj);
Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj) : rados_svc(_rados_svc) {
Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj, int _rados_handle) : rados_svc(_rados_svc), rados_handle(_rados_handle) {
init(_obj);
}
public:
Obj() {}
Obj(const Obj& o) : rados_svc(o.rados_svc),
rados_handle(o.rados_handle),
ref(o.ref) {}
Obj(Obj&& o) : rados_svc(o.rados_svc),
rados_handle(o.rados_handle),
ref(std::move(o.ref)) {}
Obj& operator=(Obj&& o) {
rados_svc = o.rados_svc;
rados_handle = o.rados_handle;
ref = std::move(o.ref);
return *this;
}
@ -95,23 +102,39 @@ public:
int watch(uint64_t *handle, librados::WatchCtx2 *ctx);
int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx);
int unwatch(uint64_t handle);
int notify(bufferlist& bl,
uint64_t timeout_ms,
bufferlist *pbl);
void notify_ack(uint64_t notify_id,
uint64_t cookie,
bufferlist& bl);
uint64_t get_last_version();
rgw_rados_ref& get_ref() {
return ref;
}
};
class Pool {
friend class RGWSI_RADOS;
friend class Handle;
RGWSI_RADOS *rados_svc{nullptr};
int rados_handle{-1};
rgw_pool pool;
Pool(RGWSI_RADOS *_rados_svc, const rgw_pool& _pool) : rados_svc(_rados_svc),
pool(_pool) {}
Pool(RGWSI_RADOS *_rados_svc,
const rgw_pool& _pool,
int _rados_handle) : rados_svc(_rados_svc),
rados_handle(_rados_handle),
pool(_pool) {}
Pool(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {}
public:
Pool() {}
Pool(const Pool& p) : rados_svc(p.rados_svc),
rados_handle(p.rados_handle),
pool(p.pool) {}
int create(const std::vector<rgw_pool>& pools, std::vector<int> *retcodes);
@ -142,8 +165,32 @@ public:
friend class List;
};
// Factory bound to a specific rados handle index: the Obj and Pool values
// it creates carry that index instead of the default -1.
class Handle {
  friend class RGWSI_RADOS;

  RGWSI_RADOS *rados_svc{nullptr};
  int rados_handle{-1};

  // Only RGWSI_RADOS (a friend) can construct a Handle.
  Handle(RGWSI_RADOS *_rados_svc, int _rados_handle)
    : rados_svc(_rados_svc),
      rados_handle(_rados_handle) {}

public:
  // Object accessor bound to this handle.
  Obj obj(const rgw_raw_obj& o) {
    return Obj(rados_svc, o, rados_handle);
  }

  // Pool accessor bound to this handle.
  Pool pool(const rgw_pool& p) {
    return Pool(rados_svc, p, rados_handle);
  }

  int watch_flush();
};
// Create a Handle bound to this service and the given rados handle index.
Handle handle(int rados_handle) {
  Handle bound(this, rados_handle);
  return bound;
}
Obj obj(const rgw_raw_obj& o) {
return Obj(this, o);
return Obj(this, o, -1);
}
Pool pool() {
@ -151,7 +198,7 @@ public:
}
Pool pool(const rgw_pool& p) {
return Pool(this, p);
return Pool(this, p, -1);
}
friend class Obj;

View File

@ -1,8 +1,52 @@
#include "svc_sys_obj_cache.h"
#include "svc_zone.h"
#include "svc_notify.h"
#define dout_subsys ceph_subsys_rgw
class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
{
RGWSI_SysObj_Cache *svc;
public:
RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
int watch_cb(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) {
return svc->watch_cb(notify_id, cookie, notifier_id, bl);
}
void set_enabled(bool status) {
svc->set_enabled(status);
}
};
std::map<string, RGWServiceInstance::dependency> RGWSI_SysObj_Cache::get_deps()
{
map<string, RGWServiceInstance::dependency> deps = RGWSI_SysObj_Core::get_deps();
deps["cache_notify_dep"] = { .name = "notify",
.conf = "{}" };
return deps;
}
int RGWSI_SysObj_Cache::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
{
int r = RGWSI_SysObj_Core::load(conf, dep_refs);
if (r < 0) {
return r;
}
notify_svc = static_pointer_cast<RGWSI_Notify>(dep_refs["cache_notify_dep"]);
assert(notify_svc);
cb.reset(new RGWSI_SysObj_Cache_CB(this));
notify_svc->register_watch_cb(cb.get());
return 0;
}
static string normal_name(rgw_pool& pool, const std::string& oid) {
std::string buf;
buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
@ -35,7 +79,10 @@ int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
cache.remove(name);
ObjectCacheInfo info;
distribute_cache(name, obj, info, REMOVE_OBJ);
int r = distribute_cache(name, obj, info, REMOVE_OBJ);
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
}
return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
}
@ -212,11 +259,13 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
rgw_pool pool;
string oid;
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
ObjectCacheInfo info;
info.data = data;
info.meta.size = data.length();
info.status = 0;
info.flags = CACHE_FLAG_DATA;
if (objv_tracker) {
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
@ -305,7 +354,7 @@ int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, rgw_raw_obj&
info.obj = obj;
bufferlist bl;
encode(info, bl);
return T::distribute(normal_name, bl);
return notify_svc->distribute(normal_name, bl);
}
int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
@ -346,13 +395,39 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
return 0;
}
// Passes the enabled/disabled state straight through to the `cache` member.
void RGWSI_SysObj_Cache::set_enabled(bool status)
{
  cache.set_enabled(status);
}
// Delegates to the `cache` member's chain_cache_entry() with the same
// arguments and returns its result.
bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
                                           RGWChainedCache::Entry *chained_entry)
{
  const bool chained = cache.chain_cache_entry(cache_info_entries, chained_entry);
  return chained;
}
// Hands the chained cache `cc` to the `cache` member via chain_cache().
void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
{
  cache.chain_cache(cc);
}
// Writes one cache entry to the formatter as three fields, in this order:
//   "name"  - the entry name, as a string
//   "mtime" - the modification time converted with ceph::to_iso_8601()
//   "size"  - the size, as an unsigned value
static void cache_list_dump_helper(Formatter* f,
                                   const std::string& name,
                                   const ceph::real_time mtime,
                                   const std::uint64_t size)
{
  f->dump_string("name", name);

  const auto mtime_iso = ceph::to_iso_8601(mtime);
  f->dump_string("mtime", mtime_iso);

  f->dump_unsigned("size", size);
}
// Dumps every cache entry whose name matches `filter` to the formatter `f`.
//
// filter: when empty (no value), all entries are dumped; otherwise only
//         entries whose name contains *filter as a substring.
// f:      formatter that receives the name/mtime/size of each match.
void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
{
  // The lambda only needs the filter and the formatter, so `this` is not
  // captured. Each matching entry is dumped exactly once through the
  // file-local helper; the earlier `T::`-qualified call referred to a
  // template parameter that this non-template class does not have.
  cache.for_each(
    [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
      if (!filter || name.find(*filter) != name.npos) {
        cache_list_dump_helper(f, name, entry.info.meta.mtime,
                               entry.info.meta.size);
      }
    });
}
@ -378,4 +453,5 @@ int RGWSI_SysObj_Cache::call_erase(const std::string& target)
// Calls invalidate_all() on the `cache` member and always reports success (0).
int RGWSI_SysObj_Cache::call_zap()
{
  cache.invalidate_all();

  return 0;
}

View File

@ -8,11 +8,19 @@
#include "svc_sys_obj_core.h"
class RGWSI_Notify;
class RGWSI_SysObj_Cache_CB;
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
friend class RGWSI_SysObj_Cache_CB;
std::shared_ptr<RGWSI_Notify> notify_svc;
ObjectCache cache;
std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
protected:
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
@ -62,8 +70,16 @@ protected:
uint64_t notifier_id,
bufferlist& bl);
void set_enabled(bool status);
public:
RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {}
RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {
cache.set_ctx(cct);
}
bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
RGWChainedCache::Entry *chained_entry);
void register_chained_cache(RGWChainedCache *cc);
void call_list(const std::optional<std::string>& filter, Formatter* f);
int call_inspect(const std::string& target, Formatter* f);