diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc
index 45506972330..06347675447 100644
--- a/src/cls/rgw/cls_rgw.cc
+++ b/src/cls/rgw/cls_rgw.cc
@@ -28,6 +28,7 @@ cls_method_handle_t h_rgw_bucket_set_tag_timeout;
 cls_method_handle_t h_rgw_bucket_list;
 cls_method_handle_t h_rgw_bucket_check_index;
 cls_method_handle_t h_rgw_bucket_rebuild_index;
+cls_method_handle_t h_rgw_bucket_update_stats;
 cls_method_handle_t h_rgw_bucket_prepare_op;
 cls_method_handle_t h_rgw_bucket_complete_op;
 cls_method_handle_t h_rgw_bucket_link_olh;
@@ -58,9 +59,6 @@ cls_method_handle_t h_rgw_lc_get_head;
 cls_method_handle_t h_rgw_lc_list_entries;
 
-#define ROUND_BLOCK_SIZE 4096
-
-
 #define BI_PREFIX_CHAR 0x80
 
 #define BI_BUCKET_OBJS_INDEX 0
@@ -78,11 +76,6 @@ static string bucket_index_prefixes[] = { "", /* special handling for the objs l
                                           /* this must be the last index */
                                           "9999_",};
 
-static uint64_t get_rounded_size(uint64_t size)
-{
-  return (size + ROUND_BLOCK_SIZE - 1) & ~(ROUND_BLOCK_SIZE - 1);
-}
-
 static bool bi_is_objs_index(const string& s) {
   return ((unsigned char)s[0] != BI_PREFIX_CHAR);
 }
@@ -542,7 +535,7 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
       struct rgw_bucket_category_stats& stats = calc_header->stats[entry.meta.category];
       stats.num_entries++;
       stats.total_size += entry.meta.accounted_size;
-      stats.total_size_rounded += get_rounded_size(entry.meta.accounted_size);
+      stats.total_size_rounded += cls_rgw_get_rounded_size(entry.meta.accounted_size);
 
       start_obj = kiter->first;
     }
@@ -585,6 +578,38 @@ int rgw_bucket_rebuild_index(cls_method_context_t hctx, bufferlist *in, bufferli
   return write_bucket_header(hctx, &calc_header);
 }
 
+int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  // decode request
+  rgw_cls_bucket_update_stats_op op;
+  auto iter = in->begin();
+  try {
+    ::decode(op, iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: %s(): failed to decode request\n", __func__);
+    return -EINVAL;
+  }
+
+  struct rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+    return rc;
+  }
+
+  for (auto& s : op.stats) {
+    auto& dest = header.stats[s.first];
+    if (op.absolute) {
+      dest = s.second;
+    } else {
+      dest.total_size += s.second.total_size;
+      dest.total_size_rounded += s.second.total_size_rounded;
+      dest.num_entries += s.second.num_entries;
+    }
+  }
+
+  return write_bucket_header(hctx, &header);
+}
 
 int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
@@ -714,7 +739,7 @@ static void unaccount_entry(struct rgw_bucket_dir_header& header, struct rgw_buc
   struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
   stats.num_entries--;
   stats.total_size -= entry.meta.accounted_size;
-  stats.total_size_rounded -= get_rounded_size(entry.meta.accounted_size);
+  stats.total_size_rounded -= cls_rgw_get_rounded_size(entry.meta.accounted_size);
 }
 
 static void log_entry(const char *func, const char *str, struct rgw_bucket_dir_entry *entry)
@@ -899,7 +924,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
     entry.tag = op.tag;
     stats.num_entries++;
     stats.total_size += meta.accounted_size;
-    stats.total_size_rounded += get_rounded_size(meta.accounted_size);
+    stats.total_size_rounded += cls_rgw_get_rounded_size(meta.accounted_size);
     bufferlist new_key_bl;
     ::encode(entry, new_key_bl);
     int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
@@ -1929,7 +1954,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
         CLS_LOG(10, "total_entries: %" PRId64 " -> %" PRId64 "\n", old_stats.num_entries, old_stats.num_entries - 1);
         old_stats.num_entries--;
         old_stats.total_size -= cur_disk.meta.accounted_size;
-        old_stats.total_size_rounded -= get_rounded_size(cur_disk.meta.accounted_size);
+        old_stats.total_size_rounded -= cls_rgw_get_rounded_size(cur_disk.meta.accounted_size);
         header_changed = true;
       }
       struct rgw_bucket_category_stats& stats =
@@ -1956,7 +1981,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
                 cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
         stats.num_entries++;
         stats.total_size += cur_change.meta.accounted_size;
-        stats.total_size_rounded += get_rounded_size(cur_change.meta.accounted_size);
+        stats.total_size_rounded += cls_rgw_get_rounded_size(cur_change.meta.accounted_size);
         header_changed = true;
         cur_change.index_ver = header.ver;
         bufferlist cur_state_bl;
@@ -2261,6 +2286,11 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
 {
   string filter = name;
   string start_key = marker;
+
+  string first_instance_idx;
+  encode_obj_versioned_data_key(string(), &first_instance_idx);
+  string end_key = first_instance_idx;
+
   int count = 0;
   map<string, bufferlist> keys;
   do {
@@ -2276,6 +2306,11 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
 
     map<string, bufferlist>::iterator iter;
     for (iter = keys.begin(); iter != keys.end(); ++iter) {
+      if (iter->first >= end_key) {
+        /* past the end of plain namespace */
+        return count;
+      }
+
       rgw_cls_bi_entry entry;
       entry.type = PlainIdx;
       entry.idx = iter->first;
@@ -2293,12 +2328,15 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
 
       CLS_LOG(20, "%s(): entry.idx=%s e.key.name=%s", __func__, escape_str(entry.idx).c_str(), escape_str(e.key.name).c_str());
 
-      if (e.key.name != name) {
+      if (!name.empty() && e.key.name != name) {
        return count;
       }
 
       entries->push_back(entry);
       count++;
+      if (count >= (int)max) {
+        return count;
+      }
       start_key = entry.idx;
     }
   } while (!keys.empty());
@@ -2312,13 +2350,20 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
   cls_rgw_obj_key key(name);
   string first_instance_idx;
   encode_obj_versioned_data_key(key, &first_instance_idx);
-  string start_key = first_instance_idx;
+  string start_key;
+
+  if (!name.empty()) {
+    start_key = first_instance_idx;
+  } else {
+    start_key = BI_PREFIX_CHAR;
+    start_key.append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
+  }
+  string filter = start_key;
   if (bi_entry_gt(marker, start_key)) {
     start_key = marker;
   }
   int count = 0;
   map<string, bufferlist> keys;
-  string filter = first_instance_idx;
   bool started = true;
   do {
     if (count >= (int)max) {
@@ -2330,13 +2375,13 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
     if (started) {
       ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
       if (ret == -ENOENT) {
-        ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_GET_NUM_KEYS, &keys);
+        ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
      }
       started = false;
     } else {
-      ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_GET_NUM_KEYS, &keys);
+      ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
     }
-    CLS_LOG(20, "%s(): start_key=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), (int)keys.size());
+    CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
     if (ret < 0) {
       return ret;
     }
@@ -2348,6 +2393,10 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
       entry.idx = iter->first;
       entry.data = iter->second;
 
+      if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
+        return count;
+      }
+
       CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
 
       bufferlist::iterator biter = entry.data.begin();
@@ -2360,7 +2409,85 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
         return -EIO;
       }
 
-      if (e.key.name != name) {
+      if (!name.empty() && e.key.name != name) {
+        return count;
+      }
+
+      entries->push_back(entry);
+      count++;
+      start_key = entry.idx;
+    }
+  } while (!keys.empty());
+
+  return count;
+}
+
+static int list_olh_entries(cls_method_context_t hctx, const string& name, const string& marker, uint32_t max,
+                            list<rgw_cls_bi_entry> *entries)
+{
+  cls_rgw_obj_key key(name);
+  string first_instance_idx;
+  encode_olh_data_key(key, &first_instance_idx);
+  string start_key;
+
+  if (!name.empty()) {
+    start_key = first_instance_idx;
+  } else {
+    start_key = BI_PREFIX_CHAR;
+    start_key.append(bucket_index_prefixes[BI_BUCKET_OLH_DATA_INDEX]);
+  }
+  string filter = start_key;
+  if (bi_entry_gt(marker, start_key)) {
+    start_key = marker;
+  }
+  int count = 0;
+  map<string, bufferlist> keys;
+  bool started = true;
+  do {
+    if (count >= (int)max) {
+      return count;
+    }
+    keys.clear();
+#define BI_GET_NUM_KEYS 128
+    int ret;
+    if (started) {
+      ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
+      if (ret == -ENOENT) {
+        ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
+      }
+      started = false;
+    } else {
+      ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
+    }
+    CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
+    if (ret < 0) {
+      return ret;
+    }
+
+    map<string, bufferlist>::iterator iter;
+    for (iter = keys.begin(); iter != keys.end(); ++iter) {
+      rgw_cls_bi_entry entry;
+      entry.type = OLHIdx;
+      entry.idx = iter->first;
+      entry.data = iter->second;
+
+      if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
+        return count;
+      }
+
+      CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
+
+      bufferlist::iterator biter = entry.data.begin();
+
+      rgw_bucket_olh_entry e;
+      try {
+        ::decode(e, biter);
+      } catch (buffer::error& err) {
+        CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
+        return -EIO;
+      }
+
+      if (!name.empty() && e.key.name != name) {
         return count;
       }
 
       entries->push_back(entry);
       count++;
@@ -2391,29 +2518,35 @@ static int rgw_bi_list_op(cls_method_context_t hctx, bufferlist *in, bufferlist
 #define MAX_BI_LIST_ENTRIES 1000
   int32_t max = (op.max < MAX_BI_LIST_ENTRIES ? op.max : MAX_BI_LIST_ENTRIES);
   string start_key = op.marker;
-  int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries);
+  int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries) + 1; /* one extra entry for identifying truncation */
   if (ret < 0) {
     CLS_LOG(0, "ERROR: %s(): list_plain_entries retured ret=%d", __func__, ret);
     return ret;
   }
   int count = ret;
 
+  CLS_LOG(20, "found %d plain entries", count);
+
   ret = list_instance_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
   if (ret < 0) {
     CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
     return ret;
   }
 
-  cls_rgw_obj_key key(op.name);
-  rgw_cls_bi_entry entry;
-  encode_olh_data_key(key, &entry.idx);
-  ret = cls_cxx_map_get_val(hctx, entry.idx, &entry.data);
-  if (ret < 0 && ret != -ENOENT) {
-    CLS_LOG(0, "ERROR: %s(): cls_cxx_map_get_val retured ret=%d", __func__, ret);
+  count += ret;
+
+  ret = list_olh_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
+  if (ret < 0) {
+    CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
     return ret;
-  } else if (ret >= 0) {
-    entry.type = OLHIdx;
-    op_ret.entries.push_back(entry);
+  }
+
+  count += ret;
+
+  op_ret.is_truncated = (count >= max);
+  while (count >= max) {
+    op_ret.entries.pop_back();
+    count--;
   }
 
   ::encode(op_ret, *out);
@@ -3381,6 +3514,7 @@ void __cls_init()
   cls_register_cxx_method(h_class, "bucket_list", CLS_METHOD_RD, rgw_bucket_list, &h_rgw_bucket_list);
   cls_register_cxx_method(h_class, "bucket_check_index", CLS_METHOD_RD, rgw_bucket_check_index, &h_rgw_bucket_check_index);
   cls_register_cxx_method(h_class, "bucket_rebuild_index", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_rebuild_index, &h_rgw_bucket_rebuild_index);
+  cls_register_cxx_method(h_class, "bucket_update_stats", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_update_stats, &h_rgw_bucket_update_stats);
   cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
   cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
   cls_register_cxx_method(h_class, "bucket_link_olh", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_link_olh, &h_rgw_bucket_link_olh);
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index f2dc02148bf..437c888098c 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -138,6 +138,17 @@ int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
   return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
 }
 
+void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute,
+                                 const map<uint8_t, rgw_bucket_category_stats>& stats)
+{
+  struct rgw_cls_bucket_update_stats_op call;
+  call.absolute = absolute;
+  call.stats = stats;
+  bufferlist in;
+  ::encode(call, in);
+  o.exec("rgw", "bucket_update_stats", in);
+}
+
 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                const cls_rgw_obj_key& key, const string& locator, bool log_op,
                                uint16_t bilog_flags)
@@ -277,6 +288,15 @@ int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry&
   return 0;
 }
 
+void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry)
+{
+  bufferlist in, out;
+  struct rgw_cls_bi_put_op call;
+  call.entry = entry;
+  ::encode(call, in);
+  op.exec("rgw", "bi_put", in);
+}
+
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
                     const string& name, const string& marker, uint32_t max,
                     list<rgw_cls_bi_entry> *entries, bool *is_truncated)
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index 4c68385f5a8..be7106a6c34 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -304,6 +304,9 @@ public:
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
 };
 
+void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute,
+                                 const map<uint8_t, rgw_bucket_category_stats>& stats);
+
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                const cls_rgw_obj_key& key, const string& locator, bool log_op,
                                uint16_t bilog_op);
@@ -324,6 +327,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
                    BIIndexType index_type, cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry);
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry);
+void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry);
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
                     const string& name, const string& marker, uint32_t max,
                     list<rgw_cls_bi_entry> *entries, bool *is_truncated);
diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc
index 2cf361993e8..0b3701bf9ff 100644
--- a/src/cls/rgw/cls_rgw_ops.cc
+++ b/src/cls/rgw/cls_rgw_ops.cc
@@ -343,6 +343,29 @@ void rgw_cls_check_index_ret::dump(Formatter *f) const
   ::encode_json("calculated_header", calculated_header, f);
 }
 
+void rgw_cls_bucket_update_stats_op::generate_test_instances(list<rgw_cls_bucket_update_stats_op*>& o)
+{
+  rgw_cls_bucket_update_stats_op *r = new rgw_cls_bucket_update_stats_op;
+  r->absolute = true;
+  rgw_bucket_category_stats& s = r->stats[0];
+  s.total_size = 1;
+  s.total_size_rounded = 4096;
+  s.num_entries = 1;
+  o.push_back(r);
+
+  o.push_back(new rgw_cls_bucket_update_stats_op);
+}
+
+void rgw_cls_bucket_update_stats_op::dump(Formatter *f) const
+{
+  ::encode_json("absolute", absolute, f);
+  map<int, rgw_bucket_category_stats> s;
+  for (auto& entry : stats) {
+    s[(int)entry.first] = entry.second;
+  }
+  ::encode_json("stats", s, f);
+}
+
 void cls_rgw_bi_log_list_op::dump(Formatter *f) const
 {
   f->dump_string("marker", marker);
diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h
index 4aed26b4f5e..74035929387 100644
--- a/src/cls/rgw/cls_rgw_ops.h
+++ b/src/cls/rgw/cls_rgw_ops.h
@@ -443,6 +443,30 @@ struct rgw_cls_check_index_ret
 };
 WRITE_CLASS_ENCODER(rgw_cls_check_index_ret)
 
+struct rgw_cls_bucket_update_stats_op
+{
+  bool absolute{false};
+  map<uint8_t, rgw_bucket_category_stats> stats;
+
+  rgw_cls_bucket_update_stats_op() {}
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(absolute, bl);
+    ::encode(stats, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    DECODE_START(1, bl);
+    ::decode(absolute, bl);
+    ::decode(stats, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(Formatter *f) const;
+  static void generate_test_instances(list<rgw_cls_bucket_update_stats_op*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_cls_bucket_update_stats_op)
+
 struct rgw_cls_obj_remove_op {
   list keep_attr_prefixes;
diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc
index 79a52722d66..27a413e865b 100644
--- a/src/cls/rgw/cls_rgw_types.cc
+++ b/src/cls/rgw/cls_rgw_types.cc
@@ -237,6 +237,38 @@ void rgw_cls_bi_entry::dump(Formatter *f) const
   dump_bi_entry(data, type, f);
 }
 
+bool rgw_cls_bi_entry::get_info(cls_rgw_obj_key *key, uint8_t *category, rgw_bucket_category_stats *accounted_stats)
+{
+  bool account = false;
+  bufferlist::iterator iter = data.begin();
+  switch (type) {
+    case PlainIdx:
+    case InstanceIdx:
+      {
+        rgw_bucket_dir_entry entry;
+        ::decode(entry, iter);
+        *key = entry.key;
+        *category = entry.meta.category;
+        accounted_stats->num_entries++;
+        accounted_stats->total_size += entry.meta.accounted_size;
+        accounted_stats->total_size_rounded += cls_rgw_get_rounded_size(entry.meta.accounted_size);
+        account = true;
+      }
+      break;
+    case OLHIdx:
+      {
+        rgw_bucket_olh_entry entry;
+        ::decode(entry, iter);
+        *key = entry.key;
+      }
+      break;
+    default:
+      break;
+  }
+
+  return account;
+}
+
 void rgw_bucket_olh_entry::dump(Formatter *f) const
 {
   encode_json("key", key, f);
diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h
index 33d273ac3ea..ea14cb6ec23 100644
--- a/src/cls/rgw/cls_rgw_types.h
+++ b/src/cls/rgw/cls_rgw_types.h
@@ -49,6 +49,13 @@ enum RGWCheckMTimeType {
   CLS_RGW_CHECK_TIME_MTIME_GE = 4,
 };
 
+#define ROUND_BLOCK_SIZE 4096
+
+static inline uint64_t cls_rgw_get_rounded_size(uint64_t size)
+{
+  return (size + ROUND_BLOCK_SIZE - 1) & ~(ROUND_BLOCK_SIZE - 1);
+}
+
 struct rgw_bucket_pending_info {
   RGWPendingState state;
   ceph::real_time timestamp;
@@ -361,6 +368,8 @@ enum BIIndexType {
   OLHIdx = 3,
 };
 
+struct rgw_bucket_category_stats;
+
 struct rgw_cls_bi_entry {
   BIIndexType type;
   string idx;
@@ -388,6 +397,8 @@ struct rgw_cls_bi_entry {
 
   void dump(Formatter *f) const;
   void decode_json(JSONObj *obj, cls_rgw_obj_key *effective_key = NULL);
+
+  bool get_info(cls_rgw_obj_key *key, uint8_t *category, rgw_bucket_category_stats *accounted_stats);
 };
 WRITE_CLASS_ENCODER(rgw_cls_bi_entry)
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 57e3ec99a97..d35f0f79304 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1,4 +1,3 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #include
@@ -16,6 +15,8 @@
 #include "common/errno.h"
 #include "common/safe_io.h"
 
+#include "cls/rgw/cls_rgw_client.h"
+
 #include "global/global_init.h"
 
 #include "include/utime.h"
@@ -72,6 +73,10 @@ void _usage()
  cout << "  bucket stats               returns bucket statistics\n";
  cout << "  bucket rm                  remove bucket\n";
  cout << "  bucket check               check bucket index\n";
+ cout << "  bucket reshard             reshard bucket\n";
+ cout << "  bi get                     retrieve bucket index object entries\n";
+ cout << "  bi put                     store bucket index object entries\n";
+ cout << "  bi list                    list raw bucket index entries\n";
  cout << "  object rm                  remove object\n";
  cout << "  object unlink              unlink object from bucket index\n";
  cout << "  objects expire             run expired objects cleanup\n";
@@ -300,6 +305,7 @@ enum {
   OPT_BUCKET_SYNC_RUN,
   OPT_BUCKET_RM,
   OPT_BUCKET_REWRITE,
+  OPT_BUCKET_RESHARD,
   OPT_POLICY,
   OPT_POOL_ADD,
   OPT_POOL_RM,
@@ -317,6 +323,7 @@ enum {
   OPT_BI_GET,
   OPT_BI_PUT,
   OPT_BI_LIST,
+  OPT_BI_PURGE,
   OPT_OLH_GET,
   OPT_OLH_READLOG,
   OPT_QUOTA_SET,
@@ -501,6 +508,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       return OPT_BUCKET_RM;
     if (strcmp(cmd, "rewrite") == 0)
       return OPT_BUCKET_REWRITE;
+    if (strcmp(cmd, "reshard") == 0)
+      return OPT_BUCKET_RESHARD;
     if (strcmp(cmd, "check") == 0)
       return OPT_BUCKET_CHECK;
     if (strcmp(cmd, "sync") == 0) {
@@ -566,6 +575,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       return OPT_BI_PUT;
     if (strcmp(cmd, "list") == 0)
       return OPT_BI_LIST;
+    if (strcmp(cmd, "purge") == 0)
+      return OPT_BI_PURGE;
   } else if (strcmp(prev_cmd, "period") == 0) {
     if (strcmp(cmd, "delete") == 0)
       return OPT_PERIOD_DELETE;
@@ -897,16 +908,16 @@ public:
 };
 
 static int init_bucket(const string& tenant_name, const string& bucket_name, const string& bucket_id,
-                       RGWBucketInfo& bucket_info, rgw_bucket& bucket)
+                       RGWBucketInfo& bucket_info, rgw_bucket& bucket, map<string, bufferlist> *pattrs = nullptr)
 {
   if (!bucket_name.empty()) {
     RGWObjectCtx obj_ctx(store);
     int r;
     if (bucket_id.empty()) {
-      r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
+      r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, nullptr, pattrs);
     } else {
       string bucket_instance_id = bucket_name + ":" + bucket_id;
-      r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+      r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, pattrs);
     }
     if (r < 0) {
       cerr << "could not get bucket info for bucket=" << bucket_name << std::endl;
@@ -2004,7 +2015,176 @@ static void parse_tier_config_param(const string& s, map& out)
   }
 }
 
-int main(int argc, char **argv)
+#define RESHARD_SHARD_WINDOW 64
+#define RESHARD_MAX_AIO 128
+
+class BucketReshardShard {
+  RGWRados *store;
+  RGWBucketInfo& bucket_info;
+  int num_shard;
+  RGWRados::BucketShard bs;
+  vector<rgw_cls_bi_entry> entries;
+  map<uint8_t, rgw_bucket_category_stats> stats;
+  deque<librados::AioCompletion *>& aio_completions;
+
+  int wait_next_completion() {
+    librados::AioCompletion *c = aio_completions.front();
+    aio_completions.pop_front();
+
+    c->wait_for_safe();
+
+    int ret = c->get_return_value();
+    c->release();
+
+    if (ret < 0) {
+      cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    return 0;
+  }
+
+  int get_completion(librados::AioCompletion **c) {
+    if (aio_completions.size() >= RESHARD_MAX_AIO) {
+      int ret = wait_next_completion();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+
+    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+    aio_completions.push_back(*c);
+
+    return 0;
+  }
+
+public:
+  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
+                     int _num_shard,
+                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
+                                                                       aio_completions(_completions) {
+    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
+    bs.init(bucket_info.bucket, num_shard);
+  }
+
+  int get_num_shard() {
+    return num_shard;
+  }
+
+  int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
+                const rgw_bucket_category_stats& entry_stats) {
+    entries.push_back(entry);
+    if (account) {
+      rgw_bucket_category_stats& target = stats[category];
+      target.num_entries += entry_stats.num_entries;
+      target.total_size += entry_stats.total_size;
+      target.total_size_rounded += entry_stats.total_size_rounded;
+    }
+    if (entries.size() >= RESHARD_SHARD_WINDOW) {
+      int ret = flush();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    return 0;
+  }
+  int flush() {
+    if (entries.size() == 0) {
+      return 0;
+    }
+
+    librados::ObjectWriteOperation op;
+    for (auto& entry : entries) {
+      store->bi_put(op, bs, entry);
+    }
+    cls_rgw_bucket_update_stats(op, false, stats);
+
+    librados::AioCompletion *c;
+    int ret = get_completion(&c);
+    if (ret < 0) {
+      return ret;
+    }
+    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
+    if (ret < 0) {
+      std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    entries.clear();
+    stats.clear();
+    return 0;
+  }
+
+  int wait_all_aio() {
+    int ret = 0;
+    while (!aio_completions.empty()) {
+      int r = wait_next_completion();
+      if (r < 0) {
+        ret = r;
+      }
+    }
+    return ret;
+  }
+};
+
+class BucketReshardManager {
+  RGWRados *store;
+  RGWBucketInfo& target_bucket_info;
+  deque<librados::AioCompletion *> completions;
+  int num_target_shards;
+  vector<BucketReshardShard *> target_shards;
+
+public:
+  BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
+                                                                                                       num_target_shards(_num_target_shards) {
+    target_shards.resize(num_target_shards);
+    for (int i = 0; i < num_target_shards; ++i) {
+      target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
+    }
+  }
+
+  ~BucketReshardManager() {
+    for (auto& shard : target_shards) {
+      int ret = shard->wait_all_aio();
+      if (ret < 0) {
+        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+      }
+    }
+  }
+
+  int add_entry(int shard_index,
+                rgw_cls_bi_entry& entry, bool account, uint8_t category,
+                const rgw_bucket_category_stats& entry_stats) {
+    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+    if (ret < 0) {
+      cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    return 0;
+  }
+
+  int finish() {
+    int ret = 0;
+    for (auto& shard : target_shards) {
+      int r = shard->flush();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+    }
+    for (auto& shard : target_shards) {
+      int r = shard->wait_all_aio();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+      delete shard;
+    }
+    target_shards.clear();
+    return ret;
+  }
+};
+
+int main(int argc, char **argv)
 {
   vector<const char*> args;
   argv_to_vec(argc, (const char **)argv, args);
@@ -2109,6 +2289,8 @@ int main(int argc, char **argv)
   int bypass_gc = false;
   int inconsistent_index = false;
 
+  int verbose = false;
+  int extra_info = false;
   uint64_t min_rewrite_size = 4 * 1024 * 1024;
@@ -2119,6 +2301,7 @@ int main(int argc, char **argv)
   string job_id;
   int num_shards = 0;
+  bool num_shards_specified = false;
   int max_concurrent_ios = 32;
   uint64_t orphan_stale_secs = (24 * 3600);
@@ -2200,6 +2383,8 @@
       admin_specified = true;
     } else if (ceph_argparse_binary_flag(args, i, &system, NULL, "--system", (char*)NULL)) {
       system_specified = true;
+    } else if (ceph_argparse_binary_flag(args, i, &verbose, NULL, "--verbose", (char*)NULL)) {
+      // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &staging, NULL, "--staging", (char*)NULL)) {
      // do nothing
    } else if (ceph_argparse_binary_flag(args, i, &commit, NULL, "--commit", (char*)NULL)) {
@@ -2256,6 +2441,7 @@
        cerr << "ERROR: failed to parse num shards: " << err << std::endl;
        return EINVAL;
      }
+      num_shards_specified = true;
    } else if (ceph_argparse_witharg(args, i, &val, "--max-concurrent-ios", (char*)NULL)) {
      max_concurrent_ios = (int)strict_strtol(val.c_str(), 10, &err);
      if (!err.empty()) {
@@ -4470,6 +4656,10 @@ next:
   }
 
   if (opt_cmd == OPT_BI_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name not specified" << std::endl;
+      return EINVAL;
+    }
    RGWBucketInfo bucket_info;
    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
    if (ret < 0) {
@@ -4483,29 +4673,88 @@ next:
      max_entries = 1000;
    }
 
+    int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
    formatter->open_array_section("entries");
-    do {
-      entries.clear();
-      ret = store->bi_list(bucket, object, marker, max_entries, &entries, &is_truncated);
+
+    for (int i = 0; i < max_shards; i++) {
+      RGWRados::BucketShard bs(store);
+      int shard_id = (bucket_info.num_shards > 0 ? i : -1);
+      int ret = bs.init(bucket, shard_id);
+      marker.clear();
+
      if (ret < 0) {
-        cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+        cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
        return -ret;
      }
 
-      list<rgw_cls_bi_entry>::iterator iter;
-      for (iter = entries.begin(); iter != entries.end(); ++iter) {
-        rgw_cls_bi_entry& entry = *iter;
-        encode_json("entry", entry, formatter);
-        marker = entry.idx;
-      }
+      do {
+        entries.clear();
+        ret = store->bi_list(bs, object, marker, max_entries, &entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+
+        list<rgw_cls_bi_entry>::iterator iter;
+        for (iter = entries.begin(); iter != entries.end(); ++iter) {
+          rgw_cls_bi_entry& entry = *iter;
+          encode_json("entry", entry, formatter);
+          marker = entry.idx;
+        }
+        formatter->flush(cout);
+      } while (is_truncated);
 
      formatter->flush(cout);
-    } while (is_truncated);
+    }
 
    formatter->close_section();
    formatter->flush(cout);
  }
 
+  if (opt_cmd == OPT_BI_PURGE) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name not specified" << std::endl;
+      return EINVAL;
+    }
+    RGWBucketInfo bucket_info;
+    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    RGWBucketInfo cur_bucket_info;
+    rgw_bucket cur_bucket;
+    ret = init_bucket(tenant, bucket_name, string(), cur_bucket_info, cur_bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init current bucket info for bucket_name=" << bucket_name << ": " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    if (cur_bucket_info.bucket.bucket_id == bucket_info.bucket.bucket_id && !yes_i_really_mean_it) {
+      cerr << "specified bucket instance points to a current bucket instance" << std::endl;
+      cerr << "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
+      return EINVAL;
+    }
+
+    int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+    for (int i = 0; i < max_shards; i++) {
+      RGWRados::BucketShard bs(store);
+      int shard_id = (bucket_info.num_shards > 0 ? i : -1);
+      int ret = bs.init(bucket, shard_id);
+      if (ret < 0) {
+        cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+
+      ret = store->bi_remove(bs);
+      if (ret < 0) {
+        cerr << "ERROR: failed to remove bucket index object: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+    }
+  }
+
  if (opt_cmd == OPT_OBJECT_RM) {
    RGWBucketInfo bucket_info;
    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
@@ -4669,6 +4918,158 @@ next:
    formatter->flush(cout);
  }
 
+  if (opt_cmd == OPT_BUCKET_RESHARD) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+
+    if (!num_shards_specified) {
+      cerr << "ERROR: --num-shards not specified" << std::endl;
+      return EINVAL;
+    }
+
+    if (num_shards > (int)store->get_max_bucket_shards()) {
+      cerr << "ERROR: num_shards too high, max value: " << store->get_max_bucket_shards() << std::endl;
+      return EINVAL;
+    }
+
+    RGWBucketInfo bucket_info;
+    map<string, bufferlist> attrs;
+    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket, &attrs);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+    if (num_shards <= num_source_shards && !yes_i_really_mean_it) {
+      cerr << "num shards is less or equal to current shards count" << std::endl
+           << "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
+      return EINVAL;
+    }
+
+    RGWBucketInfo new_bucket_info(bucket_info);
+    store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+    new_bucket_info.bucket.oid.clear();
+
+    new_bucket_info.num_shards = num_shards;
+    new_bucket_info.objv_tracker.clear();
+
+    cout << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
+    cout << "***         these will need to be removed manually             ***" << std::endl;
+    cout << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
+    cout << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
+
+    ret = store->init_bucket_index(new_bucket_info.bucket, new_bucket_info.num_shards);
+    if (ret < 0) {
+      cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
+    if (ret < 0) {
+      cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    list<rgw_cls_bi_entry> entries;
+
+    if (max_entries < 0) {
+      max_entries = 1000;
+    }
+
+    int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
+
+    BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
+
+    if (verbose) {
+      formatter->open_array_section("entries");
+    }
+
+    uint64_t total_entries = 0;
+
+    if (!verbose) {
+      cout << "total entries:";
+    }
+
+    for (int i = 0; i < num_source_shards; ++i) {
+      bool is_truncated = true;
+      marker.clear();
+      while (is_truncated) {
+        entries.clear();
+        ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+
+        list<rgw_cls_bi_entry>::iterator iter;
+        for (iter = entries.begin(); iter != entries.end(); ++iter) {
+          rgw_cls_bi_entry& entry = *iter;
+          if (verbose) {
+            formatter->open_object_section("entry");
+
+            encode_json("shard_id", i, formatter);
+            encode_json("num_entry", total_entries, formatter);
+            encode_json("entry", entry, formatter);
+          }
+          total_entries++;
+
+          marker = entry.idx;
+
+          int target_shard_id;
+          cls_rgw_obj_key cls_key;
+          uint8_t category;
+          rgw_bucket_category_stats stats;
+          bool account = entry.get_info(&cls_key, &category, &stats);
+          rgw_obj_key key(cls_key);
+          rgw_obj obj(new_bucket_info.bucket, key);
+          int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
+          if (ret < 0) {
+            cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl;
+            return ret;
+          }
+
+          int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+          ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+          if (ret < 0) {
+            return ret;
+          }
+          if (verbose) {
+            formatter->close_section();
+            formatter->flush(cout);
+            formatter->flush(cout);
+          } else if (!(total_entries % 1000)) {
+            cout << " " << total_entries;
+          }
+        }
+      }
+    }
+    if (verbose) {
+      formatter->close_section();
+      formatter->flush(cout);
+    } else {
+      cout << " " << total_entries << std::endl;
+    }
+
+    ret = target_shards_mgr.finish();
+    if (ret < 0) {
+      cerr << "ERROR: failed to reshard" << std::endl;
+      return EIO;
+    }
+
+    bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
+    bucket_op.set_user_id(new_bucket_info.owner);
+    string err;
+    int r = RGWBucketAdminOp::link(store, bucket_op, &err);
+    if (r < 0) {
+      cerr << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << std::endl;
+      return -r;
+    }
+  }
+
  if (opt_cmd == OPT_OBJECT_UNLINK) {
    RGWBucketInfo bucket_info;
    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index b49c7477457..199dc1a4041 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -224,6 +224,7 @@ int rgw_link_bucket(RGWRados *store, const rgw_user& user_id, rgw_bucket& bucket
 
     ep.linked = true;
     ep.owner = user_id;
+    ep.bucket = bucket;
     ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
     if (ret < 0)
       goto done_err;
@@ -891,7 +892,7 @@ int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
     rgw_obj obj_bucket_instance(bucket_instance, no_oid);
     r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
 
-    r = rgw_link_bucket(store, user_info.user_id, bucket, real_time());
+    r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket, real_time());
     if (r < 0)
       return r;
   }
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 2f0210b2be8..4dfe489f6d1 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -72,8 +72,6 @@ using namespace librados;
 
 #define dout_subsys ceph_subsys_rgw
 
-#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
-
 using namespace std;
 
 static RGWCache cached_rados_provider;
@@ -5274,7 +5272,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
     return r;
 
   string dir_oid = dir_oid_prefix;
-  dir_oid.append(bucket.marker);
+  dir_oid.append(bucket.bucket_id);
 
   map<int, string> bucket_objs;
   get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
@@ -5282,6 +5280,15 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
   return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
+void RGWRados::create_bucket_id(string *bucket_id)
+{
+  uint64_t iid = instance_id();
+  uint64_t bid = next_bucket_id();
+  char buf[get_zone_params().get_id().size() + 48];
+  snprintf(buf, sizeof(buf), "%s.%llu.%llu", get_zone_params().get_id().c_str(), (long long)iid, (long long)bid);
+  *bucket_id = buf;
+}
+
 /**
  * create a bucket with name bucket and the given list of attrs
  * returns 0 on success, -ERR# otherwise.
@@ -5312,11 +5319,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
       return ret;
 
     if (!pmaster_bucket) {
-      uint64_t iid = instance_id();
-      uint64_t bid = next_bucket_id();
-      char buf[get_zone_params().get_id().size() + 48];
-      snprintf(buf, sizeof(buf), "%s.%llu.%llu", get_zone_params().get_id().c_str(), (long long)iid, (long long)bid);
-      bucket.marker = buf;
+      create_bucket_id(&bucket.marker);
       bucket.bucket_id = bucket.marker;
     } else {
       bucket.marker = pmaster_bucket->marker;
@@ -5991,6 +5994,21 @@ int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
   return 0;
 }
 
+int RGWRados::BucketShard::init(rgw_bucket& _bucket, int sid)
+{
+  bucket = _bucket;
+  shard_id = sid;
+
+  int ret = store->open_bucket_index_shard(bucket, index_ctx, shard_id, &bucket_obj);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
 /* Execute @handler on last item in bucket listing for bucket specified
  * in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
@@ -7793,13 +7811,13 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
   if (r < 0)
     return r;
 
-  if (bucket.marker.empty()) {
-    ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+  if (bucket.bucket_id.empty()) {
+    ldout(cct, 0) << "ERROR: empty bucket id for bucket operation" << dendl;
     return -EIO;
   }
 
   bucket_oid = dir_oid_prefix;
-  bucket_oid.append(bucket.marker);
+  bucket_oid.append(bucket.bucket_id);
 
   return 0;
 }
@@ -7810,13 +7828,13 @@ int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_
   if (r < 0)
     return r;
 
-  if (bucket.marker.empty()) {
-    ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+  if (bucket.bucket_id.empty()) {
+    ldout(cct, 0) << "ERROR: empty bucket_id for bucket operation" << dendl;
     return -EIO;
   }
 
   bucket_oid_base = dir_oid_prefix;
-  bucket_oid_base.append(bucket.marker);
+  bucket_oid_base.append(bucket.bucket_id);
 
   return 0;
 
@@ -7885,6 +7903,27 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index
   return 0;
 }
 
+int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+                                      int shard_id, string *bucket_obj)
+{
+  string bucket_oid_base;
+  int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
+  if (ret < 0)
+    return ret;
+
+  RGWObjectCtx obj_ctx(this);
+
+  // Get the bucket info
+  RGWBucketInfo binfo;
+  ret = get_bucket_instance_info(obj_ctx, bucket, binfo, NULL, NULL);
+  if (ret < 0)
+    return ret;
+
+  get_bucket_index_object(bucket_oid_base, binfo.num_shards,
+                          shard_id, bucket_obj);
+  return 0;
+}
+
 static void accumulate_raw_stats(const rgw_bucket_dir_header& header,
                                  map& stats)
 {
@@ -11530,6 +11569,20 @@ int RGWRados::bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, r
   return 0;
 }
 
+void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry)
+{
+  cls_rgw_bi_put(op, bs.bucket_obj, entry);
+}
+
+int RGWRados::bi_put(BucketShard& bs, rgw_cls_bi_entry& entry)
+{
+  int ret = cls_rgw_bi_put(bs.index_ctx, bs.bucket_obj, entry);
+  if (ret < 0)
+    return ret;
+
+  return 0;
+}
+
 int RGWRados::bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry)
 {
   BucketShard bs(this);
@@ -11539,11 +11592,7 @@ int RGWRados::bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry)
     return ret;
   }
 
-  ret = cls_rgw_bi_put(bs.index_ctx, bs.bucket_obj, entry);
-  if (ret < 0)
-    return ret;
-
-  return 0;
+  return bi_put(bs, entry);
 }
 
 int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
@@ -11563,6 +11612,41 @@ int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string&
   return 0;
 }
 
+int RGWRados::bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+  int ret = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, filter_obj, marker, max, entries, is_truncated);
+  if (ret < 0)
+    return ret;
+
+  return 0;
+}
+
+int RGWRados::bi_remove(BucketShard& bs)
+{
+  int ret = bs.index_ctx.remove(bs.bucket_obj);
+  if (ret == -ENOENT) {
+    ret = 0;
+  }
+  if (ret < 0) {
+    ldout(cct, 5) << "bs.index_ctx.remove(" << bs.bucket_obj << ") returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWRados::bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+  BucketShard bs(this);
+  int ret = bs.init(bucket, shard_id);
+  if (ret < 0) {
+    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  return bi_list(bs, filter_obj, marker, max, entries, is_truncated);
+}
+
 int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
 {
   return gc_pool_ctx.operate(oid, op);
@@ -12318,6 +12402,44 @@ void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id,
   }
 }
 
+int RGWRados::get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key,
+                                  int *shard_id)
+{
+  int r = 0;
+  switch (bucket_info.bucket_index_shard_hash_type) {
+    case RGWBucketInfo::MOD:
+      if (!bucket_info.num_shards) {
+        if (shard_id) {
+          *shard_id = -1;
+        }
+      } else {
+        uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
+        uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+        sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % bucket_info.num_shards;
+        if (shard_id) {
+          *shard_id = (int)sid;
+        }
+      }
+      break;
+    default:
+      r = -ENOTSUP;
+  }
+  return r;
+}
+
+void RGWRados::get_bucket_index_object(const string& bucket_oid_base, uint32_t num_shards,
+                                       int shard_id, string *bucket_obj)
+{
+  if (!num_shards) {
+    // By default with no sharding, we use the bucket oid as itself
+    (*bucket_obj) = bucket_oid_base;
+  } else {
+    char buf[bucket_oid_base.size() + 32];
+    snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
+    (*bucket_obj) = buf;
+  }
+}
+
 int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
                                       uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
 {
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index b2092a160ac..4cebef83b62 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -49,6 +49,8 @@ class RGWRESTConn;
 
 #define RGW_NO_SHARD -1
 
+#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
+
 static inline void prepend_bucket_marker(rgw_bucket& bucket, const string& orig_oid, string& oid)
 {
   if (bucket.marker.empty() || orig_oid.empty()) {
@@ -1819,6 +1821,8 @@ class RGWRados
                              string& bucket_oid_base);
   int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
                               const string& obj_key, string *bucket_obj, int *shard_id);
+  int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+                              int shard_id, string *bucket_obj);
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
                         map& bucket_objs,
                         int shard_id = -1, map *bucket_instance_ids = NULL);
   template
@@ -2083,6 +2087,10 @@ public:
   int get_required_alignment(rgw_bucket& bucket, uint64_t *alignment);
   int get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size);
 
+  uint32_t get_max_bucket_shards() {
+    return MAX_BUCKET_INDEX_SHARDS_PRIME;
+  }
+
   int list_raw_objects(rgw_bucket& pool, const string& prefix_filter, int max,
                        RGWListRawObjsCtx& ctx, list& oids,
                        bool *is_truncated);
@@ -2160,6 +2168,7 @@ public:
                              RGWZonePlacementInfo *rule_info);
   int set_bucket_location_by_rule(const string& location_rule, const string& tenant_name, const string& bucket_name, rgw_bucket& bucket, RGWZonePlacementInfo *rule_info);
+  void create_bucket_id(string *bucket_id);
   virtual int create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
                             const string& zonegroup_id,
                             const string& placement_rule,
@@ -2242,6 +2251,7 @@ public:
     explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
     int init(rgw_bucket& _bucket, rgw_obj& obj);
+    int init(rgw_bucket& _bucket, int sid);
   };
 
   class Object {
@@ -2923,9 +2933,14 @@ public:
   int bi_get_instance(rgw_obj& obj, rgw_bucket_dir_entry *dirent);
   int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
+  void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry);
+  int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry);
   int bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry);
+  int bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
+  int bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
   int bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
+  int bi_remove(BucketShard& bs);
 
   int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
   int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
@@ -2936,6 +2951,7 @@ public:
   void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id);
   void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
   void shard_name(const string& prefix, unsigned shard_id, string& name);
+  int get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key, int *shard_id);
   void time_log_prepare_entry(cls_log_entry& entry, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_add_init(librados::IoCtx& io_ctx);
   int time_log_add(const string& oid, list& entries,
@@ -3108,6 +3124,9 @@ public:
   int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
                               uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
+  void get_bucket_index_object(const string& bucket_oid_base, uint32_t num_shards,
+                               int shard_id, string *bucket_obj);
+
   /**
    * Check the actual on-disk state of the object specified
    * by list_state, and fill in the time and size of object.
diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t
index e595db55cec..868d338be0a 100644
--- a/src/test/cli/radosgw-admin/help.t
+++ b/src/test/cli/radosgw-admin/help.t
@@ -23,6 +23,10 @@
     bucket stats               returns bucket statistics
     bucket rm                  remove bucket
     bucket check               check bucket index
+    bucket reshard             reshard bucket
+    bi get                     retrieve bucket index object entries
+    bi put                     store bucket index object entries
+    bi list                    list raw bucket index entries
     object rm                  remove object
     object unlink              unlink object from bucket index
     objects expire             run expired objects cleanup
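
Reviewer note (not part of the patch): a minimal sketch of how the new cls_rgw_bucket_update_stats() and the ObjectWriteOperation overload of cls_rgw_bi_put() introduced above can be batched into a single bucket index shard operation, mirroring what BucketReshardShard::flush() does in rgw_admin.cc. The index_ctx/shard_oid names are assumptions, and the op is submitted synchronously here rather than through aio_operate() with a completion window as in the patch.

#include <map>
#include <string>
#include <vector>
#include "cls/rgw/cls_rgw_client.h"

// Queue a batch of bi entries plus the matching stats delta on one shard object.
static int flush_reshard_batch(librados::IoCtx& index_ctx,            // assumed: bucket index pool handle
                               const std::string& shard_oid,          // assumed: target index shard object
                               std::vector<rgw_cls_bi_entry>& entries,
                               const std::map<uint8_t, rgw_bucket_category_stats>& stats)
{
  librados::ObjectWriteOperation op;
  for (auto& entry : entries) {
    cls_rgw_bi_put(op, shard_oid, entry);        // one "bi_put" class call per entry
  }
  cls_rgw_bucket_update_stats(op, false, stats); // absolute=false: add deltas to the stored header
  return index_ctx.operate(shard_oid, &op);      // the patch uses aio_operate() instead
}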