rgw: stream get_obj operation

Fixes: #2941
Instead of iterating through the parts one by one when reading
an object, we can now send multiple requests in parallel. Two new
configurables are added to control the maximum size of a single
request and the total size of pending requests.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
Yehuda Sadeh 2012-12-14 17:29:37 -08:00
parent 3383618da8
commit 278dfe50fd
11 changed files with 496 additions and 63 deletions

View File

@ -25,10 +25,11 @@ enum {
l_throttle_last,
};
Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool use_perf)
Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool _use_perf)
: cct(cct), name(n), logger(NULL),
max(m),
lock("Throttle::lock")
lock("Throttle::lock"),
use_perf(_use_perf)
{
assert(m >= 0);

View File

@ -19,9 +19,10 @@ class Throttle {
ceph::atomic_t count, max;
Mutex lock;
list<Cond*> cond;
bool use_perf;
public:
Throttle(CephContext *cct, std::string n, int64_t m = 0, bool use_perf = true);
Throttle(CephContext *cct, std::string n, int64_t m = 0, bool _use_perf = true);
~Throttle();
private:

View File

@ -507,6 +507,8 @@ OPTION(rgw_resolve_cname, OPT_BOOL, false) // should rgw try to resolve hostnam
OPTION(rgw_obj_stripe_size, OPT_INT, 4 << 20)
OPTION(rgw_extended_http_attrs, OPT_STR, "") // list of extended attrs that can be set on objects (beyond the default)
OPTION(rgw_exit_timeout_secs, OPT_INT, 120) // how many seconds to wait for process to go down before exiting unconditionally
OPTION(rgw_get_obj_window_size, OPT_INT, 16 << 20) // window size in bytes for single get obj request
OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get obj rados op
OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter

View File

@ -386,13 +386,13 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc
if (ret < 0)
goto done_err;
len = bl.length();
off_t len = bl.length();
cur_ofs += len;
ofs += len;
ret = 0;
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
send_response_data(bl);
send_response_data(bl, 0, len);
start_time = ceph_clock_now(s->cct);
}
@ -526,14 +526,43 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
return 0;
}
class RGWGetObj_CB : public RGWGetDataCB
{
RGWGetObj *op;
public:
RGWGetObj_CB(RGWGetObj *_op) : op(_op) {}
virtual ~RGWGetObj_CB() {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
return op->get_data_cb(bl, bl_ofs, bl_len);
}
};
/*
 * Per-chunk callback for a streamed read.
 *
 * Once the current time has passed gc_invalidate_time, calls
 * store->defer_gc() for the object and moves the deadline forward by half
 * of rgw_gc_obj_min_wait. A defer_gc() failure is only logged. The chunk is
 * then handed to send_response_data(), whose result is returned.
 */
