mirror of
https://github.com/ceph/ceph
synced 2025-02-20 17:37:29 +00:00
rgw: call processor->handle_data() again if needed
Fixes: #8937
Following the fix to #8928 we end up accumulating pending data that
needs to be written. Beforehand it was working fine because we were
feeding it with the exact amount of bytes we were writing.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 0553890e79
)
This commit is contained in:
parent
b22a8a9e60
commit
062062479a
@ -1564,6 +1564,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs,
|
||||
MD5 *hash, bool need_to_wait)
|
||||
{
|
||||
const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL);
|
||||
bool again;
|
||||
uint64_t len = data.length();
|
||||
|
||||
do {
|
||||
void *handle;
|
||||
|
||||
int ret = processor->handle_data(data, ofs, &handle, &again);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
if (hash) {
|
||||
hash->Update(data_ptr, len);
|
||||
hash = NULL; /* only calculate hash once */
|
||||
}
|
||||
|
||||
ret = processor->throttle_data(handle, false);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
need_to_wait = false; /* the need to wait only applies to the first iteration */
|
||||
} while (again);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void RGWPutObj::execute()
|
||||
{
|
||||
RGWPutObjProcessor *processor = NULL;
|
||||
@ -1637,23 +1667,12 @@ void RGWPutObj::execute()
|
||||
if (!len)
|
||||
break;
|
||||
|
||||
void *handle;
|
||||
const unsigned char *data_ptr = (const unsigned char *)data.c_str();
|
||||
|
||||
ret = processor->handle_data(data, ofs, &handle);
|
||||
if (ret < 0)
|
||||
goto done;
|
||||
|
||||
if (need_calc_md5) {
|
||||
hash.Update(data_ptr, len);
|
||||
}
|
||||
|
||||
/* do we need this operation to be synchronous? if we're dealing with an object with immutable
|
||||
* head, e.g., multipart object we need to make sure we're the first one writing to this object
|
||||
*/
|
||||
bool need_to_wait = (ofs == 0) && multipart;
|
||||
|
||||
ret = processor->throttle_data(handle, need_to_wait);
|
||||
ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait);
|
||||
if (ret < 0) {
|
||||
if (!need_to_wait || ret != -EEXIST) {
|
||||
ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
|
||||
@ -1678,15 +1697,8 @@ void RGWPutObj::execute()
|
||||
goto done;
|
||||
}
|
||||
|
||||
ret = processor->handle_data(data, ofs, &handle);
|
||||
ret = put_data_and_throttle(processor, data, ofs, NULL, false);
|
||||
if (ret < 0) {
|
||||
ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
|
||||
goto done;
|
||||
}
|
||||
|
||||
ret = processor->throttle_data(handle, false);
|
||||
if (ret < 0) {
|
||||
ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
@ -1850,18 +1862,7 @@ void RGWPostObj::execute()
|
||||
if (!len)
|
||||
break;
|
||||
|
||||
void *handle;
|
||||
const unsigned char *data_ptr = (const unsigned char *)data.c_str();
|
||||
|
||||
ret = processor->handle_data(data, ofs, &handle);
|
||||
if (ret < 0)
|
||||
goto done;
|
||||
|
||||
hash.Update(data_ptr, len);
|
||||
|
||||
ret = processor->throttle_data(handle, false);
|
||||
if (ret < 0)
|
||||
goto done;
|
||||
ret = put_data_and_throttle(processor, data, ofs, &hash, false);
|
||||
|
||||
ofs += len;
|
||||
|
||||
|
@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
|
||||
int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again)
|
||||
{
|
||||
*again = false;
|
||||
|
||||
if (ofs != _ofs)
|
||||
return -EINVAL;
|
||||
|
||||
@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
|
||||
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
|
||||
}
|
||||
|
||||
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
|
||||
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again)
|
||||
{
|
||||
*again = false;
|
||||
|
||||
*phandle = NULL;
|
||||
if (extra_data_len) {
|
||||
size_t extra_len = bl.length();
|
||||
@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
|
||||
|
||||
pending_data_bl.splice(0, max_write_size, &bl);
|
||||
|
||||
/* do we have enough data pending accumulated that needs to be written? */
|
||||
*again = (pending_data_bl.length() >= max_chunk_size);
|
||||
|
||||
if (!data_ofs && !immutable_head()) {
|
||||
first_chunk.claim(bl);
|
||||
obj_len = (uint64_t)first_chunk.length();
|
||||
@ -3023,25 +3030,33 @@ public:
|
||||
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
|
||||
progress_cb(ofs, progress_data);
|
||||
|
||||
void *handle;
|
||||
int ret = processor->handle_data(bl, ofs, &handle);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
bool again;
|
||||
|
||||
if (opstate) {
|
||||
/* need to update opstate repository with new state. This is ratelimited, so we're not
|
||||
* really doing it every time
|
||||
*/
|
||||
ret = opstate->renew_state();
|
||||
if (ret < 0) {
|
||||
/* could not renew state! might have been marked as cancelled */
|
||||
bool need_opstate = true;
|
||||
|
||||
do {
|
||||
void *handle;
|
||||
int ret = processor->handle_data(bl, ofs, &handle, &again);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
ret = processor->throttle_data(handle, false);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
if (need_opstate && opstate) {
|
||||
/* need to update opstate repository with new state. This is ratelimited, so we're not
|
||||
* really doing it every time
|
||||
*/
|
||||
ret = opstate->renew_state();
|
||||
if (ret < 0) {
|
||||
/* could not renew state! might have been marked as cancelled */
|
||||
return ret;
|
||||
}
|
||||
|
||||
need_opstate = false;
|
||||
}
|
||||
|
||||
ret = processor->throttle_data(handle, false);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
} while (again);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -556,7 +556,7 @@ public:
|
||||
obj_ctx = _o;
|
||||
return 0;
|
||||
}
|
||||
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
|
||||
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0;
|
||||
virtual int throttle_data(void *handle, bool need_to_wait) = 0;
|
||||
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
|
||||
};
|
||||
@ -572,7 +572,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
|
||||
|
||||
protected:
|
||||
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
|
||||
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
|
||||
int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
|
||||
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
|
||||
|
||||
public:
|
||||
@ -662,7 +662,7 @@ public:
|
||||
void set_extra_data_len(uint64_t len) {
|
||||
extra_data_len = len;
|
||||
}
|
||||
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
|
||||
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
|
||||
bufferlist& get_extra_data() { return extra_data_bl; }
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user