mirror of
https://github.com/ceph/ceph
synced 2025-03-06 08:20:12 +00:00
rgw: new class methods for handling usage information
The new methods are: - user_usage_log_add: add new usage information - user_usage_log_read: get usage information - user_usage_log_trim: remove usage information Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
parent
ec689e3e7e
commit
9a70ec94c9
266
src/cls_rgw.cc
266
src/cls_rgw.cc
@ -21,6 +21,9 @@ cls_method_handle_t h_rgw_bucket_list;
|
||||
cls_method_handle_t h_rgw_bucket_prepare_op;
|
||||
cls_method_handle_t h_rgw_bucket_complete_op;
|
||||
cls_method_handle_t h_rgw_dir_suggest_changes;
|
||||
cls_method_handle_t h_rgw_user_usage_log_add;
|
||||
cls_method_handle_t h_rgw_user_usage_log_read;
|
||||
cls_method_handle_t h_rgw_user_usage_log_trim;
|
||||
|
||||
|
||||
#define ROUND_BLOCK_SIZE 4096
|
||||
@ -408,6 +411,266 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
|
||||
{
|
||||
char buf[32];
|
||||
snprintf(buf, sizeof(buf), "%011llu", (long long unsigned)epoch);
|
||||
key = buf;
|
||||
}
|
||||
|
||||
static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
|
||||
{
|
||||
char buf[32 + user.size() + bucket.size()];
|
||||
snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
|
||||
key = buf;
|
||||
}
|
||||
|
||||
static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
|
||||
{
|
||||
char buf[32 + user.size() + bucket.size()];
|
||||
snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
|
||||
key = buf;
|
||||
}
|
||||
|
||||
static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
|
||||
{
|
||||
bufferlist::iterator kiter = record_bl.begin();
|
||||
try {
|
||||
::decode(e, kiter);
|
||||
} catch (buffer::error& err) {
|
||||
CLS_LOG("ERROR: usage_record_decode(): failed to decode record_bl\n");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG("rgw_user_usage_log_add()");
|
||||
|
||||
bufferlist::iterator in_iter = in->begin();
|
||||
rgw_cls_usage_log_add_op op;
|
||||
|
||||
try {
|
||||
::decode(op, in_iter);
|
||||
} catch (buffer::error& err) {
|
||||
CLS_LOG("ERROR: rgw_user_usage_log_add(): failed to decode request\n");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
rgw_usage_log_info& info = op.info;
|
||||
vector<rgw_usage_log_entry>::iterator iter;
|
||||
|
||||
for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
|
||||
rgw_usage_log_entry& entry = *iter;
|
||||
string key_by_time;
|
||||
usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
|
||||
|
||||
CLS_LOG("rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
|
||||
|
||||
bufferlist record_bl;
|
||||
int ret = cls_cxx_map_read_key(hctx, key_by_time, &record_bl);
|
||||
if (ret < 0 && ret != -ENOENT) {
|
||||
CLS_LOG("ERROR: rgw_user_usage_log_add(): cls_cxx_map_read_key returned %d\n", ret);
|
||||
return -EINVAL;
|
||||
}
|
||||
if (ret >= 0) {
|
||||
rgw_usage_log_entry e;
|
||||
ret = usage_record_decode(record_bl, e);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
CLS_LOG("rgw_user_usage_log_add aggregating existing bucket\n");
|
||||
entry.bytes_sent += e.bytes_sent;
|
||||
entry.bytes_received += e.bytes_received;
|
||||
}
|
||||
|
||||
bufferlist new_record_bl;
|
||||
::encode(entry, new_record_bl);
|
||||
ret = cls_cxx_map_write_key(hctx, key_by_time, &new_record_bl);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
string key_by_user;
|
||||
usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
|
||||
ret = cls_cxx_map_write_key(hctx, key_by_user, &new_record_bl);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end,
|
||||
string& user, string& key_iter, uint32_t max_entries, bool *truncated,
|
||||
int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
|
||||
void *param)
|
||||
{
|
||||
CLS_LOG("usage_iterate_range");
|
||||
|
||||
map<string, bufferlist> keys;
|
||||
#define NUM_KEYS 32
|
||||
string filter_prefix;
|
||||
string start_key, end_key;
|
||||
bool by_user = !user.empty();
|
||||
uint32_t i = 0;
|
||||
string user_key;
|
||||
|
||||
if (truncated)
|
||||
*truncated = false;
|
||||
|
||||
if (!by_user) {
|
||||
usage_record_prefix_by_time(end, end_key);
|
||||
} else {
|
||||
user_key = user;
|
||||
user_key.append("_");
|
||||
}
|
||||
|
||||
if (key_iter.empty()) {
|
||||
if (by_user) {
|
||||
start_key = user;
|
||||
} else {
|
||||
usage_record_prefix_by_time(start, start_key);
|
||||
}
|
||||
} else {
|
||||
start_key = key_iter;
|
||||
}
|
||||
|
||||
do {
|
||||
int ret = cls_cxx_map_read_keys(hctx, start_key, filter_prefix, NUM_KEYS, &keys);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
|
||||
map<string, bufferlist>::iterator iter = keys.begin();
|
||||
if (iter == keys.end())
|
||||
break;
|
||||
|
||||
for (; iter != keys.end(); ++iter) {
|
||||
const string& key = iter->first;
|
||||
rgw_usage_log_entry e;
|
||||
|
||||
if (!by_user && key.compare(end_key) >= 0)
|
||||
return 0;
|
||||
|
||||
if (by_user && key.compare(0, user_key.size(), user_key) != 0)
|
||||
return 0;
|
||||
|
||||
ret = usage_record_decode(iter->second, e);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
if (e.epoch < start)
|
||||
continue;
|
||||
|
||||
/* keys are sorted by epoch, so once we're past end we're done */
|
||||
if (e.epoch >= end)
|
||||
return 0;
|
||||
|
||||
ret = cb(hctx, key, e, param);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
|
||||
i++;
|
||||
if (max_entries && (i > max_entries)) {
|
||||
*truncated = true;
|
||||
key_iter = key;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
iter--;
|
||||
start_key = iter->first;
|
||||
} while (true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
|
||||
{
|
||||
map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
|
||||
rgw_user_bucket ub(entry.owner, entry.bucket);
|
||||
rgw_usage_log_entry& le = (*usage)[ub];
|
||||
le.bytes_sent += entry.bytes_sent;
|
||||
le.bytes_received += entry.bytes_received;
|
||||
le.epoch = entry.epoch;
|
||||
le.owner = entry.owner;
|
||||
le.bucket = entry.bucket;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rgw_user_usage_log_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG("rgw_user_usage_log_read()");
|
||||
|
||||
bufferlist::iterator in_iter = in->begin();
|
||||
rgw_cls_usage_log_read_op op;
|
||||
|
||||
try {
|
||||
::decode(op, in_iter);
|
||||
} catch (buffer::error& err) {
|
||||
CLS_LOG("ERROR: rgw_user_usage_log_read(): failed to decode request\n");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
rgw_cls_usage_log_read_ret ret_info;
|
||||
map<rgw_user_bucket, rgw_usage_log_entry> *usage = &ret_info.usage;
|
||||
string iter = op.iter;
|
||||
#define MAX_ENTRIES 1000
|
||||
uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES);
|
||||
int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
if (ret_info.truncated)
|
||||
ret_info.next_iter = iter;
|
||||
|
||||
::encode(ret_info, *out);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
|
||||
{
|
||||
string key_by_time;
|
||||
string key_by_user;
|
||||
|
||||
usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
|
||||
usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
|
||||
|
||||
int ret = cls_cxx_map_remove_key(hctx, key_by_time);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
return cls_cxx_map_remove_key(hctx, key_by_user);
|
||||
}
|
||||
|
||||
int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG("rgw_user_usage_log_trim()");
|
||||
|
||||
/* only continue if object exists! */
|
||||
int ret = cls_cxx_stat(hctx, NULL, NULL);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
bufferlist::iterator in_iter = in->begin();
|
||||
rgw_cls_usage_log_trim_op op;
|
||||
|
||||
try {
|
||||
::decode(op, in_iter);
|
||||
} catch (buffer::error& err) {
|
||||
CLS_LOG("ERROR: rgw_user_log_usage_log_trim(): failed to decode request\n");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
string iter;
|
||||
ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void __cls_init()
|
||||
{
|
||||
CLS_LOG("Loaded rgw class!");
|
||||
@ -418,6 +681,9 @@ void __cls_init()
|
||||
cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
|
||||
cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
|
||||
cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes);
|
||||
cls_register_cxx_method(h_class, "user_usage_log_add", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_add, &h_rgw_user_usage_log_add);
|
||||
cls_register_cxx_method(h_class, "user_usage_log_read", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_user_usage_log_read, &h_rgw_user_usage_log_read);
|
||||
cls_register_cxx_method(h_class, "user_usage_log_trim", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_trim, &h_rgw_user_usage_log_trim);
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -317,4 +317,202 @@ struct rgw_cls_list_ret
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_cls_list_ret)
|
||||
|
||||
|
||||
struct rgw_usage_log_entry {
|
||||
string owner;
|
||||
string bucket;
|
||||
uint64_t epoch;
|
||||
uint64_t bytes_sent;
|
||||
uint64_t bytes_received;
|
||||
|
||||
rgw_usage_log_entry() : bytes_sent(0), bytes_received(0) {}
|
||||
rgw_usage_log_entry(string& o, string& b, uint64_t s, uint64_t r) : owner(o), bucket(b), bytes_sent(s), bytes_received(r) {}
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(owner, bl);
|
||||
::encode(bucket, bl);
|
||||
::encode(epoch, bl);
|
||||
::encode(bytes_sent, bl);
|
||||
::encode(bytes_received, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(owner, bl);
|
||||
::decode(bucket, bl);
|
||||
::decode(epoch, bl);
|
||||
::decode(bytes_sent, bl);
|
||||
::decode(bytes_received, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void aggregate(const rgw_usage_log_entry& e) {
|
||||
if (owner.empty()) {
|
||||
owner = e.owner;
|
||||
bucket = e.bucket;
|
||||
epoch = e.epoch;
|
||||
}
|
||||
bytes_sent += e.bytes_sent;
|
||||
bytes_received += e.bytes_received;
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_usage_log_entry)
|
||||
|
||||
struct rgw_usage_log_info {
|
||||
vector<rgw_usage_log_entry> entries;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(entries, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(entries, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
||||
rgw_usage_log_info() {}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_usage_log_info)
|
||||
|
||||
struct rgw_cls_usage_log_add_op {
|
||||
rgw_usage_log_info info;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(info, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(info, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_cls_usage_log_add_op)
|
||||
|
||||
struct rgw_cls_usage_log_read_op {
|
||||
uint64_t start_epoch;
|
||||
uint64_t end_epoch;
|
||||
string owner;
|
||||
|
||||
string iter; // should be empty for the first call, non empty for subsequent calls
|
||||
uint32_t max_entries;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(start_epoch, bl);
|
||||
::encode(end_epoch, bl);
|
||||
::encode(owner, bl);
|
||||
::encode(iter, bl);
|
||||
::encode(max_entries, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(start_epoch, bl);
|
||||
::decode(end_epoch, bl);
|
||||
::decode(owner, bl);
|
||||
::decode(iter, bl);
|
||||
::decode(max_entries, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_cls_usage_log_read_op)
|
||||
|
||||
struct rgw_user_bucket {
|
||||
string user;
|
||||
string bucket;
|
||||
|
||||
rgw_user_bucket() {}
|
||||
rgw_user_bucket(string &u, string& b) : user(u), bucket(b) {}
|
||||
|
||||
bool operator()(const rgw_user_bucket& ub1, const rgw_user_bucket& ub2) const
|
||||
{
|
||||
int comp = ub1.user.compare(ub2.user);
|
||||
if (comp < 0)
|
||||
return true;
|
||||
else if (!comp)
|
||||
return ub1.bucket.compare(ub2.bucket) < 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(user, bl);
|
||||
::encode(bucket, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(user, bl);
|
||||
::decode(bucket, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
||||
bool operator<(const rgw_user_bucket& ub2) const {
|
||||
int comp = user.compare(ub2.user);
|
||||
if (comp < 0)
|
||||
return true;
|
||||
else if (!comp)
|
||||
return bucket.compare(ub2.bucket) < 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_user_bucket)
|
||||
|
||||
struct rgw_cls_usage_log_read_ret {
|
||||
map<rgw_user_bucket, rgw_usage_log_entry> usage;
|
||||
bool truncated;
|
||||
string next_iter;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(usage, bl);
|
||||
::encode(truncated, bl);
|
||||
::encode(next_iter, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(usage, bl);
|
||||
::decode(truncated, bl);
|
||||
::decode(next_iter, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_cls_usage_log_read_ret)
|
||||
|
||||
struct rgw_cls_usage_log_trim_op {
|
||||
uint64_t start_epoch;
|
||||
uint64_t end_epoch;
|
||||
string user;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(start_epoch, bl);
|
||||
::encode(end_epoch, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
::decode(start_epoch, bl);
|
||||
::decode(end_epoch, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_cls_usage_log_trim_op)
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user