int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
  utime_t now = ceph_clock_now(s->cct);

  if (now > gc_invalidate_time) {
    /* best effort: a failed deferral must not abort the transfer */
    if (store->defer_gc(s->obj_ctx, obj) < 0) {
      dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
    }

    /* next deferral is due half a gc wait period from now */
    gc_invalidate_time = now;
    gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
  }

  return send_response_data(bl, bl_ofs, bl_len);
}
void RGWGetObj::execute()
{
void *handle = NULL;
utime_t start_time = s->time;
bufferlist bl;
utime_t gc_invalidate_time = ceph_clock_now(s->cct);
gc_invalidate_time = ceph_clock_now(s->cct);
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
RGWGetObj_CB cb(this);
map<string, bufferlist>::iterator attr_iter;
perfcounter->inc(l_rgw_get);
@ -541,11 +570,11 @@ void RGWGetObj::execute()
ret = get_params();
if (ret < 0)
goto done;
goto done_err;
ret = init_common();
if (ret < 0)
goto done;
goto done_err;
new_ofs = ofs;
new_end = end;
@ -553,7 +582,7 @@ void RGWGetObj::execute()
ret = store->prepare_get_obj(s->obj_ctx, obj, &new_ofs, &new_end, &attrs, mod_ptr,
unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &s->obj_size, &handle, &s->err);
if (ret < 0)
goto done;
goto done_err;
attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
if (attr_iter != attrs.end()) {
@ -570,53 +599,22 @@ void RGWGetObj::execute()
start = ofs;
if (!get_data || ofs > end)
goto done;
goto done_err;
perfcounter->inc(l_rgw_get_b, end - ofs);
while (ofs <= end) {
ret = store->get_obj(s->obj_ctx, &handle, obj, bl, ofs, end);
if (ret < 0) {
goto done;
}
len = ret;
ret = store->get_obj_iterate(s->obj_ctx, &handle, obj, ofs, end, &cb);
if (!len) {
dout(0) << "WARNING: failed to read object, returned zero length" << dendl;
ret = -EIO;
goto done;
}
ofs += len;
ret = 0;
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
ret = send_response_data(bl);
bl.clear();
if (ret < 0) {
dout(0) << "NOTICE: failed to send response to client" << dendl;
goto done;
}
start_time = ceph_clock_now(s->cct);
if (ofs <= end) {
if (start_time > gc_invalidate_time) {
int r = store->defer_gc(s->obj_ctx, obj);
if (r < 0) {
dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
}
gc_invalidate_time = start_time;
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
}
}
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
if (ret < 0) {
goto done_err;
}
return;
store->finish_get_obj(&handle);
done:
send_response_data(bl);
done_err:
send_response_data(bl, 0, 0);
store->finish_get_obj(&handle);
}

View File

@ -62,7 +62,6 @@ protected:
const char *if_match;
const char *if_nomatch;
off_t ofs;
uint64_t len;
uint64_t total_len;
off_t start;
off_t end;
@ -76,6 +75,7 @@ protected:
bool get_data;
bool partial_content;
rgw_obj obj;
utime_t gc_invalidate_time;
int init_common();
public:
@ -87,7 +87,6 @@ public:
if_nomatch = NULL;
start = 0;
ofs = 0;
len = 0;
total_len = 0;
end = -1;
mod_time = 0;
@ -112,8 +111,10 @@ public:
uint64_t *ptotal_len, bool read_data);
int handle_user_manifest(const char *prefix);
int get_data_cb(bufferlist& bl, off_t ofs, off_t len);
virtual int get_params() = 0;
virtual int send_response_data(bufferlist& bl) = 0;
virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) = 0;
virtual const char *name() { return "get_obj"; }
};

View File

@ -4,6 +4,7 @@
#include "common/errno.h"
#include "common/Formatter.h"
#include "common/Throttle.h"
#include "rgw_rados.h"
#include "rgw_cache.h"
@ -2338,8 +2339,7 @@ int RGWRados::prepare_get_obj(void *ctx, rgw_obj& obj,
done_err:
delete new_ctx;
delete state;
*handle = NULL;
finish_get_obj(handle);
return r;
}
@ -2651,8 +2651,7 @@ done:
r = bl.length();
}
if (r < 0 || !len || ((off_t)(ofs + len - 1) == end)) {
delete state;
*handle = NULL;
finish_get_obj(handle);
}
done_ret:
@ -2661,6 +2660,332 @@ done_ret:
return r;
}
/* per-operation state shared by all async reads issued for one object read; defined below */
struct get_obj_data;
/*
 * Argument handed to the async completion callback for a single read.
 * Instances are stored in get_obj_data::aio_data.
 */
struct get_obj_aio_data {
/* the operation this read belongs to */
struct get_obj_data *op_data;
/* offset the read was registered under in get_obj_data's io/completion maps */
off_t ofs;
/* requested length; the completion callback returns this amount to the throttle */
off_t len;
};
/* destination of a single async read, stored in get_obj_data::io_map */
struct get_obj_io {
/* NOTE(review): not assigned anywhere in the visible code -- confirm whether it is still needed */
off_t len;
/* buffer the read operation fills */
bufferlist bl;
};
/* C-style trampoline registered with every AioCompletion created by get_obj_data::add_io() */
static void _get_obj_aio_completion_cb(completion_t cb, void *arg);
/*
 * State shared by all asynchronous reads issued on behalf of a single
 * get_obj_iterate() call.
 *
 * Reference counting: add_io() takes one reference per registered IO; the
 * completion callback drops it. The creator holds one more reference.
 *
 * Locking:
 *  - lock      guards io_map, completion_map, aio_data and total_read
 *  - data_lock is held by callers around client_cb->handle_data()
 *  - cancelled / err_code are atomics and are accessed without a lock
 */
