rgw, cls_rgw: keep shard ids with oids

Instead of just having the list of oids, keep the shard ids together, so
that we can know on which shard the operation happened.
Bucket markers are just using the shard numeric id, instead of the
bucket instance shard id. This makes it easier to parse the markers
appropriately.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2014-12-05 15:52:26 -08:00
parent 6b1c4a0bc2
commit 0d1f97ff31
4 changed files with 85 additions and 76 deletions

View File

@ -52,7 +52,7 @@ void BucketIndexAioManager::do_completion(int id) {
// for further processing
map<int, string>::iterator miter = pending_objs.find(id);
if (miter != pending_objs.end()) {
completion_objs.push_back(miter->second);
completion_objs[id] = miter->second;
pending_objs.erase(miter);
}
@ -60,7 +60,7 @@ void BucketIndexAioManager::do_completion(int id) {
}
bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
int *num_completions, int *ret_code, vector<string> *objs) {
int *num_completions, int *ret_code, map<int, string> *objs) {
lock.Lock();
if (pendings.empty() && completions.empty()) {
lock.Unlock();
@ -71,11 +71,11 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
// Clear the completed AIOs
list<librados::AioCompletion*>::iterator iter = completions.begin();
list<string>::iterator liter = completion_objs.begin();
map<int, string>::iterator liter = completion_objs.begin();
for (; iter != completions.end() && liter != completion_objs.end(); ++iter, ++liter) {
int r = (*iter)->get_return_value();
if (objs && r == 0) {
objs->push_back(*liter);
(*objs)[liter->first] = liter->second;
}
if (ret_code && (r < 0 && r != valid_ret_code))
(*ret_code) = r;

View File

@ -32,7 +32,7 @@ private:
map<int, librados::AioCompletion*> pendings;
list<librados::AioCompletion*> completions;
map<int, string> pending_objs;
list<string> completion_objs;
map<int, string> completion_objs;
int next;
Mutex lock;
Cond cond;
@ -71,8 +71,7 @@ public:
/*
* Create a new instance.
*/
BucketIndexAioManager() : pendings(), completions(), pending_objs(), completion_objs(),
next(0), lock("BucketIndexAioManager::lock"), cond() {}
BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
/*
@ -91,7 +90,7 @@ public:
* Return false if there is no pending AIO, true otherwise.
*/
bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
vector<string> *objs);
map<int, string> *objs);
/**
* Do aio read operation.
@ -214,8 +213,8 @@ protected:
// OP needs to be re-send until a certain code is returned.
virtual bool need_multiple_rounds() { return false; }
// Add a new object to the end of the container.
virtual void add_object(const string& oid) {}
virtual void reset_container(vector<string>& objs) {}
virtual void add_object(int shard, const string& oid) {}
virtual void reset_container(map<int, string>& objs) {}
public:
CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
@ -232,7 +231,7 @@ public:
}
int num_completions, r = 0;
vector<string> objs;
map<int, string> objs;
while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {

View File

@ -1676,10 +1676,10 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da
return 0;
}
void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker,
void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
string *marker) {
if (marker) {
*marker = shard_name;
*marker = shard_id_str;
marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
marker->append(shard_marker);
}
@ -2386,7 +2386,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket)
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
vector<string> bucket_objs;
map<int, string> bucket_objs;
get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs);
return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
@ -2486,15 +2486,15 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
/* remove bucket index */
librados::IoCtx index_ctx; // context for new bucket
vector<string> bucket_objs;
map<int, string> bucket_objs;
int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
vector<string>::const_iterator biter;
map<int, string>::const_iterator biter;
for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
// Do best effort removal
index_ctx.remove(*biter);
index_ctx.remove(biter->second);
}
}
/* ret == -ENOENT here */
@ -3834,7 +3834,7 @@ int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_
}
int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
vector<string>& bucket_objs, int shard_id, vector<string> *bucket_instance_ids) {
map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
string bucket_oid_base;
int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
if (ret < 0)
@ -3855,16 +3855,16 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
template<typename T>
int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<string, T>& bucket_objs, int shard_id, vector<string> *bucket_instance_ids)
map<int, string>& oids, map<int, T>& bucket_objs,
int shard_id, map<int, string> *bucket_instance_ids)
{
vector<string> oids;
int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids);
if (ret < 0)
return ret;
vector<string>::const_iterator iter = oids.begin();
map<int, string>::const_iterator iter = oids.begin();
for (; iter != oids.end(); ++iter) {
bucket_objs[*iter] = T();
bucket_objs[iter->first] = T();
}
return 0;
}
@ -3912,17 +3912,18 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
librados::IoCtx index_ctx;
// key - bucket index object id
// value - bucket index check OP returned result with the given bucket index object (shard)
map<string, struct rgw_cls_check_index_ret> bucket_objs_ret;
int ret = open_bucket_index(bucket, index_ctx, bucket_objs_ret);
map<int, string> oids;
map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret);
if (ret < 0)
return ret;
ret = CLSRGWIssueBucketCheck(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
if (ret < 0)
return ret;
// Aggregate results (from different shards if there is any)
map<string, struct rgw_cls_check_index_ret>::iterator iter;
map<int, struct rgw_cls_check_index_ret>::iterator iter;
for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
accumulate_raw_stats(iter->second.existing_header, *existing_stats);
accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
@ -3934,7 +3935,7 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx;
vector<string> bucket_objs;
map<int, string> bucket_objs;
int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
@ -5525,7 +5526,7 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m
map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
{
map<string, rgw_bucket_dir_header> headers;
vector<string> bucket_instance_ids;
map<int, string> bucket_instance_ids;
int r = cls_bucket_head(bucket, headers, &bucket_instance_ids);
if (r < 0)
return r;
@ -5533,7 +5534,7 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m
assert(headers.size() == bucket_instance_ids.size());
map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
vector<string>::iterator viter = bucket_instance_ids.begin();
map<int, string>::iterator viter = bucket_instance_ids.begin();
BucketIndexShardsManager ver_mgr;
BucketIndexShardsManager master_ver_mgr;
BucketIndexShardsManager marker_mgr;
@ -5541,10 +5542,10 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m
for(; iter != headers.end(); ++iter, ++viter) {
accumulate_raw_stats(iter->second, stats);
snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
ver_mgr.add(*viter, string(buf));
ver_mgr.add(viter->first, string(buf));
snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
master_ver_mgr.add(*viter, string(buf));
marker_mgr.add(*viter, iter->second.max_marker);
master_ver_mgr.add(viter->first, string(buf));
marker_mgr.add(viter->first, iter->second.max_marker);
}
ver_mgr.to_string(bucket_ver);
master_ver_mgr.to_string(master_ver);
@ -6162,33 +6163,39 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark
result.clear();
librados::IoCtx index_ctx;
map<string, cls_rgw_bi_log_list_ret> bi_log_lists;
vector<string> bucket_instance_ids;
int r = open_bucket_index(bucket, index_ctx, bi_log_lists, shard_id, &bucket_instance_ids);
map<int, string> oids;
map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
map<int, string> bucket_instance_ids;
int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids);
if (r < 0)
return r;
BucketIndexShardsManager marker_mgr;
bool has_shards = (bi_log_lists.size() > 1 || shard_id >= 0);
bool has_shards = (oids.size() > 1 || shard_id >= 0);
// If there are multiple shards for the bucket index object, the marker
// should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#
// should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
// {shard_marker_2}...', if there is no sharding, the bi_log_list should
// only contain one record, and the key is the bucket index object id.
r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first);
// only contain one record, and the key is the bucket instance id.
r = marker_mgr.from_string(marker, has_shards);
if (r < 0)
return r;
r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
vector<string> shard_ids_str;
vector<list<rgw_bi_log_entry>::iterator> vcurrents;
vector<list<rgw_bi_log_entry>::iterator> vends;
if (truncated) {
*truncated = false;
}
map<string, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
map<int, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
for (; miter != bi_log_lists.end(); ++miter) {
int shard_id = miter->first;
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
shard_ids_str.push_back(buf);
vcurrents.push_back(miter->second.entries.begin());
vends.push_back(miter->second.entries.end());
if (truncated) {
@ -6202,17 +6209,17 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark
for (size_t i = 0;
result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i];
++vcurrents[i], ++i) {
string& shard_str = shard_ids_str[i];
if (vcurrents[i] != vends[i]) {
rgw_bi_log_entry& entry = *(vcurrents[i]);
string& name = bucket_instance_ids[i];
if (has_shards) {
// Put the shard name as part of the ID, so that caller can easy find out
// the next marker
string tmp_id;
build_bucket_index_marker(name, entry.id, &tmp_id);
build_bucket_index_marker(shard_str, entry.id, &tmp_id);
entry.id.swap(tmp_id);
}
marker_mgr.add(name, entry.id);
marker_mgr.add(i, entry.id);
result.push_back(entry);
has_more = true;
}
@ -6240,18 +6247,18 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark
int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker)
{
librados::IoCtx index_ctx;
vector<string> bucket_objs;
map<int, string> bucket_objs;
int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id);
if (r < 0)
return r;
bool has_shards = bucket_objs.size() > 1 || shard_id >= 0;
BucketIndexShardsManager start_marker_mgr;
r = start_marker_mgr.from_string(start_marker, has_shards, bucket_objs.front());
r = start_marker_mgr.from_string(start_marker, has_shards);
if (r < 0)
return r;
BucketIndexShardsManager end_marker_mgr;
r = end_marker_mgr.from_string(end_marker, has_shards, bucket_objs.front());
r = end_marker_mgr.from_string(end_marker, has_shards);
if (r < 0)
return r;
@ -6357,7 +6364,7 @@ int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
{
librados::IoCtx index_ctx;
vector<string> bucket_objs;
map<int, string> bucket_objs;
int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
@ -6374,12 +6381,13 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str
librados::IoCtx index_ctx;
// key - oid (for different shards if there is any)
// value - list result for the corresponding oid (shard), it is filled by the AIO callback
map<string, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket, index_ctx, list_results);
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket, index_ctx, oids);
if (r < 0)
return r;
r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio)();
r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
@ -6387,12 +6395,12 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str
vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
vector<string> vnames(list_results.size());
map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
*is_truncated = false;
for (; iter != list_results.end(); ++iter) {
vcurrents.push_back(iter->second.dir.m.begin());
vends.push_back(iter->second.dir.m.end());
vnames.push_back(iter->first);
vnames.push_back(oids[iter->first]);
*is_truncated = (*is_truncated || iter->second.is_truncated);
}
@ -6663,21 +6671,22 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
return 0;
}
int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, vector<string> *bucket_instance_ids)
int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
{
librados::IoCtx index_ctx;
map<string, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket, index_ctx, list_results, -1, bucket_instance_ids);
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids);
if (r < 0)
return r;
r = CLSRGWIssueGetDirHeader(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio)();
r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
for(; iter != list_results.end(); ++iter) {
headers[iter->first] = iter->second.dir.header;
headers[oids[iter->first]] = iter->second.dir.header;
}
return 0;
}
@ -6685,14 +6694,14 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_
int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio)
{
librados::IoCtx index_ctx;
vector<string> bucket_objs;
map<int, string> bucket_objs;
int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
vector<string>::iterator iter = bucket_objs.begin();
map<int, string>::iterator iter = bucket_objs.begin();
for (; iter != bucket_objs.end(); ++iter) {
r = cls_rgw_get_dir_header_async(index_ctx, *iter, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
if (r < 0) {
ctx->put();
break;
@ -6990,46 +6999,46 @@ int RGWRados::remove_temp_objects(string date, string time)
}
void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
uint32_t num_shards, vector<string>& bucket_objects, int shard_id)
uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
{
if (!num_shards) {
bucket_objects.push_back(bucket_oid_base);
bucket_objects[0] = bucket_oid_base;
} else {
char buf[bucket_oid_base.size() + 32];
if (shard_id < 0) {
for (uint32_t i = 0; i < num_shards; ++i) {
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i);
bucket_objects.push_back(string(buf));
bucket_objects[i] = buf;
}
} else {
if ((uint32_t)shard_id > num_shards) {
return;
}
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
bucket_objects.push_back(string(buf));
bucket_objects[shard_id] = buf;
}
}
}
void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, vector<string> *result)
void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result)
{
rgw_bucket& bucket = bucket_info.bucket;
string plain_id = bucket.name + ":" + bucket.bucket_id;
if (!bucket_info.num_shards) {
result->push_back(plain_id);
(*result)[0] = plain_id;
} else {
char buf[16];
if (shard_id < 0) {
for (uint32_t i = 0; i < bucket_info.num_shards; ++i) {
snprintf(buf, sizeof(buf), ":%d", i);
result->push_back(plain_id + buf);
(*result)[i] = plain_id + buf;
}
} else {
if ((uint32_t)shard_id > bucket_info.num_shards) {
return;
}
snprintf(buf, sizeof(buf), ":%d", shard_id);
result->push_back(plain_id + buf);
(*result)[shard_id] = plain_id + buf;
}
}
}

View File

@ -1257,14 +1257,15 @@ class RGWRados
int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
const string& obj_key, string *bucket_obj, int *shard_id);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
vector<string>& bucket_objs, int shard_id = -1, vector<string> *bucket_instance_ids = NULL);
map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
template<typename T>
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<string, T>& bucket_objs, int shard_id = -1, vector<string> *bucket_instance_ids = NULL);
void build_bucket_index_marker(const string& shard_name, const string& shard_marker,
map<int, string>& oids, map<int, T>& bucket_objs,
int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
string *marker);
void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, vector<string> *result);
void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
struct GetObjState {
librados::IoCtx io_ctx;
@ -1867,7 +1868,7 @@ public:
int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
bool (*force_check_filter)(const string& name) = NULL);
int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, vector<string> *bucket_instance_ids = NULL);
int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
@ -1974,7 +1975,7 @@ public:
* bucket_objs [out] - filled by this method, a list of bucket index objects.
*/
void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
vector<string>& bucket_objs, int shard_id = -1);
map<int, string>& bucket_objs, int shard_id = -1);
/**
* Get the bucket index object with the given base bucket index object and object key,