rgw: multiple changes for intra-zone object copy

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
Yehuda Sadeh 2013-06-14 21:57:25 -07:00
parent 17d65716c8
commit 622f50027f
8 changed files with 117 additions and 48 deletions

View File

@ -339,12 +339,16 @@ void RGWProcess::handle_request(RGWRequest *req)
req->log(s, "reading the cors attr");
handler->read_cors_config();
req->log(s, "verifying op permissions");
ret = op->verify_permission();
if (ret < 0) {
abort_early(s, ret);
goto done;
if (!s->system_request) {
req->log(s, "verifying op permissions");
ret = op->verify_permission();
if (ret < 0) {
abort_early(s, ret);
goto done;
}
} else {
req->log(s, "skipping permissons checks for system request");
}
req->log(s, "verifying op params");

View File

@ -856,16 +856,16 @@ int RGWCreateBucket::verify_permission()
}
template<class T>
static int forward_request(struct req_state *s, RGWRados *store, bufferlist& in_data, const char *name, T& obj)
static int forward_request_to_master(struct req_state *s, RGWRados *store, bufferlist& in_data, const char *name, T& obj)
{
if (!store->rest_conn) {
if (!store->rest_master_conn) {
ldout(s->cct, 0) << "rest connection is invalid" << dendl;
return -EINVAL;
}
ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
bufferlist response;
#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
int ret = store->rest_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response);
int ret = store->rest_master_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response);
if (ret < 0)
return ret;
@ -913,7 +913,7 @@ void RGWCreateBucket::execute()
}
if (!store->region.is_master) {
ret = forward_request(s, store, in_data, "object_ver", objv);
ret = forward_request_to_master(s, store, in_data, "object_ver", objv);
if (ret < 0)
return;
@ -992,7 +992,7 @@ void RGWDeleteBucket::execute()
if (!store->region.is_master) {
bufferlist in_data;
ret = forward_request(s, store, in_data, "object_ver", objv_tracker.read_version);
ret = forward_request_to_master(s, store, in_data, "object_ver", objv_tracker.read_version);
if (ret < 0) {
return;
}
@ -1558,6 +1558,7 @@ void RGWCopyObj::execute()
ret = store->copy_obj(s->obj_ctx,
s->user.user_id,
&s->info,
source_zone,
dst_obj,
src_obj,
dest_bucket_info,

View File

@ -418,6 +418,7 @@ protected:
bool replace_attrs;
RGWBucketInfo src_bucket_info;
RGWBucketInfo dest_bucket_info;
string source_zone;
int init_common();

View File

@ -751,7 +751,13 @@ void RGWRados::finalize()
delete gc;
gc = NULL;
}
delete rest_conn;
delete rest_master_conn;
map<string, RGWRESTConn *>::iterator iter;
for (iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
RGWRESTConn *conn = iter->second;
delete conn;
}
}
/**
@ -810,7 +816,18 @@ int RGWRados::init_complete()
lderr(cct) << "ERROR: bad region map: inconsistent master region" << dendl;
return -EINVAL;
}
rest_conn = new RGWRegionConnection(cct, this, iter->second);
RGWRegion& region = iter->second;
rest_master_conn = new RGWRESTConn(cct, this, region.endpoints);
}
map<string, RGWZone>::iterator ziter;
for (ziter = region.zones.begin(); ziter != region.zones.end(); ++ziter) {
const string& name = ziter->first;
if (name != zone.name) {
RGWZone& z = ziter->second;
ldout(cct, 20) << "generating connection object for zone " << name << dendl;
zone_conn_map[name] = new RGWRESTConn(cct, this, z.endpoints);
}
}
ret = open_root_pool_ctx();
@ -1633,6 +1650,27 @@ int RGWRados::create_pool(rgw_bucket& bucket)
return 0;
}
int RGWRados::init_bucket_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx; // context for new bucket
int r = open_bucket_index_ctx(bucket, index_ctx);
if (r < 0)
return r;
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
librados::ObjectWriteOperation op;
op.create(true);
r = cls_rgw_init_index(index_ctx, op, dir_oid);
if (r < 0 && r != -EEXIST)
return r;
return 0;
}
/**
* create a bucket with name bucket and the given list of attrs
* returns 0 on success, -ERR# otherwise.
@ -1650,12 +1688,6 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket,
ret = select_bucket_placement(bucket.name, bucket);
if (ret < 0)
return ret;
librados::IoCtx index_ctx; // context for new bucket
int r = open_bucket_index_ctx(bucket, index_ctx);
if (r < 0)
return r;
bufferlist bl;
uint32_t nop = 0;
::encode(nop, bl);
@ -1663,7 +1695,7 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket,
const string& pool = zone.domain_root.name;
const char *pool_str = pool.c_str();
librados::IoCtx id_io_ctx;
r = rados->ioctx_create(pool_str, id_io_ctx);
int r = rados->ioctx_create(pool_str, id_io_ctx);
if (r < 0)
return r;
@ -1677,10 +1709,8 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket,
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
librados::ObjectWriteOperation op;
op.create(true);
r = cls_rgw_init_index(index_ctx, op, dir_oid);
if (r < 0 && r != -EEXIST)
r = init_bucket_index(bucket);
if (r < 0)
return r;
if (pobjv) {
@ -1695,9 +1725,14 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket,
info.region = region_name;
ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs);
if (ret == -EEXIST) {
librados::IoCtx index_ctx; // context for new bucket
int r = open_bucket_index_ctx(bucket, index_ctx);
if (r < 0)
return r;
index_ctx.remove(dir_oid);
/* we need this for this objv_tracker */
int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL);
r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL);
if (r < 0) {
if (r == -ENOENT) {
continue;
@ -2176,6 +2211,7 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs, map<string, buffe
int RGWRados::copy_obj(void *ctx,
const string& user_id,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,
@ -2221,7 +2257,7 @@ int RGWRados::copy_obj(void *ctx,
map<string, bufferlist> src_attrs;
off_t ofs = 0;
off_t end = -1;
if (!remote_src) {
if (!remote_src && source_zone.empty()) {
ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &src_attrs,
mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
if (ret < 0)
@ -2240,15 +2276,27 @@ int RGWRados::copy_obj(void *ctx,
return ret;
RGWRadosPutObj cb(&processor);
RGWRESTConn *conn;
if (source_zone.empty()) {
conn = rest_master_conn;
} else {
map<string, RGWRESTConn *>::iterator iter = zone_conn_map.find(source_zone);
if (iter == zone_conn_map.end()) {
ldout(cct, 0) << "could not find zone connection to zone: " << source_zone << dendl;
return -ENOENT;
}
conn = iter->second;
}
int ret = rest_conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req);
int ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req);
if (ret < 0)
return ret;
string etag;
map<string, string> req_headers;
ret = rest_conn->complete_request(in_stream_req, etag, mtime, req_headers);
ret = conn->complete_request(in_stream_req, etag, mtime, req_headers);
if (ret < 0)
return ret;
@ -2310,7 +2358,7 @@ int RGWRados::copy_obj(void *ctx,
RGWRESTStreamWriteRequest *out_stream_req;
int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
if (ret < 0)
return ret;
@ -2320,7 +2368,7 @@ int RGWRados::copy_obj(void *ctx,
string etag;
ret = rest_conn->complete_request(out_stream_req, etag, mtime);
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
if (ret < 0)
return ret;

View File

@ -650,7 +650,6 @@ class RGWRados
int complete_atomic_overwrite(RGWRadosCtx *rctx, RGWObjState *state, rgw_obj& obj);
int update_placement_map();
int select_bucket_placement(std::string& bucket_name, rgw_bucket& bucket);
int store_bucket_info(RGWBucketInfo& info, map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive);
protected:
@ -670,7 +669,7 @@ public:
bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
cct(NULL), rados(NULL),
pools_initialized(false),
rest_conn(NULL),
rest_master_conn(NULL),
meta_mgr(NULL), data_log(NULL) {}
void set_context(CephContext *_cct) {
@ -688,7 +687,8 @@ public:
RGWRegion region;
RGWZoneParams zone;
RGWRegionMap region_map;
RGWRegionConnection *rest_conn;
RGWRESTConn *rest_master_conn;
map<string, RGWRESTConn *> zone_conn_map;
RGWMetadataManager *meta_mgr;
@ -772,6 +772,8 @@ public:
* create a bucket with name bucket and the given list of attrs
* returns 0 on success, -ERR# otherwise.
*/
virtual int init_bucket_index(rgw_bucket& bucket);
int select_bucket_placement(std::string& bucket_name, rgw_bucket& bucket);
virtual int create_bucket(string& owner, rgw_bucket& bucket,
const string& region_name,
map<std::string,bufferlist>& attrs,
@ -891,6 +893,7 @@ public:
virtual int copy_obj(void *ctx,
const string& user_id,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,

View File

@ -3,18 +3,18 @@
#define dout_subsys ceph_subsys_rgw
RGWRegionConnection::RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream) : cct(_cct)
RGWRESTConn::RGWRESTConn(CephContext *_cct, RGWRados *store, list<string>& remote_endpoints) : cct(_cct)
{
list<string>::iterator iter;
int i;
for (i = 0, iter = upstream.endpoints.begin(); iter != upstream.endpoints.end(); ++iter, ++i) {
for (i = 0, iter = remote_endpoints.begin(); iter != remote_endpoints.end(); ++iter, ++i) {
endpoints[i] = *iter;
}
key = store->zone.system_key;
region = store->region.name;
}
int RGWRegionConnection::get_url(string& endpoint)
int RGWRESTConn::get_url(string& endpoint)
{
if (endpoints.empty()) {
ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl;
@ -27,7 +27,7 @@ int RGWRegionConnection::get_url(string& endpoint)
return 0;
}
int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
int RGWRESTConn::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
{
string url;
int ret = get_url(url);
@ -46,7 +46,7 @@ public:
StreamObjData(rgw_obj& _obj) : obj(_obj) {}
};
int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size,
int RGWRESTConn::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size,
map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req)
{
string url;
@ -61,7 +61,7 @@ int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t
return (*req)->put_obj_init(key, obj, obj_size, attrs);
}
int RGWRegionConnection::complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime)
int RGWRESTConn::complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime)
{
int ret = req->complete(etag, mtime);
delete req;
@ -69,7 +69,7 @@ int RGWRegionConnection::complete_request(RGWRESTStreamWriteRequest *req, string
return ret;
}
int RGWRegionConnection::get_obj(const string& uid, req_info *info /* optional */, rgw_obj& obj, bool prepend_metadata,
int RGWRESTConn::get_obj(const string& uid, req_info *info /* optional */, rgw_obj& obj, bool prepend_metadata,
RGWGetDataCB *cb, RGWRESTStreamReadRequest **req)
{
string url;
@ -102,7 +102,7 @@ int RGWRegionConnection::get_obj(const string& uid, req_info *info /* optional *
return (*req)->get_obj(key, extra_headers, obj);
}
int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime,
int RGWRESTConn::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime,
map<string, string>& attrs)
{
int ret = req->complete(etag, mtime, attrs);

View File

@ -5,10 +5,9 @@
class CephContext;
class RGWRados;
class RGWRegion;
class RGWGetObjData;
class RGWRegionConnection
class RGWRESTConn
{
CephContext *cct;
map<int, string> endpoints;
@ -17,7 +16,7 @@ class RGWRegionConnection
atomic_t counter;
public:
RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
RGWRESTConn(CephContext *_cct, RGWRados *store, list<string>& endpoints);
int get_url(string& endpoint);
/* sync request */

View File

@ -1224,31 +1224,44 @@ int RGWCopyObj_ObjStore_S3::get_params()
if_nomatch = s->info.env->get("HTTP_X_AMZ_COPY_IF_NONE_MATCH");
const char *req_src = s->copy_source;
if (!req_src)
if (!req_src) {
ldout(s->cct, 0) << "copy source is NULL" << dendl;
return -EINVAL;
}
ret = parse_copy_location(req_src, src_bucket_name, src_object);
if (!ret)
return -EINVAL;
if (!ret) {
ldout(s->cct, 0) << "failed to parse copy location" << dendl;
return -EINVAL;
}
dest_bucket_name = s->bucket.name;
dest_object = s->object_str;
if (s->system_request) {
source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone");
}
const char *md_directive = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE");
if (md_directive) {
if (strcasecmp(md_directive, "COPY") == 0) {
replace_attrs = false;
} else if (strcasecmp(md_directive, "REPLACE") == 0) {
replace_attrs = true;
} else if (!source_zone.empty()) {
replace_attrs = false; // default for intra-region copy
} else {
ldout(s->cct, 0) << "invalid metadata directive" << dendl;
return -EINVAL;
}
}
if ((dest_bucket_name.compare(src_bucket_name) == 0) &&
if (source_zone.empty() &&
(dest_bucket_name.compare(src_bucket_name) == 0) &&
(dest_object.compare(src_object) == 0) &&
!replace_attrs) {
/* can only copy object into itself if replacing attrs */
ldout(s->cct, 0) << "can't copy object into itself if not replacing attrs" << dendl;
return -ERR_INVALID_REQUEST;
}
return 0;