Merge pull request #11230 from yehudasa/wip-rgw-resharding

rgw: bucket resharding
Reviewed-by: Orit Wasserman <owasserm@redhat.com>
This commit is contained in:
Orit Wasserman 2016-10-21 19:49:41 +02:00 committed by GitHub
commit c86206032f
12 changed files with 861 additions and 66 deletions

View File

@ -28,6 +28,7 @@ cls_method_handle_t h_rgw_bucket_set_tag_timeout;
cls_method_handle_t h_rgw_bucket_list;
cls_method_handle_t h_rgw_bucket_check_index;
cls_method_handle_t h_rgw_bucket_rebuild_index;
cls_method_handle_t h_rgw_bucket_update_stats;
cls_method_handle_t h_rgw_bucket_prepare_op;
cls_method_handle_t h_rgw_bucket_complete_op;
cls_method_handle_t h_rgw_bucket_link_olh;
@ -58,9 +59,6 @@ cls_method_handle_t h_rgw_lc_get_head;
cls_method_handle_t h_rgw_lc_list_entries;
#define ROUND_BLOCK_SIZE 4096
#define BI_PREFIX_CHAR 0x80
#define BI_BUCKET_OBJS_INDEX 0
@ -78,11 +76,6 @@ static string bucket_index_prefixes[] = { "", /* special handling for the objs l
/* this must be the last index */
"9999_",};
/* round a byte count up to the next ROUND_BLOCK_SIZE boundary */
static uint64_t get_rounded_size(uint64_t size)
{
  const uint64_t mask = ROUND_BLOCK_SIZE - 1;
  return (size + mask) & ~mask;
}
static bool bi_is_objs_index(const string& s) {
return ((unsigned char)s[0] != BI_PREFIX_CHAR);
}
@ -542,7 +535,7 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
struct rgw_bucket_category_stats& stats = calc_header->stats[entry.meta.category];
stats.num_entries++;
stats.total_size += entry.meta.accounted_size;
stats.total_size_rounded += get_rounded_size(entry.meta.accounted_size);
stats.total_size_rounded += cls_rgw_get_rounded_size(entry.meta.accounted_size);
start_obj = kiter->first;
}
@ -585,6 +578,38 @@ int rgw_bucket_rebuild_index(cls_method_context_t hctx, bufferlist *in, bufferli
return write_bucket_header(hctx, &calc_header);
}
/*
 * cls handler for "bucket_update_stats": applies per-category stats from the
 * request to the bucket index header, either replacing them (op.absolute) or
 * adding them as deltas, then persists the header.
 */
