rgw: don't allow multiple writers to same multiobject part

Fixes: #8269
Backport: firefly, dumpling

A client might need to retry a multipart part write. The original thread
might race with the new one and, while trying to clean up, clobber the
part's data.
The fix is to detect whether the part already exists and, if so, to use
a different part name for the new write.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
(cherry picked from commit bd8e026f88b812cc70caf6232c247844df5d99bf)
This commit is contained in:
Yehuda Sadeh 2014-05-02 17:06:05 -07:00 committed by Josh Durgin
parent 053c261e5b
commit ea68b93723
4 changed files with 113 additions and 53 deletions

View File

@ -1367,22 +1367,26 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
string upload_id;
protected:
bool immutable_head() { return true; }
int prepare(RGWRados *store, void *obj_ctx);
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
bool immutable_head() { return true; }
RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) :
RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object_str, _p, _s->req_id), s(_s) {}
};
int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, NULL);
string oid = obj_str;
upload_id = s->info.args.get("uploadId");
mp.init(oid, upload_id);
if (!oid_rand) {
mp.init(oid, upload_id);
} else {
mp.init(oid, upload_id, *oid_rand);
}
part_num = s->info.args.get("partNumber");
if (part_num.empty()) {
@ -1398,7 +1402,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
return -EINVAL;
}
string upload_prefix = oid + "." + upload_id;
string upload_prefix = oid + ".";
if (!oid_rand) {
upload_prefix.append(upload_id);
} else {
upload_prefix.append(*oid_rand);
}
rgw_obj target_obj;
target_obj.init(bucket, oid);
@ -1476,7 +1486,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_
}
RGWPutObjProcessor *RGWPutObj::select_processor()
RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
{
RGWPutObjProcessor *processor;
@ -1492,6 +1502,10 @@ RGWPutObjProcessor *RGWPutObj::select_processor()
processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s);
}
if (is_multipart) {
*is_multipart = multipart;
}
return processor;
}
@ -1556,6 +1570,7 @@ void RGWPutObj::execute()
map<string, bufferlist> attrs;
int len;
map<string, string>::iterator iter;
bool multipart;
bool need_calc_md5 = (obj_manifest == NULL);
@ -1600,9 +1615,9 @@ void RGWPutObj::execute()
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
processor = select_processor();
processor = select_processor(&multipart);
ret = processor->prepare(store, s->obj_ctx);
ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
@ -1627,9 +1642,48 @@ void RGWPutObj::execute()
hash.Update(data_ptr, len);
}
ret = processor->throttle_data(handle);
if (ret < 0)
goto done;
/* Do we need this operation to be synchronous? If we're dealing with an object with an immutable
 * head (e.g., a multipart object), we need to make sure we're the first one writing to this object.
 */
bool need_to_wait = (ofs == 0) && multipart;
ret = processor->throttle_data(handle, need_to_wait);
if (ret < 0) {
if (!need_to_wait || ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
goto done;
}
ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
/* restart processing with different oid suffix */
dispose_processor(processor);
processor = select_processor(&multipart);
string oid_rand;
char buf[33];
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
oid_rand.append(buf);
ret = processor->prepare(store, s->obj_ctx, &oid_rand);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
goto done;
}
ret = processor->handle_data(data, ofs, &handle);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
goto done;
}
ret = processor->throttle_data(handle, false);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
goto done;
}
}
ofs += len;
} while (len > 0);
@ -1774,7 +1828,7 @@ void RGWPostObj::execute()
processor = select_processor();
ret = processor->prepare(store, s->obj_ctx);
ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
@ -1799,7 +1853,7 @@ void RGWPostObj::execute()
hash.Update(data_ptr, len);
ret = processor->throttle_data(handle);
ret = processor->throttle_data(handle, false);
if (ret < 0)
goto done;

View File

