Merge branch 'rgw-dir-cleanup'

This commit is contained in:
Greg Farnum 2011-10-24 10:12:50 -07:00
commit 40b7b57239
6 changed files with 294 additions and 34 deletions

View File

@ -20,6 +20,7 @@ cls_method_handle_t h_rgw_bucket_init_index;
cls_method_handle_t h_rgw_bucket_list;
cls_method_handle_t h_rgw_bucket_prepare_op;
cls_method_handle_t h_rgw_bucket_complete_op;
cls_method_handle_t h_rgw_dir_suggest_changes;
#define ROUND_BLOCK_SIZE 4096
@ -161,6 +162,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
entry.name = op.name;
entry.epoch = 0;
entry.exists = false;
entry.locator = op.locator;
}
// fill in proper state
@ -208,22 +210,13 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
entry.name = op.name;
entry.epoch = op.epoch;
entry.meta = op.meta;
entry.locator = op.locator;
ondisk = false;
}
} else {
bufferlist::iterator cur_iter = current_entry.begin();
::decode(entry, cur_iter);
CLS_LOG("rgw_bucket_complete_op(): existing entry: epoch=%lld\n", entry.epoch);
if (op.epoch <= entry.epoch) {
CLS_LOG("rgw_bucket_complete_op(): skipping request, old epoch\n");
return 0;
}
if (entry.exists) {
struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
stats.num_entries--;
stats.total_size -= entry.meta.size;
stats.total_size_rounded -= get_rounded_size(entry.meta.size);
}
}
if (op.tag.size()) {
@ -235,6 +228,18 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
entry.pending_map.erase(pinter);
}
if (op.epoch <= entry.epoch) {
CLS_LOG("rgw_bucket_complete_op(): skipping request, old epoch\n");
return 0;
}
if (entry.exists) {
struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
stats.num_entries--;
stats.total_size -= entry.meta.size;
stats.total_size_rounded -= get_rounded_size(entry.meta.size);
}
bufferlist op_bl;
switch (op.op) {
@ -278,6 +283,87 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
return cls_cxx_map_update(hctx, &update_bl);
}
/**
 * Apply a batch of client-suggested corrections to the bucket index.
 *
 * The request body is a sequence of (op byte, encoded rgw_bucket_dir_entry)
 * pairs, where op is CEPH_RGW_REMOVE or CEPH_RGW_UPDATE.  For every pair the
 * matching on-disk entry is loaded, pending tags older than
 * CEPH_RGW_TAG_TIMEOUT are dropped, and -- only if no pending tags remain --
 * the suggestion is applied and the per-category stats in the header are
 * adjusted.
 *
 * @param hctx  object class method context
 * @param in    encoded suggestions
 * @param out   unused
 * @return the result of cls_cxx_map_update(), -EINVAL if the header, the
 *         request or an on-disk entry cannot be decoded, or the negative
 *         error returned while reading the header or an entry
 */