struct get_obj_data : public RefCountedObject {
  CephContext *cct;
  RGWRados *rados;                /* set by the creator before any IO is issued */
  void *ctx;                      /* set by the creator before any IO is issued */
  IoCtx io_ctx;
  /* read buffers, keyed by object offset */
  map<off_t, get_obj_io> io_map;
  /* completions that have not been reaped yet, keyed by object offset */
  map<off_t, librados::AioCompletion *> completion_map;
  uint64_t total_read;
  Mutex lock;
  Mutex data_lock;
  /* callback arguments; std::list keeps element addresses stable */
  list<get_obj_aio_data> aio_data;
  RGWGetDataCB *client_cb;        /* set by the creator before any IO is issued */
  atomic_t cancelled;
  atomic_t err_code;
  /* limits the number of bytes in flight to rgw_get_obj_window_size */
  Throttle throttle;

  /* pointer members start out as NULL instead of indeterminate values;
   * the initializer list follows the member declaration order */
  get_obj_data(CephContext *_cct)
    : cct(_cct), rados(NULL), ctx(NULL),
      total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"),
      client_cb(NULL),
      throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
  virtual ~get_obj_data() { }

  /* flag the operation as cancelled and remember the error code r */
  void set_cancelled(int r) {
    cancelled.set(1);
    err_code.set(r);
  }

  bool is_cancelled() {
    return cancelled.read() == 1;
  }

  int get_err_code() {
    return err_code.read();
  }

  /*
   * Wait for the completion with the lowest offset (including its
   * callback), release it and remove it from completion_map.
   *
   * Sets *done to true when completion_map is, or has just become, empty;
   * *done is left untouched otherwise. Returns the return value of the
   * completion that was waited for, or 0 if there was nothing to wait for.
   */
  int wait_next_io(bool *done) {
    lock.Lock();
    map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
    if (iter == completion_map.end()) {
      *done = true;
      lock.Unlock();
      return 0;
    }
    off_t cur_ofs = iter->first;
    librados::AioCompletion *c = iter->second;
    /* do not hold the lock while blocking: the callback takes it */
    lock.Unlock();

    c->wait_for_complete_and_cb();
    int r = c->get_return_value();
    c->release();

    lock.Lock();
    completion_map.erase(cur_ofs);

    if (completion_map.empty()) {
      *done = true;
    }
    lock.Unlock();

    return r;
  }

