Merge pull request #1772 from ceph/wip-8169

rgw: calculate user manifest

Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Josh Durgin 2014-05-08 15:35:45 -07:00
commit d34cc1e7cd
2 changed files with 112 additions and 25 deletions

View File

@ -685,8 +685,11 @@ done_err:
return ret;
}
int RGWGetObj::iterate_user_manifest_parts(rgw_bucket& bucket, string& obj_prefix, RGWAccessControlPolicy *bucket_policy,
uint64_t *ptotal_len, bool read_data)
static int iterate_user_manifest_parts(CephContext *cct, RGWRados *store, off_t ofs, off_t end,
rgw_bucket& bucket, string& obj_prefix, RGWAccessControlPolicy *bucket_policy,
uint64_t *ptotal_len,
int (*cb)(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy,
off_t start_ofs, off_t end_ofs, void *param), void *cb_param)
{
uint64_t obj_ofs = 0, len_count = 0;
bool found_start = false, found_end = false;
@ -697,7 +700,7 @@ int RGWGetObj::iterate_user_manifest_parts(rgw_bucket& bucket, string& obj_prefi
map<string, bool> common_prefixes;
vector<RGWObjEnt> objs;
utime_t start_time = ceph_clock_now(s->cct);
utime_t start_time = ceph_clock_now(cct);
do {
#define MAX_LIST_OBJS 100
@ -727,20 +730,20 @@ int RGWGetObj::iterate_user_manifest_parts(rgw_bucket& bucket, string& obj_prefi
}
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
(ceph_clock_now(cct) - start_time));
if (found_start) {
len_count += end_ofs - start_ofs;
if (read_data) {
r = read_user_manifest_part(bucket, ent, bucket_policy, start_ofs, end_ofs);
if (cb) {
r = cb(bucket, ent, bucket_policy, start_ofs, end_ofs, cb_param);
if (r < 0)
return r;
}
}
marker = ent.name;
start_time = ceph_clock_now(s->cct);
start_time = ceph_clock_now(cct);
}
} while (is_truncated && !found_end);
@ -750,6 +753,13 @@ int RGWGetObj::iterate_user_manifest_parts(rgw_bucket& bucket, string& obj_prefi
return 0;
}
static int get_obj_user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs,
void *param)
{
RGWGetObj *op = (RGWGetObj *)param;
return op->read_user_manifest_part(bucket, ent, bucket_policy, start_ofs, end_ofs);
}
int RGWGetObj::handle_user_manifest(const char *prefix)
{
ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix=" << prefix << dendl;
@ -789,13 +799,13 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
}
/* dry run to find out total length */
int r = iterate_user_manifest_parts(bucket, obj_prefix, bucket_policy, &total_len, false);
int r = iterate_user_manifest_parts(s->cct, store, ofs, end, bucket, obj_prefix, bucket_policy, &total_len, NULL, NULL);
if (r < 0)
return r;
s->obj_size = total_len;
r = iterate_user_manifest_parts(bucket, obj_prefix, bucket_policy, NULL, true);
r = iterate_user_manifest_parts(s->cct, store, ofs, end, bucket, obj_prefix, bucket_policy, NULL, get_obj_user_manifest_iterate_cb, (void *)this);
if (r < 0)
return r;
@ -1495,6 +1505,45 @@ void RGWPutObj::pre_exec()
rgw_bucket_object_pre_exec(s);
}
static int put_obj_user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs,
void *param)
{
RGWPutObj *op = (RGWPutObj *)param;
return op->user_manifest_iterate_cb(bucket, ent, bucket_policy, start_ofs, end_ofs);
}
int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs)
{
rgw_obj part(bucket, ent.name);
map<string, bufferlist> attrs;
int ret = get_obj_attrs(store, s, part, attrs, NULL, NULL);
if (ret < 0) {
return ret;
}
map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ETAG);
if (iter == attrs.end()) {
return 0;
}
bufferlist& bl = iter->second;
const char *buf = bl.c_str();
int len = bl.length();
while (len > 0 && buf[len - 1] == '\0') {
len--;
}
if (len > 0) {
user_manifest_parts_hash->Update((const byte *)bl.c_str(), len);
}
if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
string e(bl.c_str(), bl.length());
ldout(s->cct, 20) << __func__ << ": appending user manifest etag: " << e << dendl;
}
return 0;
}
void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
@ -1508,6 +1557,8 @@ void RGWPutObj::execute()
int len;
map<string, string>::iterator iter;
bool need_calc_md5 = (obj_manifest == NULL);
perfcounter->inc(l_rgw_put);
ret = -EINVAL;
@ -1570,7 +1621,9 @@ void RGWPutObj::execute()
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
if (need_calc_md5) {
hash.Update(data_ptr, len);
}
ret = processor->throttle_data(handle);
if (ret < 0)
@ -1592,30 +1645,61 @@ void RGWPutObj::execute()
goto done;
}
hash.Final(m);
if (need_calc_md5) {
hash.Final(m);
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
if (supplied_md5_b64 && strcmp(calc_md5, supplied_md5)) {
ret = -ERR_BAD_DIGEST;
goto done;
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
etag = calc_md5;
}
policy.encode(aclbl);
etag = calc_md5;
attrs[RGW_ATTR_ACL] = aclbl;
if (obj_manifest) {
bufferlist manifest_bl;
string manifest_obj_prefix;
string manifest_bucket;
RGWBucketInfo bucket_info;
char etag_buf[CEPH_CRYPTO_MD5_DIGESTSIZE];
char etag_buf_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
manifest_bl.append(obj_manifest, strlen(obj_manifest) + 1);
attrs[RGW_ATTR_USER_MANIFEST] = manifest_bl;
user_manifest_parts_hash = &hash;
string prefix_str = obj_manifest;
int pos = prefix_str.find('/');
if (pos < 0) {
ldout(s->cct, 0) << "bad user manifest, missing slash separator: " << obj_manifest << dendl;
goto done;
}
manifest_bucket = prefix_str.substr(0, pos);
manifest_obj_prefix = prefix_str.substr(pos + 1);
ret = store->get_bucket_info(NULL, manifest_bucket, bucket_info, NULL, NULL);
if (ret < 0) {
ldout(s->cct, 0) << "could not get bucket info for bucket=" << manifest_bucket << dendl;
}
ret = iterate_user_manifest_parts(s->cct, store, 0, -1, bucket_info.bucket, manifest_obj_prefix,
NULL, NULL, put_obj_user_manifest_iterate_cb, (void *)this);
if (ret < 0) {
goto done;
}
hash.Final((byte *)etag_buf);
buf_to_hex((const unsigned char *)etag_buf, CEPH_CRYPTO_MD5_DIGESTSIZE, etag_buf_str);
ldout(s->cct, 0) << __func__ << ": calculated md5 for user manifest: " << etag_buf_str << dendl;
etag = etag_buf_str;
}
if (supplied_etag && etag.compare(supplied_etag) != 0) {
ret = -ERR_UNPROCESSABLE_ENTITY;
goto done;
}
bl.append(etag.c_str(), etag.size() + 1);
attrs[RGW_ATTR_ETAG] = bl;
attrs[RGW_ATTR_ACL] = aclbl;
if (obj_manifest) {
bufferlist manifest_bl;
manifest_bl.append(obj_manifest, strlen(obj_manifest) + 1);
attrs[RGW_ATTR_USER_MANIFEST] = manifest_bl;
}
for (iter = s->generic_attrs.begin(); iter != s->generic_attrs.end(); ++iter) {
bufferlist& attrbl = attrs[iter->first];

View File

@ -132,8 +132,6 @@ public:
void pre_exec();
void execute();
int read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs);
int iterate_user_manifest_parts(rgw_bucket& bucket, string& obj_prefix, RGWAccessControlPolicy *bucket_policy,
uint64_t *ptotal_len, bool read_data);
int handle_user_manifest(const char *prefix);
int get_data_cb(bufferlist& bl, off_t ofs, off_t len);
@ -324,6 +322,8 @@ protected:
const char *obj_manifest;
time_t mtime;
MD5 *user_manifest_parts_hash;
public:
RGWPutObj() {
ret = 0;
@ -333,6 +333,7 @@ public:
chunked_upload = false;
obj_manifest = NULL;
mtime = 0;
user_manifest_parts_hash = NULL;
}
virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) {
@ -343,6 +344,8 @@ public:
RGWPutObjProcessor *select_processor();
void dispose_processor(RGWPutObjProcessor *processor);
int user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs);
int verify_permission();
void pre_exec();
void execute();