int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  // decode request
  rgw_cls_bucket_update_stats_op op;
  bufferlist::iterator biter = in->begin();
  try {
    ::decode(op, biter);
  } catch (buffer::error& err) {
    CLS_LOG(1, "ERROR: %s(): failed to decode request\n", __func__);
    return -EINVAL;
  }

  struct rgw_bucket_dir_header header;
  int rc = read_bucket_header(hctx, &header);
  if (rc < 0) {
    CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
    return rc;
  }

  for (map<uint8_t, rgw_bucket_category_stats>::iterator it = op.stats.begin();
       it != op.stats.end(); ++it) {
    rgw_bucket_category_stats& dest = header.stats[it->first];
    if (!op.absolute) {
      /* accumulate deltas on top of the existing category stats */
      dest.total_size += it->second.total_size;
      dest.total_size_rounded += it->second.total_size_rounded;
      dest.num_entries += it->second.num_entries;
    } else {
      /* overwrite the category stats wholesale */
      dest = it->second;
    }
  }

  return write_bucket_header(hctx, &header);
}
int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
@ -714,7 +739,7 @@ static void unaccount_entry(struct rgw_bucket_dir_header& header, struct rgw_buc
struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category];
stats.num_entries--;
stats.total_size -= entry.meta.accounted_size;
stats.total_size_rounded -= get_rounded_size(entry.meta.accounted_size);
stats.total_size_rounded -= cls_rgw_get_rounded_size(entry.meta.accounted_size);
}
static void log_entry(const char *func, const char *str, struct rgw_bucket_dir_entry *entry)
@ -899,7 +924,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
entry.tag = op.tag;
stats.num_entries++;
stats.total_size += meta.accounted_size;
stats.total_size_rounded += get_rounded_size(meta.accounted_size);
stats.total_size_rounded += cls_rgw_get_rounded_size(meta.accounted_size);
bufferlist new_key_bl;
::encode(entry, new_key_bl);
int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
@ -1929,7 +1954,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
CLS_LOG(10, "total_entries: %" PRId64 " -> %" PRId64 "\n", old_stats.num_entries, old_stats.num_entries - 1);
old_stats.num_entries--;
old_stats.total_size -= cur_disk.meta.accounted_size;
old_stats.total_size_rounded -= get_rounded_size(cur_disk.meta.accounted_size);
old_stats.total_size_rounded -= cls_rgw_get_rounded_size(cur_disk.meta.accounted_size);
header_changed = true;
}
struct rgw_bucket_category_stats& stats =
@ -1956,7 +1981,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
stats.num_entries++;
stats.total_size += cur_change.meta.accounted_size;
stats.total_size_rounded += get_rounded_size(cur_change.meta.accounted_size);
stats.total_size_rounded += cls_rgw_get_rounded_size(cur_change.meta.accounted_size);
header_changed = true;
cur_change.index_ver = header.ver;
bufferlist cur_state_bl;
@ -2261,6 +2286,11 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
{
string filter = name;
string start_key = marker;
string first_instance_idx;
encode_obj_versioned_data_key(string(), &first_instance_idx);
string end_key = first_instance_idx;
int count = 0;
map<string, bufferlist> keys;
do {
@ -2276,6 +2306,11 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
map<string, bufferlist>::iterator iter;
for (iter = keys.begin(); iter != keys.end(); ++iter) {
if (iter->first >= end_key) {
/* past the end of plain namespace */
return count;
}
rgw_cls_bi_entry entry;
entry.type = PlainIdx;
entry.idx = iter->first;
@ -2293,12 +2328,15 @@ static int list_plain_entries(cls_method_context_t hctx, const string& name, con
CLS_LOG(20, "%s(): entry.idx=%s e.key.name=%s", __func__, escape_str(entry.idx).c_str(), escape_str(e.key.name).c_str());
if (e.key.name != name) {
if (!name.empty() && e.key.name != name) {
return count;
}
entries->push_back(entry);
count++;
if (count >= (int)max) {
return count;
}
start_key = entry.idx;
}
} while (!keys.empty());
@ -2312,13 +2350,20 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
cls_rgw_obj_key key(name);
string first_instance_idx;
encode_obj_versioned_data_key(key, &first_instance_idx);
string start_key = first_instance_idx;
string start_key;
if (!name.empty()) {
start_key = first_instance_idx;
} else {
start_key = BI_PREFIX_CHAR;
start_key.append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
}
string filter = start_key;
if (bi_entry_gt(marker, start_key)) {
start_key = marker;
}
int count = 0;
map<string, bufferlist> keys;
string filter = first_instance_idx;
bool started = true;
do {
if (count >= (int)max) {
@ -2330,13 +2375,13 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
if (started) {
ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
if (ret == -ENOENT) {
ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_GET_NUM_KEYS, &keys);
ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
}
started = false;
} else {
ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_GET_NUM_KEYS, &keys);
ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
}
CLS_LOG(20, "%s(): start_key=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), (int)keys.size());
CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
if (ret < 0) {
return ret;
}
@ -2348,6 +2393,10 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
entry.idx = iter->first;
entry.data = iter->second;
if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
return count;
}
CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
bufferlist::iterator biter = entry.data.begin();
@ -2360,7 +2409,85 @@ static int list_instance_entries(cls_method_context_t hctx, const string& name,
return -EIO;
}
if (e.key.name != name) {
if (!name.empty() && e.key.name != name) {
return count;
}
entries->push_back(entry);
count++;
start_key = entry.idx;
}
} while (!keys.empty());
return count;
}
static int list_olh_entries(cls_method_context_t hctx, const string& name, const string& marker, uint32_t max,
list<rgw_cls_bi_entry> *entries)
{
cls_rgw_obj_key key(name);
string first_instance_idx;
encode_olh_data_key(key, &first_instance_idx);
string start_key;
if (!name.empty()) {
start_key = first_instance_idx;
} else {
start_key = BI_PREFIX_CHAR;
start_key.append(bucket_index_prefixes[BI_BUCKET_OLH_DATA_INDEX]);
}
string filter = start_key;
if (bi_entry_gt(marker, start_key)) {
start_key = marker;
}
int count = 0;
map<string, bufferlist> keys;
bool started = true;
do {
if (count >= (int)max) {
return count;
}
keys.clear();
#define BI_GET_NUM_KEYS 128
int ret;
if (started) {
ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
if (ret == -ENOENT) {
ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
}
started = false;
} else {
ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
}
CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
if (ret < 0) {
return ret;
}
map<string, bufferlist>::iterator iter;
for (iter = keys.begin(); iter != keys.end(); ++iter) {
rgw_cls_bi_entry entry;
entry.type = OLHIdx;
entry.idx = iter->first;
entry.data = iter->second;
if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
return count;
}
CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
bufferlist::iterator biter = entry.data.begin();
rgw_bucket_olh_entry e;
try {
::decode(e, biter);
} catch (buffer::error& err) {
CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
return -EIO;
}
if (!name.empty() && e.key.name != name) {
return count;
}
@ -2391,29 +2518,35 @@ static int rgw_bi_list_op(cls_method_context_t hctx, bufferlist *in, bufferlist
#define MAX_BI_LIST_ENTRIES 1000
int32_t max = (op.max < MAX_BI_LIST_ENTRIES ? op.max : MAX_BI_LIST_ENTRIES);
string start_key = op.marker;
int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries);
int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries) + 1; /* one extra entry for identifying truncation */
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): list_plain_entries retured ret=%d", __func__, ret);
return ret;
}
int count = ret;
CLS_LOG(20, "found %d plain entries", count);
ret = list_instance_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
return ret;
}
cls_rgw_obj_key key(op.name);
rgw_cls_bi_entry entry;
encode_olh_data_key(key, &entry.idx);
ret = cls_cxx_map_get_val(hctx, entry.idx, &entry.data);
if (ret < 0 && ret != -ENOENT) {
CLS_LOG(0, "ERROR: %s(): cls_cxx_map_get_val retured ret=%d", __func__, ret);
count += ret;
ret = list_olh_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
return ret;
} else if (ret >= 0) {
entry.type = OLHIdx;
op_ret.entries.push_back(entry);
}
count += ret;
op_ret.is_truncated = (count >= max);
while (count >= max) {
op_ret.entries.pop_back();
count--;
}
::encode(op_ret, *out);
@ -3381,6 +3514,7 @@ void __cls_init()
cls_register_cxx_method(h_class, "bucket_list", CLS_METHOD_RD, rgw_bucket_list, &h_rgw_bucket_list);
cls_register_cxx_method(h_class, "bucket_check_index", CLS_METHOD_RD, rgw_bucket_check_index, &h_rgw_bucket_check_index);
cls_register_cxx_method(h_class, "bucket_rebuild_index", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_rebuild_index, &h_rgw_bucket_rebuild_index);
cls_register_cxx_method(h_class, "bucket_update_stats", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_update_stats, &h_rgw_bucket_update_stats);
cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
cls_register_cxx_method(h_class, "bucket_link_olh", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_link_olh, &h_rgw_bucket_link_olh);

View File