  /*
   * Register a new read at offset ofs of length len.
   *
   * On return *pbl points at the buffer the read must fill and *pc at a
   * freshly created completion whose callback argument is the matching
   * aio_data entry. Takes one reference on this object.
   */
  void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) {
    Mutex::Locker l(lock);

    get_obj_io& io = io_map[ofs];
    *pbl = &io.bl;

    struct get_obj_aio_data aio;
    aio.ofs = ofs;
    aio.len = len;
    aio.op_data = this;

    aio_data.push_back(aio);

    struct get_obj_aio_data *paio_data = &aio_data.back(); /* last element */

    librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, _get_obj_aio_completion_cb, NULL);
    completion_map[ofs] = c;

    *pc = c;

    /* we have a reference per IO, plus one reference for the calling function.
     * reference is dropped for each callback, plus when we're done iterating
     * over the parts */
    get();
  }

  /* release the completion registered at ofs and forget it and its buffer */
  void cancel_io(off_t ofs) {
    ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl;
    lock.Lock();
    map<off_t, AioCompletion *>::iterator iter = completion_map.find(ofs);
    if (iter != completion_map.end()) {
      AioCompletion *c = iter->second;
      c->release();
      completion_map.erase(ofs);
      io_map.erase(ofs);
    }
    lock.Unlock();

    /* we don't drop a reference here -- e.g., not calling d->put(), because we still
     * need IoCtx to live, as io callback may still be called
     */
  }

  /*
   * Release every outstanding completion.
   *
   * The map is emptied as well so that no released pointer stays reachable:
   * a callback that arrives afterwards finds no entry in get_complete_ios()
   * and treats its IO as cancelled. io_map is left alone because reads that
   * are still in flight write into its buffers.
   */
  void cancel_all_io() {
    ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl;
    Mutex::Locker l(lock);
    for (map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
         iter != completion_map.end(); ++iter) {
      librados::AioCompletion *c = iter->second;
      c->release();
    }
    completion_map.clear();
  }

  /*
   * Collect, in offset order, the buffers of all reads that have completed
   * contiguously starting at ofs, and append them to bl_list.
   *
   * Nothing is collected (and 0 is returned) unless ofs is the lowest
   * offset still present in io_map, i.e. unless all earlier data has
   * already been consumed, or when the IO at ofs is no longer registered.
   * Collected buffers are removed from io_map and total_read is advanced.
   *
   * Returns 0 on success or the negative return value of a failed read.
   * A failure found while scanning the following reads also marks the
   * operation as cancelled.
   */
  int get_complete_ios(off_t ofs, list<bufferlist>& bl_list) {
    Mutex::Locker l(lock);

    map<off_t, get_obj_io>::iterator liter = io_map.begin();

    if (liter == io_map.end() ||
        liter->first != ofs) {
      return 0;
    }

    map<off_t, librados::AioCompletion *>::iterator aiter;
    aiter = completion_map.find(ofs);
    if (aiter == completion_map.end()) {
      /* completion map does not hold this io, it was cancelled */
      return 0;
    }

    AioCompletion *completion = aiter->second;
    int r = completion->get_return_value();
    if (r < 0)
      return r;

    /* liter and aiter advance in lockstep over matching offsets */
    for (; aiter != completion_map.end(); ++aiter) {
      completion = aiter->second;
      if (!completion->is_complete()) {
        /* reached a request that is not yet complete, stop */
        break;
      }

      r = completion->get_return_value();
      if (r < 0) {
        set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */
        return r;
      }

      total_read += r;

      map<off_t, get_obj_io>::iterator old_liter = liter++;
      bl_list.push_back(old_liter->second.bl);
      io_map.erase(old_liter);
    }

    return 0;
  }
};
/*
 * Plain-function trampoline passed to RGWRados::iterate_obj(): arg is the
 * get_obj_data of the running operation, and the call is forwarded to the
 * RGWRados instance recorded in it.
 */
static int _get_obj_iterate_cb(rgw_obj& obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg)
{
  get_obj_data *op_data = static_cast<get_obj_data *>(arg);
  RGWRados *store = op_data->rados;

  return store->get_obj_iterate_cb(op_data->ctx, astate, obj, obj_ofs, read_ofs, len, is_head_obj, arg);
}
/*
 * Plain-function trampoline registered with each AioCompletion: arg is the
 * get_obj_aio_data of the finished read, and the call is forwarded to the
 * RGWRados instance recorded in the owning get_obj_data.
 */
static void _get_obj_aio_completion_cb(completion_t cb, void *arg)
{
  get_obj_aio_data *io_arg = static_cast<get_obj_aio_data *>(arg);
  RGWRados *store = io_arg->op_data->rados;

  store->get_obj_aio_completion_cb(cb, arg);
}
/*
 * Completion handler for one asynchronous read of a streamed get.
 *
 * arg is the get_obj_aio_data registered for the read. The bytes reserved
 * for the read are returned to the throttle, every buffer that is now ready
 * in offset order is handed to the client callback, and the reference taken
 * for this IO is dropped.
 *
 * If the client callback fails, the operation is marked as cancelled with
 * that error and no further buffers are delivered; previously the error was
 * ignored and the remaining data kept being pushed to a failed client.
 */
