diff --git a/src/cls_rgw.cc b/src/cls_rgw.cc
index f1252a31188..7e5f4de619d 100644
--- a/src/cls_rgw.cc
+++ b/src/cls_rgw.cc
@@ -20,6 +20,7 @@ cls_method_handle_t h_rgw_bucket_init_index;
 cls_method_handle_t h_rgw_bucket_list;
 cls_method_handle_t h_rgw_bucket_prepare_op;
 cls_method_handle_t h_rgw_bucket_complete_op;
+cls_method_handle_t h_rgw_dir_suggest_changes;
 
 #define ROUND_BLOCK_SIZE 4096
 
@@ -161,6 +162,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
     entry.name = op.name;
     entry.epoch = 0;
     entry.exists = false;
+    entry.locator = op.locator;
   }
 
   // fill in proper state
@@ -208,22 +210,13 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
       entry.name = op.name;
       entry.epoch = op.epoch;
       entry.meta = op.meta;
+      entry.locator = op.locator;
       ondisk = false;
     }
   } else {
     bufferlist::iterator cur_iter = current_entry.begin();
     ::decode(entry, cur_iter);
     CLS_LOG("rgw_bucket_complete_op(): existing entry: epoch=%lld\n", entry.epoch);
-    if (op.epoch <= entry.epoch) {
-      CLS_LOG("rgw_bucket_complete_op(): skipping request, old epoch\n");
-      return 0;
-    }
-    if (entry.exists) {
-      struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
-      stats.num_entries--;
-      stats.total_size -= entry.meta.size;
-      stats.total_size_rounded -= get_rounded_size(entry.meta.size);
-    }
   }
 
   if (op.tag.size()) {
@@ -235,6 +228,18 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
     entry.pending_map.erase(pinter);
   }
 
+  if (op.epoch <= entry.epoch) {
+    CLS_LOG("rgw_bucket_complete_op(): skipping request, old epoch\n");
+    return 0;
+  }
+
+  if (entry.exists) {
+    struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
+    stats.num_entries--;
+    stats.total_size -= entry.meta.size;
+    stats.total_size_rounded -= get_rounded_size(entry.meta.size);
+  }
+
   bufferlist op_bl;
 
   switch (op.op) {
@@ -278,6 +283,121 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   return cls_cxx_map_update(hctx, &update_bl);
 }
 