@ -138,6 +138,17 @@ int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
}
/*
 * Queue a "bucket_update_stats" cls call on the given write operation.
 * When 'absolute' is set the server replaces its header stats with 'stats';
 * otherwise 'stats' is treated as a set of deltas to apply.
 */
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute,
                                 const map<uint8_t, rgw_bucket_category_stats>& stats)
{
  rgw_cls_bucket_update_stats_op op;
  op.absolute = absolute;
  op.stats = stats;

  bufferlist bl;
  ::encode(op, bl);

  o.exec("rgw", "bucket_update_stats", bl);
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
const cls_rgw_obj_key& key, const string& locator, bool log_op,
uint16_t bilog_flags)
@ -277,6 +288,15 @@ int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry&
return 0;
}
/*
 * Queue a "bi_put" cls call (store one raw bucket index entry) on the given
 * write operation. The 'oid' parameter is unused here — the target object is
 * determined by whatever the operation is eventually executed against — but
 * is kept for signature parity with the synchronous IoCtx overload.
 */
void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry)
{
  struct rgw_cls_bi_put_op call;
  call.entry = entry;
  bufferlist in;  /* fixed: dropped unused 'out' bufferlist */
  ::encode(call, in);
  op.exec("rgw", "bi_put", in);
}
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
const string& name, const string& marker, uint32_t max,
list<rgw_cls_bi_entry> *entries, bool *is_truncated)

View File

@ -304,6 +304,9 @@ public:
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
};
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute,
const map<uint8_t, rgw_bucket_category_stats>& stats);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
const cls_rgw_obj_key& key, const string& locator, bool log_op,
uint16_t bilog_op);
@ -324,6 +327,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
BIIndexType index_type, cls_rgw_obj_key& key,
rgw_cls_bi_entry *entry);
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry);
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry);
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
const string& name, const string& marker, uint32_t max,
list<rgw_cls_bi_entry> *entries, bool *is_truncated);

View File

@ -343,6 +343,29 @@ void rgw_cls_check_index_ret::dump(Formatter *f) const
::encode_json("calculated_header", calculated_header, f);
}
/* produce one populated and one default-constructed instance for encode tests */
void rgw_cls_bucket_update_stats_op::generate_test_instances(list<rgw_cls_bucket_update_stats_op*>& o)
{
  auto *op = new rgw_cls_bucket_update_stats_op;
  op->absolute = true;

  rgw_bucket_category_stats stats;
  stats.total_size = 1;
  stats.total_size_rounded = 4096;
  stats.num_entries = 1;
  op->stats[0] = stats;

  o.push_back(op);
  o.push_back(new rgw_cls_bucket_update_stats_op);
}
/* dump for debugging/inspection; widens uint8_t keys to int so the JSON
 * encoder emits numbers rather than raw characters */
void rgw_cls_bucket_update_stats_op::dump(Formatter *f) const
{
  ::encode_json("absolute", absolute, f);

  map<int, rgw_bucket_category_stats> int_keyed;
  for (map<uint8_t, rgw_bucket_category_stats>::const_iterator it = stats.begin();
       it != stats.end(); ++it) {
    int_keyed[(int)it->first] = it->second;
  }
  ::encode_json("stats", int_keyed, f);
}
void cls_rgw_bi_log_list_op::dump(Formatter *f) const
{
f->dump_string("marker", marker);

View File

@ -443,6 +443,30 @@ struct rgw_cls_check_index_ret
};
WRITE_CLASS_ENCODER(rgw_cls_check_index_ret)
/*
 * Request body for the "bucket_update_stats" cls op. Carries per-category
 * bucket index stats, keyed by object category. When 'absolute' is true the
 * server replaces its stored stats for each listed category; otherwise the
 * values are applied as additive deltas (see rgw_bucket_update_stats).
 */
struct rgw_cls_bucket_update_stats_op
{
  bool absolute{false};                          /* replace (true) vs. accumulate (false) */
  map<uint8_t, rgw_bucket_category_stats> stats; /* per-category stats or deltas */

  rgw_cls_bucket_update_stats_op() {}