void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
{
  struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
  struct get_obj_data *d = aio_data->op_data;
  off_t ofs = aio_data->ofs;
  off_t len = aio_data->len;

  ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl;
  d->throttle.put(len);

  if (!d->is_cancelled()) {
    list<bufferlist> bl_list;

    /* data_lock serializes delivery to the client */
    d->data_lock.Lock();

    int r = d->get_complete_ios(ofs, bl_list);
    if (r >= 0) {
      for (list<bufferlist>::iterator iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
        bufferlist& bl = *iter;
        r = d->client_cb->handle_data(bl, 0, bl.length());
        if (r < 0) {
          /* stop delivering; later completions will see the cancelled flag */
          d->set_cancelled(r);
          break;
        }
      }
    }

    d->data_lock.Unlock();
  }

  /* drop the per-IO reference taken in add_io() */
  d->put();
}
int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
rgw_obj& obj,
off_t obj_ofs,
off_t read_ofs, off_t len,
bool is_head_obj, void *arg)
{
RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
ObjectReadOperation op;
struct get_obj_data *d = (struct get_obj_data *)arg;
if (is_head_obj) {
/* only when reading from the head object do we need to do the atomic test */
int r = append_atomic_test(rctx, obj, op, &astate);
if (r < 0)
return r;
if (astate &&
obj_ofs < astate->data.length()) {
unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
d->data_lock.Lock();
d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
d->data_lock.Unlock();
d->lock.Lock();
d->total_read += chunk_len;
d->lock.Unlock();
len -= chunk_len;
read_ofs += chunk_len;
obj_ofs += chunk_len;
if (!len)
return 0;
}
}
string oid, key;
rgw_bucket bucket;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
bufferlist *pbl;
AioCompletion *c;
d->add_io(obj_ofs, len, &pbl, &c);
d->throttle.get(len);
if (d->is_cancelled()) {
return d->get_err_code();
}
ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
op.read(read_ofs, len, pbl, NULL);
librados::IoCtx io_ctx(d->io_ctx);
io_ctx.locator_set_key(key);
int r = io_ctx.aio_operate(oid, c, &op, NULL);
ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
if (r < 0) {
d->set_cancelled(r);
d->cancel_io(obj_ofs);
}
return r;
}
int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
off_t ofs, off_t end,
RGWGetDataCB *cb)
{
struct get_obj_data *data = new get_obj_data(cct);
bool done = false;
GetObjState *state = *(GetObjState **)handle;
data->rados = this;
data->ctx = ctx;
data->io_ctx.dup(state->io_ctx);
data->client_cb = cb;
int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
if (r < 0) {
goto done;
}
while (!done) {
r = data->wait_next_io(&done);
if (r < 0) {
dout(10) << "get_obj_iterate() r=" << r << ", canceling all io" << dendl;
data->cancel_all_io();
break;
}
}
done:
data->put();
return r;
}
void RGWRados::finish_get_obj(void **handle)
{
if (*handle) {
@ -2670,6 +2995,87 @@ void RGWRados::finish_get_obj(void **handle)
}
}
/*
 * Walk the range [ofs, end] of a logical object and invoke iterate_obj_cb
 * once per chunk of at most max_chunk_size bytes.
 *
 * ctx             an RGWRadosCtx, or NULL to use a temporary one
 * iterate_obj_cb  called as cb(read_obj, obj_ofs, read_ofs, read_len,
 *                 reading_from_head, astate, arg), where obj_ofs is the
 *                 offset within the logical object and read_ofs the offset
 *                 within read_obj; a negative return aborts the walk
 *
 * If the object state has a manifest, chunks never cross a manifest part
 * and read_obj/read_ofs are taken from the part; otherwise every chunk is
 * read from obj itself at the logical offset.
 *
 * Returns 0 on success, or the first negative value returned by
 * get_obj_state() or the callback. The temporary context, if one was
 * created, is freed on every path (it used to be leaked on success).
 */
