diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 464208c97c5..7894927cfc2 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1367,22 +1367,26 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic string upload_id; protected: - bool immutable_head() { return true; } - int prepare(RGWRados *store, void *obj_ctx); + int prepare(RGWRados *store, void *obj_ctx, string *oid_rand); int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); public: + bool immutable_head() { return true; } RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) : RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object_str, _p, _s->req_id), s(_s) {} }; -int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx) +int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand) { - RGWPutObjProcessor::prepare(store, obj_ctx); + RGWPutObjProcessor::prepare(store, obj_ctx, NULL); string oid = obj_str; upload_id = s->info.args.get("uploadId"); - mp.init(oid, upload_id); + if (!oid_rand) { + mp.init(oid, upload_id); + } else { + mp.init(oid, upload_id, *oid_rand); + } part_num = s->info.args.get("partNumber"); if (part_num.empty()) { @@ -1398,7 +1402,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx) return -EINVAL; } - string upload_prefix = oid + "." + upload_id; + string upload_prefix = oid + "."; + + if (!oid_rand) { + upload_prefix.append(upload_id); + } else { + upload_prefix.append(*oid_rand); + } rgw_obj target_obj; target_obj.init(bucket, oid); @@ -1476,7 +1486,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_ } -RGWPutObjProcessor *RGWPutObj::select_processor() +RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart) { RGWPutObjProcessor *processor; @@ -1492,6 +1502,10 @@ RGWPutObjProcessor *RGWPutObj::select_processor() processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s); } + if (is_multipart) { + *is_multipart = multipart; + } + return processor; } @@ -1556,6 +1570,7 @@ void RGWPutObj::execute() map attrs; int len; map::iterator iter; + bool multipart; bool need_calc_md5 = (obj_manifest == NULL); @@ -1600,9 +1615,9 @@ void RGWPutObj::execute() supplied_md5[sizeof(supplied_md5) - 1] = '\0'; } - processor = select_processor(); + processor = select_processor(&multipart); - ret = processor->prepare(store, s->obj_ctx); + ret = processor->prepare(store, s->obj_ctx, NULL); if (ret < 0) goto done; @@ -1627,9 +1642,48 @@ void RGWPutObj::execute() hash.Update(data_ptr, len); } - ret = processor->throttle_data(handle); - if (ret < 0) - goto done; + /* do we need this operation to be synchronous? if we're dealing with an object with immutable + * head, e.g., multipart object we need to make sure we're the first one writing to this object + */ + bool need_to_wait = (ofs == 0) && multipart; + + ret = processor->throttle_data(handle, need_to_wait); + if (ret < 0) { + if (!need_to_wait || ret != -EEXIST) { + ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl; + goto done; + } + + ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl; + + /* restart processing with different oid suffix */ + + dispose_processor(processor); + processor = select_processor(&multipart); + + string oid_rand; + char buf[33]; + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + oid_rand.append(buf); + + ret = processor->prepare(store, s->obj_ctx, &oid_rand); + if (ret < 0) { + ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl; + goto done; + } + + ret = processor->handle_data(data, ofs, &handle); + if (ret < 0) { + ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl; + goto done; + } + + ret = processor->throttle_data(handle, false); + if (ret < 0) { + ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl; + goto done; + } + } ofs += len; } while (len > 0); @@ -1774,7 +1828,7 @@ void RGWPostObj::execute() processor = select_processor(); - ret = processor->prepare(store, s->obj_ctx); + ret = processor->prepare(store, s->obj_ctx, NULL); if (ret < 0) goto done; @@ -1799,7 +1853,7 @@ void RGWPostObj::execute() hash.Update(data_ptr, len); - ret = processor->throttle_data(handle); + ret = processor->throttle_data(handle, false); if (ret < 0) goto done; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index ed090e77572..95a34edc2b9 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -341,7 +341,7 @@ public: policy.set_ctx(s->cct); } - RGWPutObjProcessor *select_processor(); + RGWPutObjProcessor *select_processor(bool *is_multipart); void dispose_processor(RGWPutObjProcessor *processor); int user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs); @@ -757,21 +757,22 @@ class RGWMPObj { string upload_id; public: RGWMPObj() {} - RGWMPObj(string& _oid, string& _upload_id) { - init(_oid, _upload_id); + RGWMPObj(const string& _oid, const string& _upload_id) { + init(_oid, _upload_id, _upload_id); } - void init(string& _oid, string& _upload_id) { + void init(const string& _oid, const string& _upload_id) { + init(_oid, _upload_id, _upload_id); + } + void init(const string& _oid, const string& _upload_id, const string& part_unique_str) { if (_oid.empty()) { clear(); return; } oid = _oid; upload_id = _upload_id; - prefix = oid; - prefix.append("."); - prefix.append(upload_id); - meta = prefix; - meta.append(MP_META_SUFFIX); + prefix = oid + "."; + meta = prefix + upload_id + MP_META_SUFFIX; + prefix.append(part_unique_str); } string& get_meta() { return meta; } string get_part(int num) { @@ -802,7 +803,7 @@ public: return false; oid = meta.substr(0, mid_pos); upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1); - init(oid, upload_id); + init(oid, upload_id, upload_id); return true; } void clear() { diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 61b23dafb0a..3be8b78c880 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -719,10 +719,10 @@ int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m manifest->set_head(_h); last_ofs = 0; - char buf[33]; - gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); - if (manifest->get_prefix().empty()) { + char buf[33]; + gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); + string oid_prefix = "."; oid_prefix.append(buf); oid_prefix.append("_"); @@ -1006,9 +1006,9 @@ RGWPutObjProcessor::~RGWPutObjProcessor() } } -int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx) +int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand) { - RGWPutObjProcessor::prepare(store, obj_ctx); + RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand); obj.init(bucket, obj_str); @@ -1041,7 +1041,7 @@ int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, time_t se } -int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle) +int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive) { if ((uint64_t)abs_ofs + bl.length() > obj_len) obj_len = abs_ofs + bl.length(); @@ -1051,7 +1051,7 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t int r = store->aio_put_obj_data(NULL, obj, bl, ((ofs != 0) ? ofs : -1), - false, phandle); + exclusive, phandle); return r; } @@ -1091,7 +1091,7 @@ int RGWPutObjProcessor_Aio::drain_pending() return ret; } -int RGWPutObjProcessor_Aio::throttle_data(void *handle) +int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait) { if (handle) { struct put_obj_aio_info info; @@ -1099,10 +1099,13 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle) pending.push_back(info); } size_t orig_size = pending.size(); - while (pending_has_completed()) { + while (pending_has_completed() + || need_to_wait) { int r = wait_pending_front(); if (r < 0) return r; + + need_to_wait = false; } /* resize window in case messages are draining too fast */ @@ -1118,7 +1121,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle) return 0; } -int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle) +int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive) { if (ofs >= next_part_ofs) { int r = prepare_next_part(ofs); @@ -1127,7 +1130,7 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan } } - return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle); + return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) @@ -1168,12 +1171,15 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha } off_t write_ofs = data_ofs; data_ofs = write_ofs + bl.length(); - return write_data(bl, write_ofs, phandle); + bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there + we could be racing with another upload, to the same + object and cleanup can be messy */ + return write_data(bl, write_ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx) +int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand) { - RGWPutObjProcessor::prepare(store, obj_ctx); + RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand); head_obj.init(bucket, obj_str); @@ -1220,12 +1226,12 @@ int RGWPutObjProcessor_Atomic::complete_writing_data() } if (pending_data_bl.length()) { void *handle; - int r = write_data(pending_data_bl, data_ofs, &handle); + int r = write_data(pending_data_bl, data_ofs, &handle, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; return r; } - r = throttle_data(handle); + r = throttle_data(handle, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; return r; @@ -3014,7 +3020,7 @@ public: } } - ret = processor->throttle_data(handle); + ret = processor->throttle_data(handle, false); if (ret < 0) return ret; @@ -3139,7 +3145,7 @@ int RGWRados::copy_obj(void *ctx, RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.object, cct->_conf->rgw_obj_stripe_size, tag); - ret = processor.prepare(this, ctx); + ret = processor.prepare(this, ctx, NULL); if (ret < 0) return ret; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index b0c233b052d..7cf47649af9 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -537,13 +537,13 @@ protected: public: RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {} virtual ~RGWPutObjProcessor(); - virtual int prepare(RGWRados *_store, void *_o) { + virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) { store = _store; obj_ctx = _o; return 0; }; virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0; - virtual int throttle_data(void *handle) = 0; + virtual int throttle_data(void *handle, bool need_to_wait) = 0; virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); }; @@ -557,12 +557,12 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor off_t ofs; protected: - int prepare(RGWRados *store, void *obj_ctx); + int prepare(RGWRados *store, void *obj_ctx, string *oid_rand); int handle_data(bufferlist& bl, off_t ofs, void **phandle); int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); public: - int throttle_data(void *handle) { return 0; } + int throttle_data(void *handle, bool need_to_wait) { return 0; } RGWPutObjProcessor_Plain(const string& bucket_owner, rgw_bucket& b, const string& o) : RGWPutObjProcessor(bucket_owner), bucket(b), obj_str(o), ofs(0) {} }; @@ -584,10 +584,10 @@ protected: uint64_t obj_len; int drain_pending(); - int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle); + int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive); public: - int throttle_data(void *handle); + int throttle_data(void *handle, bool need_to_wait); RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} virtual ~RGWPutObjProcessor_Aio() { @@ -618,9 +618,7 @@ protected: RGWObjManifest manifest; RGWObjManifest::generator manifest_gen; - virtual bool immutable_head() { return false; } - - int write_data(bufferlist& bl, off_t ofs, void **phandle); + int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive); virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); int prepare_next_part(off_t ofs); @@ -640,11 +638,12 @@ public: bucket(_b), obj_str(_o), unique_tag(_t) {} - int prepare(RGWRados *store, void *obj_ctx); + int prepare(RGWRados *store, void *obj_ctx, string *oid_rand); + virtual bool immutable_head() { return false; } void set_extra_data_len(uint64_t len) { extra_data_len = len; } - int handle_data(bufferlist& bl, off_t ofs, void **phandle); + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle); bufferlist& get_extra_data() { return extra_data_bl; } };