+/**
+ * Apply a batch of client-suggested corrections to the bucket index.
+ *
+ * The request is a sequence of (op byte, rgw_bucket_dir_entry) pairs, where
+ * op is CEPH_RGW_REMOVE or CEPH_RGW_UPDATE. A suggestion is only applied if
+ * the stored entry has no pending operations left once tags older than
+ * CEPH_RGW_TAG_TIMEOUT have been dropped; the category stats in the header
+ * are adjusted to match.
+ *
+ * Returns the result of the map update, -EINVAL if the request or a stored
+ * entry cannot be decoded, or the error returned while reading the index.
+ */
+int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG("rgw_dir_suggest_changes()");
+
+  bufferlist header_bl;
+  struct rgw_bucket_dir_header header;
+  bool header_changed = false;
+  int rc = cls_cxx_map_read_header(hctx, &header_bl);
+  if (rc < 0)
+    return rc;
+  bufferlist::iterator header_iter = header_bl.begin();
+  ::decode(header, header_iter);
+
+  bufferlist::iterator in_iter = in->begin();
+  __u8 op;
+  rgw_bucket_dir_entry cur_change;
+  bufferlist op_bl;
+
+  while (!in_iter.end()) {
+    try {
+      ::decode(op, in_iter);
+      ::decode(cur_change, in_iter);
+    } catch (buffer::error& err) {
+      CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
+      return -EINVAL;
+    }
+
+    // start from a clean entry and buffer for every suggestion so that
+    // nothing from the previous key leaks into this one
+    rgw_bucket_dir_entry cur_disk;
+    bufferlist cur_disk_bl;
+    rc = cls_cxx_map_read_key(hctx, cur_change.name, &cur_disk_bl);
+    if (rc < 0 && rc != -ENOENT)
+      return rc;
+    bool ondisk = (rc >= 0);
+    if (ondisk) {
+      try {
+        bufferlist::iterator cur_disk_iter = cur_disk_bl.begin();
+        ::decode(cur_disk, cur_disk_iter);
+      } catch (buffer::error& err) {
+        CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode entry\n");
+        return -EINVAL;
+      }
+    }
+
+    // expire pending tags that have been outstanding for too long
+    utime_t cur_time = ceph_clock_now(g_ceph_context);
+    map::iterator iter =
+      cur_disk.pending_map.begin();
+    while(iter != cur_disk.pending_map.end()) {
+      map::iterator cur_iter=iter++;
+      if (cur_time > (cur_iter->second.timestamp + CEPH_RGW_TAG_TIMEOUT)) {
+        cur_disk.pending_map.erase(cur_iter);
+      }
+    }
+
+    if (cur_disk.pending_map.empty()) {
+      if (cur_disk.exists) {
+        struct rgw_bucket_category_stats& old_stats =
+          header.stats[cur_disk.meta.category];
+        old_stats.num_entries--;
+        old_stats.total_size -= cur_disk.meta.size;
+        old_stats.total_size_rounded -= get_rounded_size(cur_disk.meta.size);
+        header_changed = true;
+      }
+      switch(op) {
+      case CEPH_RGW_REMOVE:
+        if (ondisk) { // nothing to remove otherwise
+          op_bl.append(CEPH_OSD_TMAP_RM);
+          ::encode(cur_change.name, op_bl);
+        }
+        break;
+      case CEPH_RGW_UPDATE:
+        {
+          // account the new state under its own category
+          struct rgw_bucket_category_stats& stats =
+            header.stats[cur_change.meta.category];
+          stats.num_entries++;
+          stats.total_size += cur_change.meta.size;
+          stats.total_size_rounded += get_rounded_size(cur_change.meta.size);
+          header_changed = true;
+          bufferlist cur_state_bl;
+          ::encode(cur_change, cur_state_bl);
+          op_bl.append(CEPH_OSD_TMAP_SET);
+          ::encode(cur_change.name, op_bl); // key first, then the value buffer
+          ::encode(cur_state_bl, op_bl);
+        }
+        break;
+      }
+    }
+  }
+
+  bufferlist update_bl;
+  if (header_changed) {
+    bufferlist new_header_bl;
+    ::encode(header, new_header_bl);
+    update_bl.append(CEPH_OSD_TMAP_HDR);
+    ::encode(new_header_bl, update_bl);
+  }
+  update_bl.claim_append(op_bl);
+  return cls_cxx_map_update(hctx, &update_bl);
+}
+
 void __cls_init()
 {
   CLS_LOG("Loaded rgw class!");
@@ -287,6 +407,7 @@ void __cls_init()
   cls_register_cxx_method(h_class, "bucket_list", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_bucket_list, &h_rgw_bucket_list);
   cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
   cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
+  cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes);
 
   return;
 }
diff --git a/src/rgw/rgw_cls_api.h b/src/rgw/rgw_cls_api.h
index 7d729844bdf..4eb81ebfa49 100644
--- a/src/rgw/rgw_cls_api.h
+++ b/src/rgw/rgw_cls_api.h
@@ -6,6 +6,11 @@
 #include "include/types.h"
 #include "include/utime.h"
 