  /* NOTE: encode/decode field order is the wire format — do not reorder */
  void encode(bufferlist &bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(absolute, bl);
    ::encode(stats, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator &bl) {
    DECODE_START(1, bl);
    ::decode(absolute, bl);
    ::decode(stats, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  static void generate_test_instances(list<rgw_cls_bucket_update_stats_op *>& o);
};
WRITE_CLASS_ENCODER(rgw_cls_bucket_update_stats_op)
struct rgw_cls_obj_remove_op {
list<string> keep_attr_prefixes;

View File

@ -237,6 +237,38 @@ void rgw_cls_bi_entry::dump(Formatter *f) const
dump_bi_entry(data, type, f);
}
/*
 * Decode this raw index entry and extract its key (and, for plain/instance
 * entries, its category plus accumulated stats). Returns true when the entry
 * should be accounted in bucket stats, false otherwise (OLH and unknown
 * entry types are not accounted).
 */
bool rgw_cls_bi_entry::get_info(cls_rgw_obj_key *key, uint8_t *category, rgw_bucket_category_stats *accounted_stats)
{
  bufferlist::iterator biter = data.begin();

  if (type == PlainIdx || type == InstanceIdx) {
    rgw_bucket_dir_entry e;
    ::decode(e, biter);
    *key = e.key;
    *category = e.meta.category;
    accounted_stats->num_entries++;
    accounted_stats->total_size += e.meta.accounted_size;
    accounted_stats->total_size_rounded += cls_rgw_get_rounded_size(e.meta.accounted_size);
    return true;
  }

  if (type == OLHIdx) {
    rgw_bucket_olh_entry e;
    ::decode(e, biter);
    *key = e.key;
  }

  return false;
}
void rgw_bucket_olh_entry::dump(Formatter *f) const
{
encode_json("key", key, f);

View File

@ -49,6 +49,13 @@ enum RGWCheckMTimeType {
CLS_RGW_CHECK_TIME_MTIME_GE = 4,
};
#define ROUND_BLOCK_SIZE 4096

/* round a byte count up to the next ROUND_BLOCK_SIZE boundary */
static inline uint64_t cls_rgw_get_rounded_size(uint64_t size)
{
  const uint64_t mask = ROUND_BLOCK_SIZE - 1;
  return (size + mask) & ~mask;
}
struct rgw_bucket_pending_info {
RGWPendingState state;
ceph::real_time timestamp;
@ -361,6 +368,8 @@ enum BIIndexType {
OLHIdx = 3,
};
struct rgw_bucket_category_stats;
struct rgw_cls_bi_entry {
BIIndexType type;
string idx;
@ -388,6 +397,8 @@ struct rgw_cls_bi_entry {
void dump(Formatter *f) const;
void decode_json(JSONObj *obj, cls_rgw_obj_key *effective_key = NULL);
bool get_info(cls_rgw_obj_key *key, uint8_t *category, rgw_bucket_category_stats *accounted_stats);
};
WRITE_CLASS_ENCODER(rgw_cls_bi_entry)

View File

@ -1,4 +1,3 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <errno.h>
@ -16,6 +15,8 @@
#include "common/errno.h"
#include "common/safe_io.h"
#include "cls/rgw/cls_rgw_client.h"
#include "global/global_init.h"
#include "include/utime.h"
@ -72,6 +73,10 @@ void _usage()
cout << " bucket stats returns bucket statistics\n";
cout << " bucket rm remove bucket\n";
cout << " bucket check check bucket index\n";
cout << " bucket reshard reshard bucket\n";
cout << " bi get retrieve bucket index object entries\n";
cout << " bi put store bucket index object entries\n";
cout << " bi list list raw bucket index entries\n";
cout << " object rm remove object\n";
cout << " object unlink unlink object from bucket index\n";
cout << " objects expire run expired objects cleanup\n";
@ -300,6 +305,7 @@ enum {
OPT_BUCKET_SYNC_RUN,
OPT_BUCKET_RM,
OPT_BUCKET_REWRITE,
OPT_BUCKET_RESHARD,
OPT_POLICY,
OPT_POOL_ADD,
OPT_POOL_RM,
@ -317,6 +323,7 @@ enum {
OPT_BI_GET,
OPT_BI_PUT,
OPT_BI_LIST,
OPT_BI_PURGE,
OPT_OLH_GET,
OPT_OLH_READLOG,
OPT_QUOTA_SET,
@ -501,6 +508,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
return OPT_BUCKET_RM;
if (strcmp(cmd, "rewrite") == 0)
return OPT_BUCKET_REWRITE;
if (strcmp(cmd, "reshard") == 0)
return OPT_BUCKET_RESHARD;
if (strcmp(cmd, "check") == 0)
return OPT_BUCKET_CHECK;
if (strcmp(cmd, "sync") == 0) {
@ -566,6 +575,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
return OPT_BI_PUT;
if (strcmp(cmd, "list") == 0)
return OPT_BI_LIST;
if (strcmp(cmd, "purge") == 0)
return OPT_BI_PURGE;
} else if (strcmp(prev_cmd, "period") == 0) {
if (strcmp(cmd, "delete") == 0)
return OPT_PERIOD_DELETE;
@ -897,16 +908,16 @@ public:
};
static int init_bucket(const string& tenant_name, const string& bucket_name, const string& bucket_id,
RGWBucketInfo& bucket_info, rgw_bucket& bucket)
RGWBucketInfo& bucket_info, rgw_bucket& bucket, map<string, bufferlist> *pattrs = nullptr)
{
if (!bucket_name.empty()) {
RGWObjectCtx obj_ctx(store);
int r;
if (bucket_id.empty()) {
r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, nullptr, pattrs);
} else {
string bucket_instance_id = bucket_name + ":" + bucket_id;
r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, pattrs);
}
if (r < 0) {
cerr << "could not get bucket info for bucket=" << bucket_name << std::endl;
@ -2004,7 +2015,176 @@ static void parse_tier_config_param(const string& s, map<string, string>& out)
}
}
int main(int argc, char **argv)
#define RESHARD_SHARD_WINDOW 64
#define RESHARD_MAX_AIO 128
/*
 * Buffers bucket index entries bound for one target shard of a reshard and
 * writes them out in batches of RESHARD_SHARD_WINDOW entries via async rados
 * operations, throttled to at most RESHARD_MAX_AIO in-flight completions.
 */
class BucketReshardShard {
  RGWRados *store;
  RGWBucketInfo& bucket_info;
  int num_shard;                  /* target shard id, or -1 for a non-sharded bucket */
  RGWRados::BucketShard bs;
  vector<rgw_cls_bi_entry> entries;                  /* queued, not yet flushed */
  map<uint8_t, rgw_bucket_category_stats> stats;     /* stats deltas for queued entries */
  deque<librados::AioCompletion *>& aio_completions; /* shared queue owned by the manager */

  /* reap the oldest outstanding completion; returns 0 or its error code */
  int wait_next_completion() {
    librados::AioCompletion *c = aio_completions.front();
    aio_completions.pop_front();
    c->wait_for_safe();
    int ret = c->get_return_value();
    c->release();
    if (ret < 0) {
      cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
      return ret;
    }
    return 0;
  }

  /* allocate a fresh completion, first draining one if the in-flight cap is hit */
  int get_completion(librados::AioCompletion **c) {
    if (aio_completions.size() >= RESHARD_MAX_AIO) {
      int ret = wait_next_completion();
      if (ret < 0) {
        return ret;
      }
    }
    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
    aio_completions.push_back(*c);
    return 0;
  }

public:
  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
                     int _num_shard,
                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
                                                                       aio_completions(_completions) {
    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
    /* NOTE(review): bs.init() return value is ignored; a failure here would
     * only surface later when the shard object is operated on — confirm
     * whether that is intended */
    bs.init(bucket_info.bucket, num_shard);
  }

  int get_num_shard() {
    return num_shard;
  }

  /*
   * Queue one bucket index entry for this shard. When 'account' is set, fold
   * the entry's stats into the pending stats delta. Flushes automatically
   * once RESHARD_SHARD_WINDOW entries have accumulated.
   */
  int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
                const rgw_bucket_category_stats& entry_stats) {
    entries.push_back(entry);
    if (account) {
      rgw_bucket_category_stats& target = stats[category];
      target.num_entries += entry_stats.num_entries;
      target.total_size += entry_stats.total_size;
      target.total_size_rounded += entry_stats.total_size_rounded;
    }
    if (entries.size() >= RESHARD_SHARD_WINDOW) {
      int ret = flush();
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  /*
   * Write all queued entries plus the accumulated stats delta to the target
   * shard object as a single asynchronous write operation.
   */
  int flush() {
    if (entries.size() == 0) {
      return 0;
    }
    librados::ObjectWriteOperation op;
    for (auto& entry : entries) {
      store->bi_put(op, bs, entry);
    }
    /* absolute=false: stats are deltas on top of whatever the shard holds */
    cls_rgw_bucket_update_stats(op, false, stats);
    librados::AioCompletion *c;
    int ret = get_completion(&c);
    if (ret < 0) {
      return ret;
    }
    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
    if (ret < 0) {
      std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
      return ret;
    }
    entries.clear();
    stats.clear();
    return 0;
  }

  /* drain every outstanding completion; returns the last error seen, if any */
  int wait_all_aio() {
    int ret = 0;
    while (!aio_completions.empty()) {
      int r = wait_next_completion();
      if (r < 0) {
        ret = r;
      }
    }
    return ret;
  }
};
/*
 * Fans bucket index entries out to one BucketReshardShard writer per target
 * shard and coordinates their flush/aio drain at the end of a reshard run.
 * Normal usage ends with finish(); the destructor covers error paths.
 */
class BucketReshardManager {
  RGWRados *store;
  RGWBucketInfo& target_bucket_info;
  deque<librados::AioCompletion *> completions; /* shared by all shard writers */
  int num_target_shards;
  vector<BucketReshardShard *> target_shards;

public:
  BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
                                                                                                       num_target_shards(_num_target_shards) {
    target_shards.resize(num_target_shards);
    for (int i = 0; i < num_target_shards; ++i) {
      target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
    }
  }

  /*
   * Drain remaining aio and release the shard writers. When finish() has run
   * successfully target_shards is already empty and this is a no-op.
   */
  ~BucketReshardManager() {
    for (auto& shard : target_shards) {
      int ret = shard->wait_all_aio();
      if (ret < 0) {
        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
      }
      /* fix: shard objects were previously leaked on paths where finish()
       * never ran (e.g. an error aborting the reshard) */
      delete shard;
    }
    target_shards.clear();
  }

  /* route one entry to its target shard writer; see BucketReshardShard::add_entry */
  int add_entry(int shard_index,
                rgw_cls_bi_entry& entry, bool account, uint8_t category,
                const rgw_bucket_category_stats& entry_stats) {
    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
    if (ret < 0) {
      cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
      return ret;
    }
    return 0;
  }

  /*
   * Flush every shard writer, drain all outstanding aio, and release the
   * writers. Returns the last error encountered, or 0 on success.
   */
  int finish() {
    int ret = 0;
    for (auto& shard : target_shards) {
      int r = shard->flush();
      if (r < 0) {
        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
        ret = r;
      }
    }
    for (auto& shard : target_shards) {
      int r = shard->wait_all_aio();
      if (r < 0) {
        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
        ret = r;
      }
      delete shard;
    }
    target_shards.clear();
    return ret;
  }
};
int main(int argc, char **argv)
{
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);
@ -2109,6 +2289,8 @@ int main(int argc, char **argv)
int bypass_gc = false;
int inconsistent_index = false;
int verbose = false;
int extra_info = false;
uint64_t min_rewrite_size = 4 * 1024 * 1024;
@ -2119,6 +2301,7 @@ int main(int argc, char **argv)
string job_id;
int num_shards = 0;
bool num_shards_specified = false;
int max_concurrent_ios = 32;
uint64_t orphan_stale_secs = (24 * 3600);
@ -2200,6 +2383,8 @@ int main(int argc, char **argv)
admin_specified = true;
} else if (ceph_argparse_binary_flag(args, i, &system, NULL, "--system", (char*)NULL)) {
system_specified = true;
} else if (ceph_argparse_binary_flag(args, i, &verbose, NULL, "--verbose", (char*)NULL)) {
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &staging, NULL, "--staging", (char*)NULL)) {
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &commit, NULL, "--commit", (char*)NULL)) {
@ -2256,6 +2441,7 @@ int main(int argc, char **argv)
cerr << "ERROR: failed to parse num shards: " << err << std::endl;
return EINVAL;
}
num_shards_specified = true;
} else if (ceph_argparse_witharg(args, i, &val, "--max-concurrent-ios", (char*)NULL)) {
max_concurrent_ios = (int)strict_strtol(val.c_str(), 10, &err);
if (!err.empty()) {
@ -4470,6 +4656,10 @@ next:
}
if (opt_cmd == OPT_BI_LIST) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket name not specified" << std::endl;
return EINVAL;
}
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
@ -4483,29 +4673,88 @@ next:
max_entries = 1000;
}
int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
formatter->open_array_section("entries");
do {
entries.clear();
ret = store->bi_list(bucket, object, marker, max_entries, &entries, &is_truncated);
for (int i = 0; i < max_shards; i++) {
RGWRados::BucketShard bs(store);
int shard_id = (bucket_info.num_shards > 0 ? i : -1);
int ret = bs.init(bucket, shard_id);
marker.clear();
if (ret < 0) {
cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
list<rgw_cls_bi_entry>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
rgw_cls_bi_entry& entry = *iter;
encode_json("entry", entry, formatter);
marker = entry.idx;
}
do {
entries.clear();
ret = store->bi_list(bs, object, marker, max_entries, &entries, &is_truncated);
if (ret < 0) {
cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
list<rgw_cls_bi_entry>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
rgw_cls_bi_entry& entry = *iter;
encode_json("entry", entry, formatter);
marker = entry.idx;
}
formatter->flush(cout);
} while (is_truncated);
formatter->flush(cout);
} while (is_truncated);
}
formatter->close_section();
formatter->flush(cout);
}
if (opt_cmd == OPT_BI_PURGE) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket name not specified" << std::endl;
return EINVAL;
}
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
RGWBucketInfo cur_bucket_info;
rgw_bucket cur_bucket;
ret = init_bucket(tenant, bucket_name, string(), cur_bucket_info, cur_bucket);
if (ret < 0) {
cerr << "ERROR: could not init current bucket info for bucket_name=" << bucket_name << ": " << cpp_strerror(-ret) << std::endl;
return -ret;
}
if (cur_bucket_info.bucket.bucket_id == bucket_info.bucket.bucket_id && !yes_i_really_mean_it) {
cerr << "specified bucket instance points to a current bucket instance" << std::endl;
cerr << "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
return EINVAL;
}
int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
for (int i = 0; i < max_shards; i++) {
RGWRados::BucketShard bs(store);
int shard_id = (bucket_info.num_shards > 0 ? i : -1);
int ret = bs.init(bucket, shard_id);
if (ret < 0) {
cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
ret = store->bi_remove(bs);
if (ret < 0) {
cerr << "ERROR: failed to remove bucket index object: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
}
}
if (opt_cmd == OPT_OBJECT_RM) {
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
@ -4669,6 +4918,158 @@ next:
formatter->flush(cout);
}
if (opt_cmd == OPT_BUCKET_RESHARD) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;
return EINVAL;
}
if (!num_shards_specified) {
cerr << "ERROR: --num-shards not specified" << std::endl;
return EINVAL;
}
if (num_shards > (int)store->get_max_bucket_shards()) {
cerr << "ERROR: num_shards too high, max value: " << store->get_max_bucket_shards() << std::endl;
return EINVAL;
}
RGWBucketInfo bucket_info;
map<string, bufferlist> attrs;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket, &attrs);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
if (num_shards <= num_source_shards && !yes_i_really_mean_it) {
cerr << "num shards is less or equal to current shards count" << std::endl
<< "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
return EINVAL;
}
// Build the new bucket instance: same bucket, fresh instance id, new shard
// count. The old index objects are left in place (see NOTICE below).
RGWBucketInfo new_bucket_info(bucket_info);
store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
new_bucket_info.bucket.oid.clear();
new_bucket_info.num_shards = num_shards;
new_bucket_info.objv_tracker.clear();
cout << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
cout << "*** these will need to be removed manually ***" << std::endl;
cout << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
cout << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
ret = store->init_bucket_index(new_bucket_info.bucket, new_bucket_info.num_shards);
if (ret < 0) {
  cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
  return -ret;
}
ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
if (ret < 0) {
  cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
  return -ret;
}
list<rgw_cls_bi_entry> entries;
if (max_entries < 0) {
  max_entries = 1000;  // default bi_list page size
}
// A bucket with num_shards == 0 still has exactly one index object.
int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
if (verbose) {
  formatter->open_array_section("entries");
}
uint64_t total_entries = 0;
if (!verbose) {
  cout << "total entries:";
}
// Walk every source shard and re-route each raw index entry to the shard
// it hashes to under the new shard count.
for (int i = 0; i < num_source_shards; ++i) {
  bool is_truncated = true;
  marker.clear();
  while (is_truncated) {
    entries.clear();
    ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
    if (ret < 0) {
      cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
      return -ret;
    }
    for (auto& entry : entries) {
      if (verbose) {
        formatter->open_object_section("entry");
        encode_json("shard_id", i, formatter);
        encode_json("num_entry", total_entries, formatter);
        encode_json("entry", entry, formatter);
      }
      total_entries++;
      marker = entry.idx;
      int target_shard_id;
      cls_rgw_obj_key cls_key;
      uint8_t category;
      rgw_bucket_category_stats stats;
      bool account = entry.get_info(&cls_key, &category, &stats);
      rgw_obj_key key(cls_key);
      rgw_obj obj(new_bucket_info.bucket, key);
      // 'rc' (was a shadowing 'int ret') — avoid hiding the outer ret.
      int rc = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
      if (rc < 0) {
        cerr << "ERROR: get_target_shard_id() returned ret=" << rc << std::endl;
        return rc;
      }
      // target_shard_id is -1 for an unsharded target; fold that onto shard 0.
      int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
      rc = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
      if (rc < 0) {
        return rc;
      }
      if (verbose) {
        formatter->close_section();
        formatter->flush(cout);  // was flushed twice; one flush suffices
      } else if (!(total_entries % 1000)) {
        cout << " " << total_entries;  // progress marker every 1000 entries
      }
    }
  }
}
if (verbose) {
  formatter->close_section();
  formatter->flush(cout);
} else {
  cout << " " << total_entries << std::endl;
}
ret = target_shards_mgr.finish();
if (ret < 0) {
  cerr << "ERROR: failed to reshard" << std::endl;
  return EIO;
}
// Point the bucket entrypoint at the new instance so it becomes live.
bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
bucket_op.set_user_id(new_bucket_info.owner);
string err;
int r = RGWBucketAdminOp::link(store, bucket_op, &err);
if (r < 0) {
  // message fix: close the parenthesis opened before bucket_id
  cerr << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << "): " << err << "; " << cpp_strerror(-r) << std::endl;
  return -r;
}
}
if (opt_cmd == OPT_OBJECT_UNLINK) {
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);

