Adjust bi log listing to work with multiple bucket shards.

Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
This commit is contained in:
Guang Yang 2014-09-23 23:14:24 +00:00 committed by Yehuda Sadeh
parent 47665b23df
commit f9b280ea89
4 changed files with 214 additions and 113 deletions

View File

@ -11,6 +11,9 @@
using namespace librados;
const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
/**
* This class represents the bucket index object operation callback context.
*/
@ -37,15 +40,6 @@ public:
}
};
/*
* Callback implementation for AIO request.
*/
static void bucket_index_op_completion_cb(void* cb, void* arg) {
BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
cb_arg->manager->do_completion(cb_arg->id);
cb_arg->put();
}
void BucketIndexAioManager::do_completion(int id) {
Mutex::Locker l(lock);
@ -95,12 +89,7 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
librados::ObjectWriteOperation op;
op.create(true);
op.exec("rgw", "bucket_init_index", in);
BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, &op);
if (r >= 0)
manager->add_pending(arg->id, c);
return r;
return manager->aio_operate(io_ctx, oid, &op);
}
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
@ -111,13 +100,7 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
::encode(call, in);
ObjectWriteOperation op;
op.exec("rgw", "bucket_set_tag_timeout", in);
BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, &op);
if (r >= 0) {
manager->add_pending(arg->id, c);
}
return r;
return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketIndexInit::issue_op()
@ -184,13 +167,7 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
librados::ObjectReadOperation op;
op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, &op, NULL);
if (r >= 0)
manager->add_pending(arg->id, c);
return r;
return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketList::issue_op()
@ -198,19 +175,32 @@ int CLSRGWIssueBucketList::issue_op()
return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
}
/*
 * Queue an asynchronous "bi_log_list" class call against one bucket index
 * shard object.
 *
 * The listing starts from the marker recorded for oid in marker_mgr (an
 * empty marker when none is recorded) and requests at most max entries.
 * pdata is handed to the ClsBucketIndexOpCtx attached to the read operation.
 *
 * Returns whatever manager->aio_operate() returns.
 */
static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
    const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
    struct cls_rgw_bi_log_list_ret *pdata) {
  cls_rgw_bi_log_list_op request;
  request.marker = marker_mgr.get(oid, "");
  request.max = max;

  bufferlist payload;
  ::encode(request, payload);

  librados::ObjectReadOperation read_op;
  read_op.exec("rgw", "bi_log_list", payload,
               new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
  return manager->aio_operate(io_ctx, oid, &read_op);
}
/*
 * Issue the bi log list request for the shard the iterator currently points
 * at: the key is the shard object id, the mapped value receives the result.
 */
int CLSRGWIssueBILogList::issue_op()
{
  struct cls_rgw_bi_log_list_ret *shard_result = &iter->second;
  return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, shard_result);
}
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
struct rgw_cls_check_index_ret *pdata) {
bufferlist in;
librados::ObjectReadOperation op;
op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
pdata, NULL));
BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, &op, NULL);
if (r >= 0) {
manager->add_pending(arg->id, c);
}
return r;
return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketCheck::issue_op()
@ -223,13 +213,7 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
bufferlist in;
librados::ObjectWriteOperation op;
op.exec("rgw", "bucket_rebuild_index", in);
BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, &op);
if (r >= 0) {
manager->add_pending(arg->id, c);
}
return r;
return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketRebuild::issue_op()
@ -291,34 +275,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB
return 0;
}
/*
 * Synchronously run the "bi_log_list" class call on a single index object.
 *
 * marker and max are encoded into the request. On success the decoded
 * entries replace the contents of entries and, when truncated is non-NULL,
 * the reply's truncated flag is stored through it.
 *
 * Returns the (non-negative) result of exec() on success, the negative
 * exec() result on failure, or -EIO when the reply cannot be decoded.
 */
