Merge pull request #2157 from ceph/wip-8937

rgw: call processor->handle_data() again if needed

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Yehuda Sadeh 2014-07-31 18:11:12 -07:00
commit 9b05a87ec9
3 changed files with 69 additions and 53 deletions

View File

@ -1564,6 +1564,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA
return 0;
}
static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs,
MD5 *hash, bool need_to_wait)
{
const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL);
bool again;
uint64_t len = data.length();
do {
void *handle;
int ret = processor->handle_data(data, ofs, &handle, &again);
if (ret < 0)
return ret;
if (hash) {
hash->Update(data_ptr, len);
hash = NULL; /* only calculate hash once */
}
ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
need_to_wait = false; /* the need to wait only applies to the first iteration */
} while (again);
return 0;
}
void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
@ -1637,23 +1667,12 @@ void RGWPutObj::execute()
if (!len)
break;
void *handle;
const unsigned char *data_ptr = (const unsigned char *)data.c_str();
ret = processor->handle_data(data, ofs, &handle);
if (ret < 0)
goto done;
if (need_calc_md5) {
hash.Update(data_ptr, len);
}
/* do we need this operation to be synchronous? if we're dealing with an object with immutable
* head, e.g., multipart object we need to make sure we're the first one writing to this object
*/
bool need_to_wait = (ofs == 0) && multipart;
ret = processor->throttle_data(handle, need_to_wait);
ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait);
if (ret < 0) {
if (!need_to_wait || ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
@ -1678,15 +1697,8 @@ void RGWPutObj::execute()
goto done;
}
ret = processor->handle_data(data, ofs, &handle);
ret = put_data_and_throttle(processor, data, ofs, NULL, false);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
goto done;
}
ret = processor->throttle_data(handle, false);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
goto done;
}
}
@ -1850,18 +1862,7 @@ void RGWPostObj::execute()
if (!len)
break;
void *handle;
const unsigned char *data_ptr = (const unsigned char *)data.c_str();
ret = processor->handle_data(data, ofs, &handle);
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
ret = processor->throttle_data(handle, false);
if (ret < 0)
goto done;
ret = put_data_and_throttle(processor, data, ofs, &hash, false);
ofs += len;

View File

@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi
return 0;
}
int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again)
{
*again = false;
if (ofs != _ofs)
return -EINVAL;
@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again)
{
*again = false;
*phandle = NULL;
if (extra_data_len) {
size_t extra_len = bl.length();
@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
pending_data_bl.splice(0, max_write_size, &bl);
/* do we have enough data pending accumulated that needs to be written? */
*again = (pending_data_bl.length() >= max_chunk_size);
if (!data_ofs && !immutable_head()) {
first_chunk.claim(bl);
obj_len = (uint64_t)first_chunk.length();
@ -3023,25 +3030,33 @@ public:
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
progress_cb(ofs, progress_data);
void *handle;
int ret = processor->handle_data(bl, ofs, &handle);
if (ret < 0)
return ret;
bool again;
if (opstate) {
/* need to update opstate repository with new state. This is ratelimited, so we're not
* really doing it every time
*/
ret = opstate->renew_state();
if (ret < 0) {
/* could not renew state! might have been marked as cancelled */
bool need_opstate = true;
do {
void *handle;
int ret = processor->handle_data(bl, ofs, &handle, &again);
if (ret < 0)
return ret;
}
}
ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
if (need_opstate && opstate) {
/* need to update opstate repository with new state. This is ratelimited, so we're not
* really doing it every time
*/
ret = opstate->renew_state();
if (ret < 0) {
/* could not renew state! might have been marked as cancelled */
return ret;
}
need_opstate = false;
}
ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
} while (again);
return 0;
}

View File

@ -556,7 +556,7 @@ public:
obj_ctx = _o;
return 0;
}
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0;
virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};
@ -572,7 +572,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
protected:
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
@ -662,7 +662,7 @@ public:
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
bufferlist& get_extra_data() { return extra_data_bl; }
};