View File

@ -224,6 +224,7 @@ int rgw_link_bucket(RGWRados *store, const rgw_user& user_id, rgw_bucket& bucket
ep.linked = true;
ep.owner = user_id;
ep.bucket = bucket;
ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
if (ret < 0)
goto done_err;
@ -891,7 +892,7 @@ int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
rgw_obj obj_bucket_instance(bucket_instance, no_oid);
r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
r = rgw_link_bucket(store, user_info.user_id, bucket, real_time());
r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket, real_time());
if (r < 0)
return r;
}

View File

@ -72,8 +72,6 @@ using namespace librados;
#define dout_subsys ceph_subsys_rgw
#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
using namespace std;
static RGWCache<RGWRados> cached_rados_provider;
@ -5274,7 +5272,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
return r;
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
dir_oid.append(bucket.bucket_id);
map<int, string> bucket_objs;
get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
@ -5282,6 +5280,15 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
/**
 * Generate a unique bucket instance id of the form
 * "<zone-id>.<instance-id>.<bucket-id>".
 *
 * @param bucket_id [out] receives the generated id string
 */
void RGWRados::create_bucket_id(string *bucket_id)
{
  uint64_t iid = instance_id();
  uint64_t bid = next_bucket_id();
  // 48 extra bytes comfortably hold two 64-bit decimals, two dots and the NUL.
  char buf[get_zone_params().get_id().size() + 48];
  // fix: %llu requires unsigned long long — the values were previously cast
  // to (long long), a signed/unsigned mismatch with the format specifier.
  snprintf(buf, sizeof(buf), "%s.%llu.%llu", get_zone_params().get_id().c_str(),
           (unsigned long long)iid, (unsigned long long)bid);
  *bucket_id = buf;
}
/**
* create a bucket with name bucket and the given list of attrs
* returns 0 on success, -ERR# otherwise.
@ -5312,11 +5319,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
return ret;
if (!pmaster_bucket) {
uint64_t iid = instance_id();
uint64_t bid = next_bucket_id();
char buf[get_zone_params().get_id().size() + 48];
snprintf(buf, sizeof(buf), "%s.%llu.%llu", get_zone_params().get_id().c_str(), (long long)iid, (long long)bid);
bucket.marker = buf;
create_bucket_id(&bucket.marker);
bucket.bucket_id = bucket.marker;
} else {
bucket.marker = pmaster_bucket->marker;
@ -5991,6 +5994,21 @@ int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
return 0;
}
/**
 * Bind this BucketShard to a specific shard of a bucket's index.
 *
 * @param _bucket the bucket whose index shard to open
 * @param sid     shard id to bind to
 * @return 0 on success, negative error code from open_bucket_index_shard()
 */