int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
                        list<rgw_bi_log_entry>& entries, bool *truncated)
{
  cls_rgw_bi_log_list_op request;
  request.marker = marker;
  request.max = max;

  bufferlist request_bl;
  ::encode(request, request_bl);

  bufferlist reply_bl;
  const int r = io_ctx.exec(oid, "rgw", "bi_log_list", request_bl, reply_bl);
  if (r < 0) {
    return r;
  }

  cls_rgw_bi_log_list_ret reply;
  try {
    bufferlist::iterator p = reply_bl.begin();
    ::decode(reply, p);
  } catch (buffer::error&) {
    return -EIO;
  }

  entries = reply.entries;
  if (truncated != NULL) {
    *truncated = reply.truncated;
  }
  return r;
}
int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
{
do {

View File

@ -2,11 +2,26 @@
#define CEPH_CLS_RGW_CLIENT_H
#include "include/types.h"
#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
#include "cls_rgw_ops.h"
#include "common/RefCountedObj.h"
// Forward declaration
class BucketIndexAioManager;
/*
 * Bucket index AIO request argument. It is used to pass an argument to the
 * completion callback: the request ID and the manager to notify.
 */
struct BucketIndexAioArg : public RefCountedObject {
  int id;
  BucketIndexAioManager* manager;

  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager)
    : id(_id),
      manager(_manager)
  {}
};
/*
* This class manages AIO completions. This class is not completely thread-safe,
* methods like *get_next* are not thread-safe and are expected to be called from
@ -19,12 +34,21 @@ private:
int next;
Mutex lock;
Cond cond;
public:
/*
* Create a new instance.
* Callback implementation for AIO request.
*/
BucketIndexAioManager() : pendings(), completions(), next(0),
lock("BucketIndexAioManager::lock"), cond() {}
static void bucket_index_op_completion_cb(void* cb, void* arg) {
BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
cb_arg->manager->do_completion(cb_arg->id);
cb_arg->put();
}
/*
* Get next request ID. This method is not thread-safe.
*
* Return next request ID.
*/
int get_next() { return next++; }
/*
* Add a new pending AIO completion instance.
@ -36,19 +60,18 @@ public:
Mutex::Locker l(lock);
pendings[id] = completion;
}
public:
/*
* Create a new instance.
*/
BucketIndexAioManager() : pendings(), completions(), next(0),
lock("BucketIndexAioManager::lock"), cond() {}
/*
* Do completion for the given AIO request.
*/
void do_completion(int id);
/*
* Get next request ID. This method is not thread-safe.
*
* Return next request ID.
*/
int get_next() { return next++; }
/*
* Wait for AIO completions.
*
@ -59,17 +82,32 @@ public:
* Return false if there is no pending AIO, true otherwise.
*/
bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
};
/*
* Bucket index AIO request argument, this is used to pass a argument
* to callback.
*/
struct BucketIndexAioArg : public RefCountedObject {
BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
id(_id), manager(_manager) {}
int id;
BucketIndexAioManager* manager;
/**
 * Do aio read operation.
 *
 * A callback argument carrying a fresh request ID is attached to a new AIO
 * completion. When the submit succeeds, the completion is registered as
 * pending under that ID. When the submit fails, nothing is registered, so
 * the completion and the callback argument are released here instead of
 * being leaked.
 *
 * NOTE(review): the declared return type is bool but the value returned is
 * the int result of IoCtx::aio_operate(), so 0 converts to false and any
 * non-zero value (including negative error codes) converts to true --
 * confirm what callers expect.
 */
bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
  BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
  librados::AioCompletion *c = librados::Rados::aio_create_completion(static_cast<void*>(arg), NULL, bucket_index_op_completion_cb);
  int r = io_ctx.aio_operate(oid, c, op, NULL);
  if (r >= 0) {
    add_pending(arg->id, c);
  } else {
    // Presumably the completion callback never fires for a request that
    // failed to submit (confirm against librados), so nobody else will
    // release these two objects.
    c->release();
    arg->put();
  }
  return r;
}
/**
 * Do aio write operation.
 *
 * A callback argument carrying a fresh request ID is attached to a new AIO
 * completion. When the submit succeeds, the completion is registered as
 * pending under that ID. When the submit fails, nothing is registered, so
 * the completion and the callback argument are released here instead of
 * being leaked.
 *
 * NOTE(review): the declared return type is bool but the value returned is
 * the int result of IoCtx::aio_operate(), so 0 converts to false and any
 * non-zero value (including negative error codes) converts to true --
 * confirm what callers expect.
 */
bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
  BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
  librados::AioCompletion *c = librados::Rados::aio_create_completion(static_cast<void*>(arg), NULL, bucket_index_op_completion_cb);
  int r = io_ctx.aio_operate(oid, c, op);
  if (r >= 0) {
    add_pending(arg->id, c);
  } else {
    // Presumably the completion callback never fires for a request that
    // failed to submit (confirm against librados), so nobody else will
    // release these two objects.
    c->release();
    arg->put();
  }
  return r;
}
};
class RGWGetDirHeader_CB : public RefCountedObject {
@ -82,12 +120,23 @@ class BucketIndexShardsManager {
private:
// Per shard setting manager, for example, marker.
map<string, string> value_by_shards;
const static char KEY_VALUE_SEPARATOR = '#';
const static char SHARDS_SEPARATOR = ',';
public:
void add_item(const string& shard, const string& value) {
const static string KEY_VALUE_SEPARATOR;
const static string SHARDS_SEPARATOR;
// Record the value for the given shard, replacing any value already stored
// for that shard.
void add(const string& shard, const string& value) {
value_by_shards[shard] = value;
}
/*
 * Look up the value stored for a shard.
 *
 * Returns a reference to the stored value, or to default_value when the
 * shard has no entry. In the latter case the reference is only valid as
 * long as the caller's default_value argument is; when a temporary is
 * passed, copy the result within the same expression rather than binding
 * it to a reference.
 */
const string& get(const string& shard, const string& default_value) const {
  map<string, string>::const_iterator found = value_by_shards.find(shard);
  if (found == value_by_shards.end()) {
    return default_value;
  }
  return found->second;
}
bool empty() {
return value_by_shards.empty();
}
void to_string(string *out) const {
if (out) {
map<string, string>::const_iterator iter = value_by_shards.begin();
@ -98,15 +147,34 @@ public:
for (; iter != value_by_shards.end(); ++iter) {
if (out->length()) {
// Not the first item, append a separator first
out->append(1, SHARDS_SEPARATOR);
out->append(SHARDS_SEPARATOR);
}
out->append(iter->first);
out->append(1, KEY_VALUE_SEPARATOR);
out->append(KEY_VALUE_SEPARATOR);
out->append(iter->second);
}
}
}
}
int from_string(const string& composed_marker, bool has_shards, const string& oid) {
value_by_shards.clear();
if (!has_shards) {
add(oid, composed_marker);
} else {
list<string> shards;
get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
list<string>::const_iterator iter = shards.begin();
for (; iter != shards.end(); ++iter) {
size_t pos = iter->find(KEY_VALUE_SEPARATOR);
if (pos == string::npos)
return -EINVAL;
string name = iter->substr(0, pos);
value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1);
}
}
return 0;
}
};
/* bucket index */
@ -223,6 +291,18 @@ public:
start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
};
/*
 * Bucket index log listing driven through CLSRGWConcurrentIO. Holds the
 * per-shard marker manager and the maximum number of entries to request;
 * results are collected into the map passed to the constructor.
 */
class CLSRGWIssueBILogList : public CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> > {
private:
  BucketIndexShardsManager& marker_mgr;
  uint32_t max;

protected:
  int issue_op();

public:
  CLSRGWIssueBILogList(librados::IoCtx& io_ctx,
                       BucketIndexShardsManager& _marker_mgr,
                       uint32_t _max,
                       map<string, struct cls_rgw_bi_log_list_ret>& bi_log_lists,
                       uint32_t max_aio)
    : CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> >(io_ctx, bi_log_lists, max_aio),
      marker_mgr(_marker_mgr),
      max(_max)
  {}
};
/**
* Check the bucket index.
*
@ -266,10 +346,6 @@ void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist
void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
/* bucket index log */
int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
list<rgw_bi_log_entry>& entries, bool *truncated);
int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
/* usage logging */