int RGWRados::iterate_obj(void *ctx, rgw_obj& obj,
                          off_t ofs, off_t end,
                          uint64_t max_chunk_size,
                          int (*iterate_obj_cb)(rgw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
                          void *arg)
{
  rgw_obj read_obj = obj;
  uint64_t read_ofs = ofs;
  uint64_t len;
  RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
  RGWRadosCtx *new_ctx = NULL;
  bool reading_from_head = true;
  RGWObjState *astate = NULL;

  if (!rctx) {
    new_ctx = new RGWRadosCtx(this);
    rctx = new_ctx;
  }

  int r = get_obj_state(rctx, obj, &astate);
  if (r < 0)
    goto done;

  /* remaining number of bytes to hand out */
  if (end < 0)
    len = 0;
  else
    len = end - ofs + 1;

  if (astate->has_manifest) {
    /* now get the relevant object part */
    map<uint64_t, RGWObjManifestPart>::iterator iter = astate->manifest.objs.upper_bound(ofs);
    /* we're now pointing at the next part (unless the first part starts at a higher ofs),
       so retract to previous part */
    if (iter != astate->manifest.objs.begin()) {
      --iter;
    }

    for (; iter != astate->manifest.objs.end() && ofs <= end; ++iter) {
      RGWObjManifestPart& part = iter->second;
      off_t part_ofs = iter->first;
      off_t next_part_ofs = part_ofs + part.size;

      while (ofs < next_part_ofs && ofs <= end) {
        read_obj = part.loc;
        /* stay within both the requested range and the current part */
        uint64_t read_len = min(len, part.size - (ofs - part_ofs));
        read_ofs = part.loc_ofs + (ofs - part_ofs);

        if (read_len > max_chunk_size) {
          read_len = max_chunk_size;
        }

        reading_from_head = (read_obj == obj);
        r = iterate_obj_cb(read_obj, ofs, read_ofs, read_len, reading_from_head, astate, arg);
        if (r < 0)
          goto done;

        len -= read_len;
        ofs += read_len;
      }
    }
  } else {
    while (ofs <= end) {
      uint64_t read_len = min(len, max_chunk_size);

      r = iterate_obj_cb(obj, ofs, ofs, read_len, reading_from_head, astate, arg);
      if (r < 0)
        goto done;

      len -= read_len;
      ofs += read_len;
    }
  }

  r = 0;

done:
  /* astate is not used past this point, so the temporary context can go */
  delete new_ctx;
  return r;
}
/* a simple object read */
int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& bl)
{

View File

@ -3,6 +3,7 @@
#include "include/rados/librados.hpp"
#include "include/Context.h"
#include "common/RefCountedObj.h"
#include "rgw_common.h"
#include "cls/rgw/cls_rgw_types.h"
#include "rgw_log.h"
@ -55,6 +56,12 @@ struct RGWUsageIter {
RGWUsageIter() : index(0) {}
};
/*
 * Callback interface through which RGWRados::get_obj_iterate() hands object
 * data to its caller.
 */
class RGWGetDataCB {
public:
/*
 * Receive one chunk of data: bl_len bytes starting at offset bl_ofs
 * within bl. Returns an int status.
 * NOTE(review): presumably negative means failure, as elsewhere in rgw -- confirm.
 */
virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
virtual ~RGWGetDataCB() {}
};
class RGWAccessListFilter {
public:
virtual ~RGWAccessListFilter() {}
@ -625,7 +632,24 @@ public:
virtual void finish_get_obj(void **handle);
/**
int iterate_obj(void *ctx, rgw_obj& obj,
off_t ofs, off_t end,
uint64_t max_chunk_size,
int (*iterate_obj_cb)(rgw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
void *arg);
int get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
off_t ofs, off_t end,
RGWGetDataCB *cb);
int get_obj_iterate_cb(void *ctx, RGWObjState *astate,
rgw_obj& obj,
off_t obj_ofs, off_t read_ofs, off_t len,
bool is_head_obj, void *arg);
void get_obj_aio_completion_cb(librados::completion_t cb, void *arg);
/**
* a simple object read without keeping state
*/
virtual int read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& bl);

View File

@ -67,7 +67,7 @@ static struct response_attr_param resp_attr_params[] = {
{NULL, NULL},
};
int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl)
int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
const char *content_type = NULL;
string content_type_str;
@ -149,7 +149,7 @@ done:
send_data:
if (get_data && !orig_ret) {
int r = s->cio->write(bl.c_str(), len);
int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
if (r < 0)
return r;
}

View File

@ -17,7 +17,7 @@ public:
RGWGetObj_ObjStore_S3() {}
~RGWGetObj_ObjStore_S3() {}
int send_response_data(bufferlist& bl);
int send_response_data(bufferlist& bl, off_t ofs, off_t len);
};
class RGWListBuckets_ObjStore_S3 : public RGWListBuckets_ObjStore {

View File

@ -432,7 +432,7 @@ void RGWCopyObj_ObjStore_SWIFT::send_response()
end_header(s);
}
int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl)
int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
const char *content_type = NULL;
int orig_ret = ret;
@ -495,7 +495,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl)
send_data:
if (get_data && !orig_ret) {
int r = s->cio->write(bl.c_str(), len);
int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
if (r < 0)
return r;
}

View File

@ -10,7 +10,7 @@ public:
RGWGetObj_ObjStore_SWIFT() {}
~RGWGetObj_ObjStore_SWIFT() {}
int send_response_data(bufferlist& bl);
int send_response_data(bufferlist& bl, off_t ofs, off_t len);
};
class RGWListBuckets_ObjStore_SWIFT : public RGWListBuckets_ObjStore {