int RGWRados::BucketShard::init(rgw_bucket& _bucket, int sid)
{
  bucket = _bucket;
  shard_id = sid;

  int r = store->open_bucket_index_shard(bucket, index_ctx, shard_id, &bucket_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << r << dendl;
    return r;
  }
  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
  return 0;
}
/* Execute @handler on last item in bucket listing for bucket specified
* in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
@ -7793,13 +7811,13 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
if (r < 0)
return r;
if (bucket.marker.empty()) {
ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
if (bucket.bucket_id.empty()) {
ldout(cct, 0) << "ERROR: empty bucket id for bucket operation" << dendl;
return -EIO;
}
bucket_oid = dir_oid_prefix;
bucket_oid.append(bucket.marker);
bucket_oid.append(bucket.bucket_id);
return 0;
}
@ -7810,13 +7828,13 @@ int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_
if (r < 0)
return r;
if (bucket.marker.empty()) {
ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
if (bucket.bucket_id.empty()) {
ldout(cct, 0) << "ERROR: empty bucket_id for bucket operation" << dendl;
return -EIO;
}
bucket_oid_base = dir_oid_prefix;
bucket_oid_base.append(bucket.marker);
bucket_oid_base.append(bucket.bucket_id);
return 0;
@ -7885,6 +7903,27 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index
return 0;
}
/**
 * Resolve the index object name for a specific shard of a bucket.
 *
 * @param bucket     bucket whose index to open
 * @param index_ctx  [out] io context bound to the bucket's index pool
 * @param shard_id   shard to resolve
 * @param bucket_obj [out] oid of the shard's index object
 * @return 0 on success, negative error code otherwise
 */
