rgw: implement quota handlers for user stats

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
Yehuda Sadeh 2014-01-10 14:35:31 -08:00
parent c13634e888
commit c1c40df12b
3 changed files with 88 additions and 12 deletions

View File

@ -75,18 +75,16 @@ public:
int async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs);
void async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
class AsyncRefreshHandler : public RGWGetBucketStats_CB {
class AsyncRefreshHandler {
protected:
RGWRados *store;
RGWQuotaCache<T> *cache;
string user;
public:
AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache,
const string& _user, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store),
cache(_cache), user(_user) {}
AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
virtual ~AsyncRefreshHandler() {}
virtual int init_fetch() = 0;
virtual void handle_response(int r) = 0;
virtual void drop_reference() = 0;
};
virtual AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) = 0;
@ -140,7 +138,7 @@ int RGWQuotaCache<T>::async_refresh(const string& user, rgw_bucket& bucket, RGWQ
int ret = handler->init_fetch();
if (ret < 0) {
async_refcount->put();
handler->put();
handler->drop_reference();
return ret;
}
@ -231,14 +229,18 @@ void RGWQuotaCache<T>::adjust_stats(const string& user, rgw_bucket& bucket, int
map_find_and_update(user, bucket, &update);
}
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler {
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
public RGWGetBucketStats_CB {
string user;
public:
BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
const string& _user, rgw_bucket& _bucket) :
RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache, _user, _bucket) {}
RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
RGWGetBucketStats_CB(_bucket), user(_user) {}
int init_fetch();
void drop_reference() { put(); }
void handle_response(int r);
int init_fetch();
};
int BucketAsyncRefreshHandler::init_fetch()
@ -328,6 +330,81 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket
return 0;
}
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
public RGWGetUserStats_CB {
rgw_bucket bucket;
public:
UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
const string& _user, rgw_bucket& _bucket) :
RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
RGWGetUserStats_CB(_user),
bucket(_bucket) {}
void drop_reference() { put(); }
int init_fetch();
void handle_response(int r);
};
int UserAsyncRefreshHandler::init_fetch()
{
ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
int r = store->get_user_stats_async(user, this);
if (r < 0) {
ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl;
/* get_bucket_stats_async() dropped our reference already */
return r;
}
return 0;
}
void UserAsyncRefreshHandler::handle_response(int r)
{
if (r < 0) {
ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
return; /* nothing to do here */
}
cache->async_refresh_response(user, bucket, stats);
}
class RGWUserStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
return stats_map.find(bucket, qs);
}
bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) {
return stats_map.find_and_update(bucket, NULL, ctx);
}
void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
stats_map.add(bucket, qs);
}
int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
public:
RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(store, store->ctx()->_conf->rgw_bucket_quota_cache_size) {
}
AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) {
return new UserAsyncRefreshHandler(store, this, user, bucket);
}
};
int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
int r = store->get_user_stats(user, stats);
if (r < 0) {
ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
return r;
}
return 0;
}
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
RGWRados *store;

View File

@ -4730,7 +4730,7 @@ public:
}
};
int RGWRados::get_bucket_stats(const string& user, RGWStorageStats& stats)
int RGWRados::get_user_stats(const string& user, RGWStorageStats& stats)
{
cls_user_header header;
int r = cls_user_get_header(user, &header);

View File

@ -1342,7 +1342,6 @@ public:
}
int decode_policy(bufferlist& bl, ACLOwner *owner);
int get_bucket_stats(const string& user, RGWStorageStats& stats);
int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
string *max_marker);
int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);