int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  CLS_LOG("rgw_dir_suggest_changes()");

  bufferlist header_bl;
  struct rgw_bucket_dir_header header;
  bool header_changed = false;

  int rc = cls_cxx_map_read_header(hctx, &header_bl);
  if (rc < 0)
    return rc;

  try {
    bufferlist::iterator header_iter = header_bl.begin();
    ::decode(header, header_iter);
  } catch (buffer::error& err) {
    CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode header\n");
    return -EINVAL;
  }

  bufferlist::iterator in_iter = in->begin();
  bufferlist op_bl;

  while (!in_iter.end()) {
    // Per-suggestion state lives inside the loop so nothing decoded for one
    // entry can leak into the next one.
    __u8 op;
    rgw_bucket_dir_entry cur_change;
    try {
      ::decode(op, in_iter);
      ::decode(cur_change, in_iter);
    } catch (buffer::error& err) {
      CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
      return -EINVAL;
    }

    if (op != CEPH_RGW_REMOVE && op != CEPH_RGW_UPDATE) {
      // Ignore unknown ops *before* touching the stats; otherwise the old
      // entry would be subtracted from the totals without anything replacing
      // it.
      CLS_LOG("rgw_dir_suggest_changes(): ignoring unknown op %d\n", (int)op);
      continue;
    }

    bufferlist cur_disk_bl;
    rc = cls_cxx_map_read_key(hctx, cur_change.name, &cur_disk_bl);
    if (rc < 0 && rc != -ENOENT)
      return rc;
    if (rc == -ENOENT || cur_disk_bl.length() == 0) {
      // The entry is no longer in the index (presumably reported as -ENOENT
      // -- TODO confirm), so there is nothing left to reconcile.
      continue;
    }

    rgw_bucket_dir_entry cur_disk;
    try {
      bufferlist::iterator cur_disk_iter = cur_disk_bl.begin();
      ::decode(cur_disk, cur_disk_iter);
    } catch (buffer::error& err) {
      CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode on-disk entry\n");
      return -EINVAL;
    }

    // Drop pending tags that have been outstanding for longer than the
    // timeout.  The iterator is advanced before erasing so it stays valid.
    utime_t cur_time = ceph_clock_now(g_ceph_context);
    map<string, struct rgw_bucket_pending_info>::iterator iter =
      cur_disk.pending_map.begin();
    while (iter != cur_disk.pending_map.end()) {
      map<string, struct rgw_bucket_pending_info>::iterator cur_iter = iter++;
      if (cur_time > (cur_iter->second.timestamp + CEPH_RGW_TAG_TIMEOUT)) {
        cur_disk.pending_map.erase(cur_iter);
      }
    }

    // Operations still in flight take precedence over a suggestion.
    if (!cur_disk.pending_map.empty())
      continue;

    struct rgw_bucket_category_stats& stats =
      header.stats[cur_disk.meta.category];
    if (cur_disk.exists) {
      // Take the old entry out of the totals; an update adds the new one back.
      stats.num_entries--;
      stats.total_size -= cur_disk.meta.size;
      stats.total_size_rounded -= get_rounded_size(cur_disk.meta.size);
      header_changed = true;
    }

    switch (op) {
    case CEPH_RGW_REMOVE:
      op_bl.append(CEPH_OSD_TMAP_RM);
      ::encode(cur_change.name, op_bl);
      break;
    case CEPH_RGW_UPDATE:
      {
        stats.num_entries++;
        stats.total_size += cur_change.meta.size;
        stats.total_size_rounded += get_rounded_size(cur_change.meta.size);
        // The totals changed even when there was no previous entry to
        // subtract, so the header must be written back in that case too.
        header_changed = true;

        bufferlist cur_state_bl;
        ::encode(cur_change, cur_state_bl);
        op_bl.append(CEPH_OSD_TMAP_SET);
        ::encode(cur_state_bl, op_bl);
      }
      break;
    }
  }

  bufferlist update_bl;
  if (header_changed) {
    bufferlist new_header_bl;
    ::encode(header, new_header_bl);
    update_bl.append(CEPH_OSD_TMAP_HDR);
    ::encode(new_header_bl, update_bl);
  }
  update_bl.claim_append(op_bl);
  return cls_cxx_map_update(hctx, &update_bl);
}
void __cls_init()
{
CLS_LOG("Loaded rgw class!");
@ -287,6 +373,7 @@ void __cls_init()
cls_register_cxx_method(h_class, "bucket_list", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_bucket_list, &h_rgw_bucket_list);
cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes);
return;
}

View File