int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
                                      int shard_id, string *bucket_obj)
{
  // Derive the base index oid and bind index_ctx to the index pool.
  string base_oid;
  int r = open_bucket_index_base(bucket, index_ctx, base_oid);
  if (r < 0) {
    return r;
  }

  // The shard count lives in the bucket instance metadata, so fetch it.
  RGWObjectCtx obj_ctx(this);
  RGWBucketInfo binfo;
  r = get_bucket_instance_info(obj_ctx, bucket, binfo, NULL, NULL);
  if (r < 0) {
    return r;
  }

  get_bucket_index_object(base_oid, binfo.num_shards, shard_id, bucket_obj);
  return 0;
}
static void accumulate_raw_stats(const rgw_bucket_dir_header& header,
map<RGWObjCategory, RGWStorageStats>& stats)
{
@ -11530,6 +11569,20 @@ int RGWRados::bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, r
return 0;
}
// Stage a bucket-index entry write onto an existing ObjectWriteOperation for
// the given shard. Presumably nothing is submitted here — the caller is
// expected to operate() the op; verify against cls_rgw_bi_put's op overload.
void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry)
{
cls_rgw_bi_put(op, bs.bucket_obj, entry);
}
/**
 * Synchronously write a raw bucket-index entry to the given shard.
 *
 * @return 0 on success (any non-negative cls result is normalized to 0),
 *         negative error code otherwise
 */
int RGWRados::bi_put(BucketShard& bs, rgw_cls_bi_entry& entry)
{
  const int r = cls_rgw_bi_put(bs.index_ctx, bs.bucket_obj, entry);
  return (r < 0) ? r : 0;
}
int RGWRados::bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry)
{
BucketShard bs(this);
@ -11539,11 +11592,7 @@ int RGWRados::bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry)
return ret;
}
ret = cls_rgw_bi_put(bs.index_ctx, bs.bucket_obj, entry);
if (ret < 0)
return ret;
return 0;
return bi_put(bs, entry);
}
int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
@ -11563,6 +11612,41 @@ int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string&
return 0;
}
/**
 * List raw bucket-index entries from one shard.
 *
 * @param bs           shard to list from
 * @param filter_obj   restrict listing to this object name (empty = all)
 * @param marker       resume-after key
 * @param max          maximum entries to return
 * @param entries      [out] listed entries
 * @param is_truncated [out] true if more entries remain
 * @return 0 on success (non-negative cls results normalized to 0),
 *         negative error code otherwise
 */