+
+#define CEPH_RGW_REMOVE 'r'
+#define CEPH_RGW_UPDATE 'u'
+#define CEPH_RGW_TAG_TIMEOUT (60*60*24)
+
 enum RGWPendingState {
   CLS_RGW_STATE_PENDING_MODIFY,
   CLS_RGW_STATE_COMPLETE,
@@ -50,6 +55,9 @@ struct rgw_bucket_dir_entry_meta {
   string owner_display_name;
   string tag;
 
+  rgw_bucket_dir_entry_meta() :
+  category(0), size(0) { mtime.set_from_double(0); }
+
   void encode(bufferlist &bl) const {
     __u8 struct_v = 1;
     ::encode(struct_v, bl);
@@ -76,18 +84,28 @@ WRITE_CLASS_ENCODER(rgw_bucket_dir_entry_meta)
 struct rgw_bucket_dir_entry {
   std::string name;
   uint64_t epoch;
+  std::string locator;
   bool exists;
   struct rgw_bucket_dir_entry_meta meta;
   map pending_map;
 
+  rgw_bucket_dir_entry() :
+    epoch(0), exists(false) {}
+
   void encode(bufferlist &bl) const {
-    __u8 struct_v = 1;
+    __u8 struct_v = 2;
+    if (!locator.size()) {
+      struct_v = 1; // don't waste space encoding it
+    }
     ::encode(struct_v, bl);
     ::encode(name, bl);
     ::encode(epoch, bl);
     ::encode(exists, bl);
     ::encode(meta, bl);
     ::encode(pending_map, bl);
+    if (locator.size()) {
+      ::encode(locator, bl);
+    }
   }
   void decode(bufferlist::iterator &bl) {
     __u8 struct_v;
@@ -97,6 +115,9 @@ struct rgw_bucket_dir_entry {
     ::decode(exists, bl);
     ::decode(meta, bl);
     ::decode(pending_map, bl);
+    if (struct_v >= 2) {
+      ::decode(locator, bl);
+    }
   }
 };
 WRITE_CLASS_ENCODER(rgw_bucket_dir_entry)
@@ -163,13 +184,20 @@ struct rgw_cls_obj_prepare_op
   uint8_t op;
   string name;
   string tag;
+  string locator;
 
   void encode(bufferlist &bl) const {
-    __u8 struct_v = 1;
+    __u8 struct_v = 2;
+    if (!locator.size()) {
+      struct_v = 1; // don't waste the encoding space
+    }
     ::encode(struct_v, bl);
     ::encode(op, bl);
     ::encode(name, bl);
     ::encode(tag, bl);
+    if (locator.size()) {
+      ::encode(locator, bl);
+    }
   }
   void decode(bufferlist::iterator &bl) {
     __u8 struct_v;
@@ -177,6 +205,9 @@ struct rgw_cls_obj_prepare_op
     ::decode(op, bl);
     ::decode(name, bl);
     ::decode(tag, bl);
+    if (struct_v >= 2) {
+      ::decode(locator, bl);
+    }
   }
 };
 WRITE_CLASS_ENCODER(rgw_cls_obj_prepare_op)
@@ -185,19 +216,26 @@ struct rgw_cls_obj_complete_op
 {
   uint8_t op;
   string name;
+  string locator;
   uint64_t epoch;
   struct rgw_bucket_dir_entry_meta meta;
   string tag;
 
   void encode(bufferlist &bl) const {
-    __u8 struct_v = 1;
+    __u8 struct_v = 2;
+    if (!locator.size()) {
+      struct_v = 1; // don't waste the encoding space
+    }
     ::encode(struct_v, bl);
     ::encode(op, bl);
     ::encode(name, bl);
     ::encode(epoch, bl);
     ::encode(meta, bl);
     ::encode(tag, bl);
-  }
+    if (locator.size()) {
+      ::encode(locator, bl);
+    }
+  }
   void decode(bufferlist::iterator &bl) {
     __u8 struct_v;
     ::decode(struct_v, bl);
@@ -206,6 +244,9 @@ struct rgw_cls_obj_complete_op
     ::decode(epoch, bl);
     ::decode(meta, bl);
     ::decode(tag, bl);
+    if (struct_v >= 2) {
+      ::decode(locator, bl);
+    }
   }
 };
 WRITE_CLASS_ENCODER(rgw_cls_obj_complete_op)
diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h
index cea557a38f2..6d0fffeab63 100644
--- a/src/rgw/rgw_common.h
+++ b/src/rgw/rgw_common.h
@@ -688,7 +688,15 @@ public:
     return orig_key;
   }
 
-  static bool translate_raw_obj(string& obj, string& ns) {
+  /**
+   * Translate a namespace-mangled object name to the user-facing name
+   * existing in the given namespace.
+   *
+   * If the object is part of the given namespace, it returns true
+   * and cuts down the name to the unmangled version. If it is not
+   * part of the given namespace, it returns false.
+   */
+  static bool translate_raw_obj_to_obj_in_ns(string& obj, string& ns) {
     if (ns.empty()) {
       if (obj[0] != '_')
         return true;
@@ -716,6 +724,35 @@ public:
     return true;
   }
 
+  /**
+   * Given a mangled object name and an empty namespace string, this
+   * function extracts the namespace into the string and sets the object
+   * name to be the unmangled version.
+   *
+   * It returns true after successfully doing so, or
+   * false if it fails.
+   */
+  static bool strip_namespace_from_object(string& obj, string& ns) {
+    ns.clear();
+    if (obj.empty() || obj[0] != '_') {
+      return true;
+    }
+
+    size_t pos = obj.find('_', 1);
+    if (pos == string::npos) {
+      return false;
+    }
+
+    size_t period_pos = obj.find('.');
+    if (period_pos < pos) {
+      return false;
+    }
+
+    ns = obj.substr(1, pos-1);
+    obj = obj.substr(pos+1, string::npos);
+    return true;
+  }
+
   void encode(bufferlist& bl) const {
     __u8 struct_v = 2;
     ::encode(struct_v, bl);
diff --git a/src/rgw/rgw_fs.cc b/src/rgw/rgw_fs.cc
index 21b714e3edd..2f7cf312b0a 100644
--- a/src/rgw/rgw_fs.cc
+++ b/src/rgw/rgw_fs.cc
@@ -122,7 +122,7 @@ int RGWFS::list_objects(string& id, rgw_bucket& bucket, int max, string& prefix,
 
     string obj = dirent->d_name;
 
-    if (!rgw_obj::translate_raw_obj(obj, ns))
+    if (!rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
       continue;
 
     string key = obj;
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 815fcb586f4..93e3cd6bd70 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -357,7 +357,8 @@ int RGWRados::list_objects(string& id, rgw_bucket& bucket, int max, string& pref
 
   do {
     std::map ent_map;
-    int r = cls_bucket_list(bucket, cur_marker, max - count, ent_map, &truncated);
+    int r = cls_bucket_list(bucket, cur_marker, max - count, ent_map,
+                            &truncated, &cur_marker);
     if (r < 0)
       return r;
 
@@ -365,9 +366,8 @@ int RGWRados::list_objects(string& id, rgw_bucket& bucket, int max, string& pref
     for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
       string obj = eiter->first;
       string key = obj;
-      cur_marker = obj;
 
-      if (!rgw_obj::translate_raw_obj(obj, ns))
+      if (!rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
         continue;
 
       if (filter && !filter->filter(obj, key))
@@ -646,7 +646,7 @@ int RGWRados::put_obj_meta(void *ctx, std::string& id, rgw_obj& obj, uint64_t s
     return 0;
 
   string tag;
-  r = prepare_update_index(NULL, bucket, obj.object, tag);
+  r = prepare_update_index(NULL, bucket, obj, tag);
   if (r < 0)
     return r;
 
@@ -850,7 +850,8 @@ int RGWRados::delete_bucket(std::string& id, rgw_bucket& bucket, bool remove_poo
 
   do {
 #define NUM_ENTRIES 1000
-    r = cls_bucket_list(bucket, marker, NUM_ENTRIES, ent_map, &is_truncated);
+    r = cls_bucket_list(bucket, marker, NUM_ENTRIES, ent_map,
+                        &is_truncated, &marker);
     if (r < 0)
       return r;
 
@@ -860,10 +861,9 @@ int RGWRados::delete_bucket(std::string& id, rgw_bucket& bucket, bool remove_poo
     for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
       obj = eiter->first;
 
-      if (rgw_obj::translate_raw_obj(obj, ns))
+      if (rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns))
         return -ENOTEMPTY;
     }
-    marker = obj;
   } while (is_truncated);
 
   if (remove_pool) {
@@ -1011,7 +1011,7 @@ int RGWRados::delete_obj_impl(void *ctx, std::string& id, rgw_obj& obj, bool syn
   string tag;
   op.remove();
   if (sync) {
-    r = prepare_update_index(state, bucket, obj.object, tag);
+    r = prepare_update_index(state, bucket, obj, tag);
     if (r < 0)
       return r;
     r = io_ctx.operate(oid, &op);
@@ -1474,7 +1474,8 @@ done_err:
   return r;
 }
 
-int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, string& oid, string& tag)
+int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+                                   rgw_obj& obj, string& tag)
 {
   if (state && state->obj_tag.length()) {
     int len = state->obj_tag.length();
@@ -1485,7 +1486,8 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, strin
   } else {
     append_rand_alpha(tag, tag, 32);
   }
-  int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag, oid);
+  int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
+                               obj.object, obj.key);
 
   return ret;
 }
@@ -1607,7 +1609,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
   string tag;
   uint64_t epoch = 0;
 
-  int ret = prepare_update_index(state, bucket, dst_obj.object, tag);
+  int ret = prepare_update_index(state, bucket, dst_obj, tag);
   if (ret < 0)
     goto done;
 
@@ -2027,7 +2029,8 @@ int RGWRados::cls_rgw_init_index(rgw_bucket& bucket, string& oid)
   return r;
 }
 
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, string& name)
+int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
+                                 string& name, string& locator)
 {
   if (bucket.marker.empty()) {
     if (bucket.name[0] == '.')
@@ -2050,6 +2053,7 @@ int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, st
   call.op = op;
   call.tag = tag;
   call.name = name;
+  call.locator = locator;
   ::encode(call, in);
   r = io_ctx.exec(oid, "rgw", "bucket_prepare_op", in, out);
   return r;
@@ -2105,7 +2109,7 @@ int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epo
 }
 
 int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, map& m,
-                              bool *is_truncated)
+                              bool *is_truncated, string *last_entry)
 {
   dout(0) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
 
@@ -2145,25 +2149,93 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, ma
   struct rgw_bucket_dir& dir = ret.dir;
   map::iterator miter;
+  bufferlist updates;
   for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
     RGWObjEnt e;
     rgw_bucket_dir_entry& dirent = miter->second;
 
-    if (!dirent.exists)
-      continue;
 
+    // fill it in with initial values; we may correct later
     e.name = dirent.name;
     e.size = dirent.meta.size;
     e.mtime = dirent.meta.mtime;
     e.etag = dirent.meta.etag;
     e.owner = dirent.meta.owner;
     e.owner_display_name = dirent.meta.owner_display_name;
+
+    if (!dirent.exists || !dirent.pending_map.empty()) {
+      /* there are uncommitted ops. We need to check the current state,
+       * and if the tags are old we need to do cleanup as well. */
+      librados::IoCtx sub_ctx;
+      sub_ctx.dup(io_ctx);
+      r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
+      if (r < 0) {
+        if (r == -ENOENT)
+          continue;
+        else
+          return r;
+      }
+    }
     m[e.name] = e;
     dout(0) << " got " << e.name << dendl;
   }
 
+  if (last_entry && dir.m.size()) {
+    *last_entry = dir.m.rbegin()->first;
+  }
+
+  if (updates.length()) {
+    // we don't care if we lose suggested updates, send them off blindly
+    AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+    r = io_ctx.aio_exec(oid, c, "rgw", "dir_suggest_changes", updates, NULL);
+    c->release();
+  }
   return m.size();
 }
 
+int RGWRados::check_disk_state(librados::IoCtx io_ctx,
+                               rgw_bucket& bucket,
+                               rgw_bucket_dir_entry& list_state,
+                               RGWObjEnt& object,
+                               bufferlist& suggested_updates)
+{
+  rgw_obj obj;
+  std::string oid, key, ns;
+  oid = list_state.name;
+  if (!rgw_obj::strip_namespace_from_object(oid, ns)) {
+    // well crap
+    assert(0 == "got bad object name off disk");
+  }
+  obj.init(bucket, oid, list_state.locator, ns);
+  get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+  io_ctx.locator_set_key(key);
+  int r = io_ctx.stat(oid, &object.size, &object.mtime);
+
+  list_state.pending_map.clear(); // we don't need this and it inflates size
+  if (r == -ENOENT) {
+    /* object doesn't exist right now -- hopefully because it's
+     * marked as !exists and got deleted */
+    if (list_state.exists) {
+      /* FIXME: what should happen now? Work out if there are any
+       * non-bad ways this could happen (there probably are, but annoying
+       * to handle!) */
+    }
+    // encode a suggested removal of that key
+    list_state.epoch = io_ctx.get_last_version();
+    suggested_updates.append(CEPH_RGW_REMOVE);
+    ::encode(list_state, suggested_updates);
+  }
+  if (r < 0)
+    return r;
+
+  // encode suggested updates
+  list_state.epoch = io_ctx.get_last_version();
+  list_state.meta.size = object.size;
+  list_state.meta.mtime.set_from_double(double(object.mtime));
+  suggested_updates.append(CEPH_RGW_UPDATE);
+  ::encode(list_state, suggested_updates);
+  return 0;
+}
+
 int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
 {
   librados::IoCtx io_ctx;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 405d3d513c0..e697d0ab147 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -300,14 +300,18 @@ public:
   virtual int get_bucket_info(string& bucket_name, RGWBucketInfo& info);
 
   int cls_rgw_init_index(rgw_bucket& bucket, string& oid);
-  int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, string& name);
+  int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
+                         string& name, string& locator);
   int cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch,
                           RGWObjEnt& ent, RGWObjCategory category);
   int cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category);
   int cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name);
