diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 02d7c504f73..0bc6f71725b 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1564,6 +1564,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA return 0; } +static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs, + MD5 *hash, bool need_to_wait) +{ + const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL); + bool again; + uint64_t len = data.length(); + + do { + void *handle; + + int ret = processor->handle_data(data, ofs, &handle, &again); + if (ret < 0) + return ret; + + if (hash) { + hash->Update(data_ptr, len); + hash = NULL; /* only calculate hash once */ + } + + ret = processor->throttle_data(handle, false); + if (ret < 0) + return ret; + + need_to_wait = false; /* the need to wait only applies to the first iteration */ + } while (again); + + return 0; +} + + void RGWPutObj::execute() { RGWPutObjProcessor *processor = NULL; @@ -1637,23 +1667,12 @@ void RGWPutObj::execute() if (!len) break; - void *handle; - const unsigned char *data_ptr = (const unsigned char *)data.c_str(); - - ret = processor->handle_data(data, ofs, &handle); - if (ret < 0) - goto done; - - if (need_calc_md5) { - hash.Update(data_ptr, len); - } - /* do we need this operation to be synchronous? if we're dealing with an object with immutable * head, e.g., multipart object we need to make sure we're the first one writing to this object */ bool need_to_wait = (ofs == 0) && multipart; - ret = processor->throttle_data(handle, need_to_wait); + ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait); if (ret < 0) { if (!need_to_wait || ret != -EEXIST) { ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl; @@ -1678,15 +1697,8 @@ void RGWPutObj::execute() goto done; } - ret = processor->handle_data(data, ofs, &handle); + ret = put_data_and_throttle(processor, data, ofs, NULL, false); if (ret < 0) { - ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl; - goto done; - } - - ret = processor->throttle_data(handle, false); - if (ret < 0) { - ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl; goto done; } } @@ -1850,18 +1862,7 @@ void RGWPostObj::execute() if (!len) break; - void *handle; - const unsigned char *data_ptr = (const unsigned char *)data.c_str(); - - ret = processor->handle_data(data, ofs, &handle); - if (ret < 0) - goto done; - - hash.Update(data_ptr, len); - - ret = processor->throttle_data(handle, false); - if (ret < 0) - goto done; + ret = put_data_and_throttle(processor, data, ofs, &hash, false); ofs += len; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index c5b558b22cb..65e5de92346 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi return 0; } -int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle) +int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again) { + *again = false; + if (ofs != _ofs) return -EINVAL; @@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) { + *again = false; + *phandle = NULL; if (extra_data_len) { size_t extra_len = bl.length(); @@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha pending_data_bl.splice(0, max_write_size, &bl); + /* do we have enough data pending accumulated that needs to be written? */ + *again = (pending_data_bl.length() >= max_chunk_size); + if (!data_ofs && !immutable_head()) { first_chunk.claim(bl); obj_len = (uint64_t)first_chunk.length(); @@ -3023,25 +3030,33 @@ public: int handle_data(bufferlist& bl, off_t ofs, off_t len) { progress_cb(ofs, progress_data); - void *handle; - int ret = processor->handle_data(bl, ofs, &handle); - if (ret < 0) - return ret; + bool again; - if (opstate) { - /* need to update opstate repository with new state. This is ratelimited, so we're not - * really doing it every time - */ - ret = opstate->renew_state(); - if (ret < 0) { - /* could not renew state! might have been marked as cancelled */ + bool need_opstate = true; + + do { + void *handle; + int ret = processor->handle_data(bl, ofs, &handle, &again); + if (ret < 0) return ret; - } - } - ret = processor->throttle_data(handle, false); - if (ret < 0) - return ret; + if (need_opstate && opstate) { + /* need to update opstate repository with new state. This is ratelimited, so we're not + * really doing it every time + */ + ret = opstate->renew_state(); + if (ret < 0) { + /* could not renew state! might have been marked as cancelled */ + return ret; + } + + need_opstate = false; + } + + ret = processor->throttle_data(handle, false); + if (ret < 0) + return ret; + } while (again); return 0; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6b93b98f9b2..2792352b8e8 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -556,7 +556,7 @@ public: obj_ctx = _o; return 0; } - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0; + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0; virtual int throttle_data(void *handle, bool need_to_wait) = 0; virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); }; @@ -572,7 +572,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor protected: int prepare(RGWRados *store, void *obj_ctx, string *oid_rand); - int handle_data(bufferlist& bl, off_t ofs, void **phandle); + int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again); int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); public: @@ -662,7 +662,7 @@ public: void set_extra_data_len(uint64_t len) { extra_data_len = len; } - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle); + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again); bufferlist& get_extra_data() { return extra_data_bl; } };