int RGWRados::bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
  const int r = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, filter_obj, marker, max, entries, is_truncated);
  return (r < 0) ? r : 0;
}
/**
 * Remove a shard's bucket index object.
 *
 * An already-missing object (-ENOENT) is treated as success, so the call
 * is idempotent.
 *
 * @return 0 on success or if the object was absent, negative error otherwise
 */
int RGWRados::bi_remove(BucketShard& bs)
{
  int r = bs.index_ctx.remove(bs.bucket_obj);
  if (r == -ENOENT) {
    return 0;
  }
  if (r < 0) {
    ldout(cct, 5) << "bs.index_ctx.remove(" << bs.bucket_obj << ") returned ret=" << r << dendl;
    return r;
  }
  return 0;
}
/**
 * List raw bucket-index entries for a (bucket, shard_id) pair.
 *
 * Convenience wrapper: binds a BucketShard to the requested shard and
 * delegates to the BucketShard overload of bi_list().
 */
int RGWRados::bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
  BucketShard shard(this);
  int r = shard.init(bucket, shard_id);
  if (r < 0) {
    ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
    return r;
  }
  return bi_list(shard, filter_obj, marker, max, entries, is_truncated);
}
int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
{
return gc_pool_ctx.operate(oid, op);
@ -12318,6 +12402,44 @@ void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id,
}
}
int RGWRados::get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key,
int *shard_id)
{
int r = 0;
switch (bucket_info.bucket_index_shard_hash_type) {
case RGWBucketInfo::MOD:
if (!bucket_info.num_shards) {
if (shard_id) {
*shard_id = -1;
}
} else {
uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % bucket_info.num_shards;
if (shard_id) {
*shard_id = (int)sid;
}
}
break;
default:
r = -ENOTSUP;
}
return r;
}
/**
 * Build the index object oid for a given shard.
 *
 * @param bucket_oid_base base oid of the bucket index
 * @param num_shards      shard count of the bucket (0 = unsharded)
 * @param shard_id        shard whose oid to build
 * @param bucket_obj      [out] resulting oid: the base itself when
 *                        unsharded, otherwise "<base>.<shard_id>"
 */
void RGWRados::get_bucket_index_object(const string& bucket_oid_base, uint32_t num_shards,
                                       int shard_id, string *bucket_obj)
{
  if (num_shards == 0) {
    // Unsharded bucket: the base oid is the index object itself.
    *bucket_obj = bucket_oid_base;
    return;
  }
  char buf[bucket_oid_base.size() + 32];
  snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
  *bucket_obj = buf;
}
int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
{

View File

@ -49,6 +49,8 @@ class RGWRESTConn;
#define RGW_NO_SHARD -1
#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
static inline void prepend_bucket_marker(rgw_bucket& bucket, const string& orig_oid, string& oid)
{
if (bucket.marker.empty() || orig_oid.empty()) {
@ -1819,6 +1821,8 @@ class RGWRados
string& bucket_oid_base);
int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
const string& obj_key, string *bucket_obj, int *shard_id);
int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
int shard_id, string *bucket_obj);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
template<typename T>
@ -2083,6 +2087,10 @@ public:
int get_required_alignment(rgw_bucket& bucket, uint64_t *alignment);
int get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size);
uint32_t get_max_bucket_shards() {
return MAX_BUCKET_INDEX_SHARDS_PRIME;
}
int list_raw_objects(rgw_bucket& pool, const string& prefix_filter, int max,
RGWListRawObjsCtx& ctx, list<string>& oids,
bool *is_truncated);
@ -2160,6 +2168,7 @@ public:
RGWZonePlacementInfo *rule_info);
int set_bucket_location_by_rule(const string& location_rule, const string& tenant_name, const string& bucket_name, rgw_bucket& bucket,
RGWZonePlacementInfo *rule_info);
void create_bucket_id(string *bucket_id);
virtual int create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
const string& zonegroup_id,
const string& placement_rule,
@ -2242,6 +2251,7 @@ public:
explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
int init(rgw_bucket& _bucket, rgw_obj& obj);
int init(rgw_bucket& _bucket, int sid);
};
class Object {
@ -2923,9 +2933,14 @@ public:
int bi_get_instance(rgw_obj& obj, rgw_bucket_dir_entry *dirent);
int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry);
int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry);
int bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry);
int bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
int bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
int bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max,
list<rgw_cls_bi_entry> *entries, bool *is_truncated);
int bi_remove(BucketShard& bs);
int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
@ -2936,6 +2951,7 @@ public:
void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id);
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
void shard_name(const string& prefix, unsigned shard_id, string& name);
int get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key, int *shard_id);
void time_log_prepare_entry(cls_log_entry& entry, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
int time_log_add_init(librados::IoCtx& io_ctx);
int time_log_add(const string& oid, list<cls_log_entry>& entries,
@ -3108,6 +3124,9 @@ public:
int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
void get_bucket_index_object(const string& bucket_oid_base, uint32_t num_shards,
int shard_id, string *bucket_obj);
/**
* Check the actual on-disk state of the object specified
* by list_state, and fill in the time and size of object.

View File

@ -23,6 +23,10 @@
bucket stats returns bucket statistics
bucket rm remove bucket
bucket check check bucket index
bucket reshard reshard bucket
bi get retrieve bucket index object entries
bi put store bucket index object entries
bi list list raw bucket index entries
object rm remove object
object unlink unlink object from bucket index
objects expire run expired objects cleanup