View File

@ -1676,6 +1676,15 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da
return 0;
}
void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker,
string *marker) {
if (marker) {
*marker = shard_name;
marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
marker->append(shard_marker);
}
}
int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
{
int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
@ -5479,10 +5488,10 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m
for(; iter != headers.end(); ++iter) {
accumulate_raw_stats(iter->second, stats);
snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
ver_mgr.add_item(iter->first, string(buf));
ver_mgr.add(iter->first, string(buf));
snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
master_ver_mgr.add_item(iter->first, string(buf));
marker_mgr.add_item(iter->first, iter->second.max_marker);
master_ver_mgr.add(iter->first, string(buf));
marker_mgr.add(iter->first, iter->second.max_marker);
}
ver_mgr.to_string(bucket_ver);
master_ver_mgr.to_string(master_ver);
@ -6096,22 +6105,80 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter,
int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
ldout(cct, 20) << __func__ << bucket << " marker " << marker << " max " << max << dendl;
result.clear();
librados::IoCtx index_ctx;
string oid;
int r = open_bucket_index(bucket, index_ctx, oid);
map<string, cls_rgw_bi_log_list_ret> bi_log_lists;
int r = open_bucket_index(bucket, index_ctx, bi_log_lists);
if (r < 0)
return r;
std::list<rgw_bi_log_entry> entries;
int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
if (ret < 0)
return ret;
BucketIndexShardsManager marker_mgr;
bool has_shards = (bi_log_lists.size() > 1);
// If there are multiple shards for the bucket index object, the marker
// should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#
// {shard_marker_2}...', if there is no sharding, the bi_log_list should
// only contain one record, and the key is the bucket index object id.
r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first);
if (r < 0)
return r;
r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
std::list<rgw_bi_log_entry>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
result.push_back(*iter);
vector<list<rgw_bi_log_entry>::iterator> vcurrents;
vector<list<rgw_bi_log_entry>::iterator> vends;
vector<string> vnames;
if (truncated) {
*truncated = false;
}
map<string, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
for (; miter != bi_log_lists.end(); ++miter) {
vnames.push_back(miter->first);
vcurrents.push_back(miter->second.entries.begin());
vends.push_back(miter->second.entries.end());
if (truncated) {
*truncated = (*truncated || miter->second.truncated);
}
}
bool has_more = true;
while (result.size() < max && has_more) {
has_more = false;
for (size_t i = 0;
result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i];
++vcurrents[i], ++i) {
if (vcurrents[i] != vends[i]) {
rgw_bi_log_entry& entry = *(vcurrents[i]);
if (has_shards) {
// Put the shard name as part of the ID, so that caller can easy find out
// the next marker
string tmp_id;
build_bucket_index_marker(vnames[i], entry.id, &tmp_id);
entry.id.swap(tmp_id);
}
marker_mgr.add(vnames[i], entry.id);
result.push_back(entry);
has_more = true;
}
}
}
for (size_t i = 0; i < vcurrents.size(); ++i) {
if (truncated) {
*truncated = (*truncated || (vcurrents[i] != vends[i]));
}
}
// Refresh marker, if there are multiple shards, the output will look like
// '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...',
// if there is no sharding, the simply marker (without oid) is returned
if (has_shards) {
marker_mgr.to_string(&marker);
} else {
marker = result.rbegin()->id;
}
return 0;

View File

@ -1261,6 +1261,8 @@ class RGWRados
template<typename T>
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<string, T>& bucket_objs);
void build_bucket_index_marker(const string& shard_name, const string& shard_marker,
string *marker);
struct GetObjState {
librados::IoCtx io_ctx;