-  int cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num, map& m, bool *is_truncated);
+  int cls_bucket_list(rgw_bucket& bucket, string start, uint32_t num,
+                      map& m, bool *is_truncated,
+                      string *last_entry = NULL);
   int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
-  int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, string& oid, string& tag);
+  int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+                           rgw_obj& obj, string& tag);
   int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size,
                             utime_t& ut, string& etag, bufferlist *acl_bl, RGWObjCategory category);
   int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch) {
@@ -320,6 +324,25 @@ public:
 
 private:
   int process_intent_log(rgw_bucket& bucket, string& oid,
                          time_t epoch, int flags, bool purge);
+  /**
+   * Check the actual on-disk state of the object specified
+   * by list_state, and fill in the time and size of object.
+   * Then append any changes to suggested_updates for
+   * the rgw class' dir_suggest_changes function.
+   *
+   * Note that this can maul list_state; don't use it afterwards. Also
+   * it expects object to already be filled in from list_state; it only
+   * sets the size and mtime.
+   *
+   * Returns 0 on success, -ENOENT if the object doesn't exist on disk,
+   * and -errno on other failures. (-ENOENT is not a failure, and it
+   * will encode that info as a suggested update.)
+   */
+  int check_disk_state(librados::IoCtx io_ctx,
+                       rgw_bucket& bucket,
+                       rgw_bucket_dir_entry& list_state,
+                       RGWObjEnt& object,
+                       bufferlist& suggested_updates);
 };
 