@ -6,6 +6,11 @@
#include "include/types.h"
#include "include/utime.h"
// Op codes that prefix each encoded rgw_bucket_dir_entry in a
// dir_suggest_changes request.
#define CEPH_RGW_REMOVE 'r'
#define CEPH_RGW_UPDATE 'u'
// Age after which a pending tag is discarded (60*60*24, presumably seconds,
// i.e. one day).  Parenthesized so the macro expands safely inside any
// surrounding expression.
#define CEPH_RGW_TAG_TIMEOUT (60*60*24)
enum RGWPendingState {
CLS_RGW_STATE_PENDING_MODIFY,
CLS_RGW_STATE_COMPLETE,
@ -50,6 +55,9 @@ struct rgw_bucket_dir_entry_meta {
string owner_display_name;
string tag;
rgw_bucket_dir_entry_meta() :
category(0), size(0) { mtime.set_from_double(0); }
void encode(bufferlist &bl) const {
__u8 struct_v = 1;
::encode(struct_v, bl);
@ -76,18 +84,28 @@ WRITE_CLASS_ENCODER(rgw_bucket_dir_entry_meta)
struct rgw_bucket_dir_entry {
std::string name;
uint64_t epoch;
std::string locator;
bool exists;
struct rgw_bucket_dir_entry_meta meta;
map<string, struct rgw_bucket_pending_info> pending_map;
rgw_bucket_dir_entry() :
epoch(0), exists(false) {}
void encode(bufferlist &bl) const {
__u8 struct_v = 1;
__u8 struct_v = 2;
if (!locator.size()) {
struct_v = 1; // don't waste space encoding it
}
::encode(struct_v, bl);
::encode(name, bl);
::encode(epoch, bl);
::encode(exists, bl);
::encode(meta, bl);
::encode(pending_map, bl);
if (locator.size()) {
::encode(locator, bl);
}
}
void decode(bufferlist::iterator &bl) {
__u8 struct_v;
@ -97,6 +115,9 @@ struct rgw_bucket_dir_entry {
::decode(exists, bl);
::decode(meta, bl);
::decode(pending_map, bl);
if (struct_v >= 2) {
::decode(locator, bl);
}
}
};
WRITE_CLASS_ENCODER(rgw_bucket_dir_entry)
@ -163,13 +184,20 @@ struct rgw_cls_obj_prepare_op
uint8_t op;
string name;
string tag;
string locator;
void encode(bufferlist &bl) const {
__u8 struct_v = 1;
__u8 struct_v = 2;
if (!locator.size()) {
struct_v = 1; // don't waste the encoding space
}
::encode(struct_v, bl);
::encode(op, bl);
::encode(name, bl);
::encode(tag, bl);
if (locator.size()) {
::encode(locator, bl);
}
}
void decode(bufferlist::iterator &bl) {
__u8 struct_v;
@ -177,6 +205,9 @@ struct rgw_cls_obj_prepare_op
::decode(op, bl);
::decode(name, bl);
::decode(tag, bl);
if (struct_v >= 2) {
::decode(locator, bl);
}
}
};
WRITE_CLASS_ENCODER(rgw_cls_obj_prepare_op)
@ -185,19 +216,26 @@ struct rgw_cls_obj_complete_op
{
uint8_t op;
string name;
string locator;
uint64_t epoch;
struct rgw_bucket_dir_entry_meta meta;
string tag;
void encode(bufferlist &bl) const {
__u8 struct_v = 1;
__u8 struct_v = 2;
if (!locator.size()) {
struct_v = 1; // don't waste the encoding space
}
::encode(struct_v, bl);
::encode(op, bl);
::encode(name, bl);
::encode(epoch, bl);
::encode(meta, bl);
::encode(tag, bl);
}
if (locator.size()) {
::encode(locator, bl);
}
}
void decode(bufferlist::iterator &bl) {
__u8 struct_v;
::decode(struct_v, bl);
@ -206,6 +244,9 @@ struct rgw_cls_obj_complete_op
::decode(epoch, bl);
::decode(meta, bl);
::decode(tag, bl);
if (struct_v >= 2) {
::decode(locator, bl);
}
}
};
WRITE_CLASS_ENCODER(rgw_cls_obj_complete_op)

View File

@ -688,7 +688,15 @@ public:
return orig_key;
}
static bool translate_raw_obj(string& obj, string& ns) {
/**
* Translate a namespace-mangled object name to the user-facing name
* existing in the given namespace.
*
* If the object is part of the given namespace, it returns true
* and cuts down the name to the unmangled version. If it is not
* part of the given namespace, it returns false.
*/
static bool translate_raw_obj_to_obj_in_ns(string& obj, string& ns) {
if (ns.empty()) {
if (obj[0] != '_')
return true;
@ -716,6 +724,35 @@ public:
return true;
}
/**
 * Split a namespace-mangled raw object name into its namespace and its
 * unmangled object name.
 *
 * On entry obj holds the raw name; ns is cleared regardless of its previous
 * contents.  On success obj is rewritten to the unmangled name and ns holds
 * the namespace (empty if the object is not in one).
 *
 *   "name"        -> ns "",   obj "name"
 *   "_ns_name"    -> ns "ns", obj "name"
 *   "__name"      -> ns "",   obj "_name"
 *
 * NOTE(review): the "__" case assumes a leading underscore of an object in
 * the empty namespace is escaped by doubling it -- confirm against the
 * name-mangling code.
 *
 * Returns true on success.  Returns false (leaving obj untouched) if the
 * name starts with '_' but has no second '_' closing the namespace, or if
 * a '.' appears before that closing '_'.
 */
static bool strip_namespace_from_object(string& obj, string& ns) {
  ns.clear();

  // Not mangled at all: nothing to strip.
  if (obj.empty() || obj[0] != '_') {
    return true;
  }

  // Escaped leading underscore: drop only the escape character.  Without
  // this the name would be parsed as an empty namespace and lose its
  // leading '_'.
  if (obj.size() >= 2 && obj[1] == '_') {
    obj = obj.substr(1);
    return true;
  }

  size_t pos = obj.find('_', 1);
  if (pos == string::npos) {
    return false;
  }

  // A '.' inside the would-be namespace prefix means this is not a
  // valid "_<ns>_<name>" form.
  size_t period_pos = obj.find('.');
  if (period_pos < pos) {
    return false;
  }

  ns = obj.substr(1, pos - 1);
  obj = obj.substr(pos + 1, string::npos);
  return true;
}
void encode(bufferlist& bl) const {
__u8 struct_v = 2;
::encode(struct_v, bl);

View File

@ -122,7 +122,7 @@ int RGWFS::list_objects(string& id, rgw_bucket& bucket, int max, string& prefix,
string obj = dirent->d_name;
if (!rgw_obj::translate_raw_obj(obj, ns))
if (!rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
continue;
string key = obj;

View File

@ -357,7 +357,8 @@ int RGWRados::list_objects(string& id, rgw_bucket& bucket, int max, string& pref
do {
std::map<string, RGWObjEnt> ent_map;
int r = cls_bucket_list(bucket, cur_marker, max - count, ent_map, &truncated);
int r = cls_bucket_list(bucket, cur_marker, max - count, ent_map,
&truncated, &cur_marker);
if (r < 0)
return r;
@ -365,9 +366,8 @@ int RGWRados::list_objects(string& id, rgw_bucket& bucket, int max, string& pref
for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
string obj = eiter->first;
string key = obj;
cur_marker = obj;
if (!rgw_obj::translate_raw_obj(obj, ns))
if (!rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
continue;
if (filter && !filter->filter(obj, key))
@ -646,7 +646,7 @@ int RGWRados::put_obj_meta(void *ctx, std::string& id, rgw_obj& obj, uint64_t s
return 0;
string tag;
r = prepare_update_index(NULL, bucket, obj.object, tag);
r = prepare_update_index(NULL, bucket, obj, tag);
if (r < 0)
return r;
@ -850,7 +850,8 @@ int RGWRados::delete_bucket(std::string& id, rgw_bucket& bucket, bool remove_poo
do {
#define NUM_ENTRIES 1000
r = cls_bucket_list(bucket, marker, NUM_ENTRIES, ent_map, &is_truncated);
r = cls_bucket_list(bucket, marker, NUM_ENTRIES, ent_map,
&is_truncated, &marker);
if (r < 0)
return r;
@ -860,10 +861,9 @@ int RGWRados::delete_bucket(std::string& id, rgw_bucket& bucket, bool remove_poo
for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
obj = eiter->first;
if (rgw_obj::translate_raw_obj(obj, ns))
if (rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
return -ENOTEMPTY;
}
marker = obj;
} while (is_truncated);
if (remove_pool) {
@ -1011,7 +1011,7 @@ int RGWRados::delete_obj_impl(void *ctx, std::string& id, rgw_obj& obj, bool syn
string tag;
op.remove();
if (sync) {
r = prepare_update_index(state, bucket, obj.object, tag);
r = prepare_update_index(state, bucket, obj, tag);
if (r < 0)
return r;
r = io_ctx.operate(oid, &op);
@ -1474,7 +1474,8 @@ done_err:
return r;
}
int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, string& oid, string& tag)
int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
rgw_obj& obj, string& tag)
{
if (state && state->obj_tag.length()) {
int len = state->obj_tag.length();
@ -1485,7 +1486,8 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, strin
} else {
append_rand_alpha(tag, tag, 32);
}
int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag, oid);
int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
obj.object, obj.key);
return ret;
}
@ -1607,7 +1609,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
string tag;
uint64_t epoch = 0;
int ret = prepare_update_index(state, bucket, dst_obj.object, tag);
int ret = prepare_update_index(state, bucket, dst_obj, tag);
if (ret < 0)
goto done;
@ -2027,7 +2029,8 @@ int RGWRados::cls_rgw_init_index(rgw_bucket& bucket, string& oid)
return r;
}
int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, string& name)
int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
string& name, string& locator)
{
if (bucket.marker.empty()) {
if (bucket.name[0] == '.')
@ -2050,6 +2053,7 @@ int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, st
call.op = op;
call.tag = tag;
call.name = name;
call.locator = locator;
::encode(call, in);
r = io_ctx.exec(oid, "rgw", "bucket_prepare_op", in, out);
return r;
@ -2105,7 +2109,7 @@ int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epo
}
int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, map<string, RGWObjEnt>& m,
bool *is_truncated)
bool *is_truncated, string *last_entry)
{
dout(0) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
@ -2145,25 +2149,93 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, ma
struct rgw_bucket_dir& dir = ret.dir;
map<string, struct rgw_bucket_dir_entry>::iterator miter;
bufferlist updates;
for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
RGWObjEnt e;
rgw_bucket_dir_entry& dirent = miter->second;
if (!dirent.exists)
continue;
// fill it in with initial values; we may correct later
e.name = dirent.name;
e.size = dirent.meta.size;
e.mtime = dirent.meta.mtime;
e.etag = dirent.meta.etag;
e.owner = dirent.meta.owner;
e.owner_display_name = dirent.meta.owner_display_name;
if (!dirent.exists || !dirent.pending_map.empty()) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(io_ctx);
r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
if (r < 0) {
if (r == -ENOENT)
continue;
else
return r;
}
}
m[e.name] = e;
dout(0) << " got " << e.name << dendl;
}
// last_entry is optional (the declaration defaults it to NULL), so only
// report the final key when the caller supplied somewhere to put it.
if (last_entry && dir.m.size()) {
  *last_entry = dir.m.rbegin()->first;
}

if (updates.length()) {
  // we don't care if we lose suggested updates, send them off blindly
  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
  // Send the suggestions gathered by check_disk_state(), not the original
  // list request buffer.
  r = io_ctx.aio_exec(oid, c, "rgw", "dir_suggest_changes", updates, NULL);
  c->release();
}
return m.size();
}
/**
 * Compare an index entry against the object actually stored in RADOS and
 * queue a correction for the rgw class' dir_suggest_changes method.
 *
 * The object named by list_state is stat'ed.  If it is missing, a
 * CEPH_RGW_REMOVE suggestion is appended and -ENOENT is returned.  If it is
 * present, object.size and object.mtime are filled in from the stat result
 * and a CEPH_RGW_UPDATE suggestion carrying the refreshed entry is appended.
 *
 * list_state is modified (its pending map is cleared and epoch/meta/exists
 * are overwritten) and bucket is passed on to get_obj_bucket_and_oid_key().
 *
 * @param io_ctx             I/O context used for the stat (taken by value)
 * @param bucket             bucket containing the object
 * @param list_state         index entry to verify; clobbered
 * @param object             receives size and mtime on success
 * @param suggested_updates  buffer the suggestion is appended to
 * @return 0 if the object exists, -ENOENT if it does not, or another
 *         negative error from stat (no suggestion is appended then)
 */
int RGWRados::check_disk_state(librados::IoCtx io_ctx,
                               rgw_bucket& bucket,
                               rgw_bucket_dir_entry& list_state,
                               RGWObjEnt& object,
                               bufferlist& suggested_updates)
{
  rgw_obj obj;
  std::string oid, key, ns;
  oid = list_state.name;
  if (!rgw_obj::strip_namespace_from_object(oid, ns)) {
    // well crap
    assert(0 == "got bad object name off disk");
  }
  obj.init(bucket, oid, list_state.locator, ns);
  get_obj_bucket_and_oid_key(obj, bucket, oid, key);
  io_ctx.locator_set_key(key);
  int r = io_ctx.stat(oid, &object.size, &object.mtime);

  list_state.pending_map.clear(); // we don't need this and it inflates size

  if (r == -ENOENT) {
    /* object doesn't exist right now -- hopefully because it's
     * marked as !exists and got deleted */
    /* FIXME: what should happen if list_state.exists is set here? Work out
     * if there are any non-bad ways this could happen (there probably are,
     * but annoying to handle!) */

    // encode a suggested removal of that key
    list_state.epoch = io_ctx.get_last_version();
    suggested_updates.append(CEPH_RGW_REMOVE);
    ::encode(list_state, suggested_updates);
    return r;
  }
  if (r < 0)
    return r;

  // encode suggested updates
  // The stat succeeded, so the entry must be recorded as existing.  If it
  // were sent back with exists == false it would be written to the index as
  // non-existent again and re-checked (and re-counted) on every listing.
  list_state.exists = true;
  list_state.epoch = io_ctx.get_last_version();
  list_state.meta.size = object.size;
  list_state.meta.mtime.set_from_double(double(object.mtime));
  suggested_updates.append(CEPH_RGW_UPDATE);
  ::encode(list_state, suggested_updates);
  return 0;
}
int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
{
librados::IoCtx io_ctx;

View File

@ -300,14 +300,18 @@ public:
virtual int get_bucket_info(string& bucket_name, RGWBucketInfo& info);
int cls_rgw_init_index(rgw_bucket& bucket, string& oid);
int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, string& name);
int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
string& name, string& locator);
int cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category);
int cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category);
int cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name);
int cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, map<string, RGWObjEnt>& m, bool *is_truncated);
int cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num,
map<string, RGWObjEnt>& m, bool *is_truncated,
string *last_entry = NULL);
int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, string& oid, string& tag);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
rgw_obj& oid, string& tag);
int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, bufferlist *acl_bl, RGWObjCategory category);
int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch) {
@ -320,6 +324,25 @@ public:
private:
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
/**
* Check the actual on-disk state of the object specified
* by list_state, and fill in the time and size of object.
* Then append any changes to suggested_updates for
* the rgw class' dir_suggest_changes function.
*
* Note that this can maul list_state; don't use it afterwards. Also
* it expects object to already be filled in from list_state; it only
* sets the size and mtime.
*
* Returns 0 on success, -ENOENT if the object doesn't exist on disk,
* and -errno on other failures. (-ENOENT is not a failure, and it
* will encode that info as a suggested update.)
*/
int check_disk_state(librados::IoCtx io_ctx,
rgw_bucket& bucket,
rgw_bucket_dir_entry& list_state,
RGWObjEnt& object,
bufferlist& suggested_updates);
};