rgw: add BucketTrimWatcher to serve watch/notify apis

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2017-09-01 11:26:01 -04:00
parent b4249cc432
commit f96d9a8e22
2 changed files with 141 additions and 2 deletions

View File

@ -14,9 +14,13 @@
*/
#include <mutex>
#include <boost/container/flat_map.hpp>
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_sync_log_trim.h"
#include "rgw_rados.h"
#include "include/assert.h"
#define dout_subsys ceph_subsys_rgw
@ -26,6 +30,127 @@
using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
};
WRITE_RAW_ENCODER(TrimNotifyType);
struct TrimNotifyHandler {
virtual ~TrimNotifyHandler() = default;
virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
};
/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
RGWRados *const store;
const rgw_raw_obj& obj;
rgw_rados_ref ref;
uint64_t handle{0};
using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
public:
BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj)
: store(store), obj(obj)
{
}
~BucketTrimWatcher()
{
stop();
}
int start()
{
int r = store->get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
// register a watch on the realm's control object
r = ref.ioctx.watch2(ref.oid, &handle, this);
if (r == -ENOENT) {
constexpr bool exclusive = true;
r = ref.ioctx.create(ref.oid, exclusive);
if (r == -EEXIST || r == 0) {
r = ref.ioctx.watch2(ref.oid, &handle, this);
}
}
if (r < 0) {
lderr(store->ctx()) << "Failed to watch " << ref.oid
<< " with " << cpp_strerror(-r) << dendl;
ref.ioctx.close();
return r;
}
ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
return 0;
}
int restart()
{
int r = ref.ioctx.unwatch2(handle);
if (r < 0) {
lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
<< " with " << cpp_strerror(-r) << dendl;
}
r = ref.ioctx.watch2(ref.oid, &handle, this);
if (r < 0) {
lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
<< " with " << cpp_strerror(-r) << dendl;
ref.ioctx.close();
}
return r;
}
void stop()
{
ref.ioctx.unwatch2(handle);
ref.ioctx.close();
}
/// respond to bucket trim notifications
void handle_notify(uint64_t notify_id, uint64_t cookie,
uint64_t notifier_id, bufferlist& bl) override
{
if (cookie != handle) {
return;
}
bufferlist reply;
try {
auto p = bl.begin();
TrimNotifyType type;
::decode(type, p);
auto handler = handlers.find(type);
if (handler != handlers.end()) {
handler->second->handle(p, reply);
} else {
lderr(store->ctx()) << "no handler for notify type " << type << dendl;
}
} catch (const buffer::error& e) {
lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
}
ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
}
/// reestablish the watch if it gets disconnected
void handle_error(uint64_t cookie, int err) override
{
if (cookie != handle) {
return;
}
if (err == -ENOTCONN) {
ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
restart();
}
}
};
namespace rgw {
class BucketTrimManager::Impl {
@ -33,15 +158,22 @@ class BucketTrimManager::Impl {
RGWRados *const store;
const BucketTrimConfig config;
const rgw_raw_obj status_obj;
/// count frequency of bucket instance entries in the data changes log
BucketChangeCounter counter;
/// protect data shared between data sync and trim threads
/// serve the bucket trim watch/notify api
BucketTrimWatcher watcher;
/// protect data shared between data sync, trim, and watch/notify threads
std::mutex mutex;
Impl(RGWRados *store, const BucketTrimConfig& config)
: store(store), config(config),
counter(config.counter_size)
status_obj(store->get_zone_params().log_pool, "bilog.trim"),
counter(config.counter_size),
watcher(store, status_obj)
{}
};
@ -52,6 +184,11 @@ BucketTrimManager::BucketTrimManager(RGWRados *store,
}
BucketTrimManager::~BucketTrimManager() = default;
int BucketTrimManager::init()
{
return impl->watcher.start();
}
void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
{
std::lock_guard<std::mutex> lock(impl->mutex);

View File

@ -51,6 +51,8 @@ class BucketTrimManager : public BucketChangeObserver {
BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
~BucketTrimManager();
int init();
/// increment a counter for the given bucket instance
void on_bucket_changed(const boost::string_view& bucket_instance) override;
};