@ -341,7 +341,7 @@ public:
policy.set_ctx(s->cct);
}
RGWPutObjProcessor *select_processor();
RGWPutObjProcessor *select_processor(bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
int user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs);
@ -757,21 +757,22 @@ class RGWMPObj {
string upload_id;
public:
RGWMPObj() {}
RGWMPObj(string& _oid, string& _upload_id) {
init(_oid, _upload_id);
RGWMPObj(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}
void init(string& _oid, string& _upload_id) {
void init(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}
void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
if (_oid.empty()) {
clear();
return;
}
oid = _oid;
upload_id = _upload_id;
prefix = oid;
prefix.append(".");
prefix.append(upload_id);
meta = prefix;
meta.append(MP_META_SUFFIX);
prefix = oid + ".";
meta = prefix + upload_id + MP_META_SUFFIX;
prefix.append(part_unique_str);
}
string& get_meta() { return meta; }
string get_part(int num) {
@ -802,7 +803,7 @@ public:
return false;
oid = meta.substr(0, mid_pos);
upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
init(oid, upload_id);
init(oid, upload_id, upload_id);
return true;
}
void clear() {

View File

@ -719,10 +719,10 @@ int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m
manifest->set_head(_h);
last_ofs = 0;
char buf[33];
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
if (manifest->get_prefix().empty()) {
char buf[33];
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
string oid_prefix = ".";
oid_prefix.append(buf);
oid_prefix.append("_");
@ -1006,9 +1006,9 @@ RGWPutObjProcessor::~RGWPutObjProcessor()
}
}
int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
obj.init(bucket, obj_str);
@ -1041,7 +1041,7 @@ int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, time_t se
}
int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
{
if ((uint64_t)abs_ofs + bl.length() > obj_len)
obj_len = abs_ofs + bl.length();
@ -1051,7 +1051,7 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
int r = store->aio_put_obj_data(NULL, obj,
bl,
((ofs != 0) ? ofs : -1),
false, phandle);
exclusive, phandle);
return r;
}
@ -1091,7 +1091,7 @@ int RGWPutObjProcessor_Aio::drain_pending()
return ret;
}
int RGWPutObjProcessor_Aio::throttle_data(void *handle)
int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
@ -1099,10 +1099,13 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
pending.push_back(info);
}
size_t orig_size = pending.size();
while (pending_has_completed()) {
while (pending_has_completed()
|| need_to_wait) {
int r = wait_pending_front();
if (r < 0)
return r;
need_to_wait = false;
}
/* resize window in case messages are draining too fast */
@ -1118,7 +1121,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
return 0;
}
int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
{
if (ofs >= next_part_ofs) {
int r = prepare_next_part(ofs);
@ -1127,7 +1130,7 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
}
}
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
@ -1168,12 +1171,15 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
}
off_t write_ofs = data_ofs;
data_ofs = write_ofs + bl.length();
return write_data(bl, write_ofs, phandle);
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload to the same
object, and cleanup can be messy */
return write_data(bl, write_ofs, phandle, exclusive);
}
int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
head_obj.init(bucket, obj_str);
@ -1220,12 +1226,12 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
}
if (pending_data_bl.length()) {
void *handle;
int r = write_data(pending_data_bl, data_ofs, &handle);
int r = write_data(pending_data_bl, data_ofs, &handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
r = throttle_data(handle);
r = throttle_data(handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
@ -3014,7 +3020,7 @@ public:
}
}
ret = processor->throttle_data(handle);
ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
@ -3139,7 +3145,7 @@ int RGWRados::copy_obj(void *ctx,
RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.object,
cct->_conf->rgw_obj_stripe_size, tag);
ret = processor.prepare(this, ctx);
ret = processor.prepare(this, ctx, NULL);
if (ret < 0)
return ret;

View File

@ -537,13 +537,13 @@ protected:
public:
RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {}
virtual ~RGWPutObjProcessor();
virtual int prepare(RGWRados *_store, void *_o) {
virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
store = _store;
obj_ctx = _o;
return 0;
};
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
virtual int throttle_data(void *handle) = 0;
virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};
@ -557,12 +557,12 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
off_t ofs;
protected:
int prepare(RGWRados *store, void *obj_ctx);
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
int throttle_data(void *handle) { return 0; }
int throttle_data(void *handle, bool need_to_wait) { return 0; }
RGWPutObjProcessor_Plain(const string& bucket_owner, rgw_bucket& b, const string& o) : RGWPutObjProcessor(bucket_owner),
bucket(b), obj_str(o), ofs(0) {}
};
@ -584,10 +584,10 @@ protected:
uint64_t obj_len;
int drain_pending();
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
int throttle_data(void *handle);
int throttle_data(void *handle, bool need_to_wait);
RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
virtual ~RGWPutObjProcessor_Aio() {
@ -618,9 +618,7 @@ protected:
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;
virtual bool immutable_head() { return false; }
int write_data(bufferlist& bl, off_t ofs, void **phandle);
int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
int prepare_next_part(off_t ofs);
@ -640,11 +638,12 @@ public:
bucket(_b),
obj_str(_o),
unique_tag(_t) {}
int prepare(RGWRados *store, void *obj_ctx);
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
virtual bool immutable_head() { return false; }
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
bufferlist& get_extra_data() { return extra_data_bl; }
};