ReplicatedPG: support async reads on ec pools

Signed-off-by: Samuel Just <sam.just@inktank.com>
Author: Samuel Just 2013-12-06 13:54:04 -08:00
parent 647e75ae78
commit 0bba79b722
5 changed files with 157 additions and 60 deletions


@@ -425,9 +425,8 @@
   virtual void objects_read_async(
     const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    bufferlist *bl,
+    const list<pair<pair<uint64_t, uint64_t>,
+               pair<bufferlist*, Context*> > > &to_read,
     Context *on_complete) = 0;
 };
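
This first hunk (presumably the backend interface in PGBackend.h) replaces the single off/len/bl triple with a batch of extents, each carrying its own output bufferlist and an optional per-extent completion. A minimal caller sketch; C_LogResult, backend, hoid, and on_complete are hypothetical stand-ins for the caller's completion class, PGBackend pointer, target object, and final callback, none of them from the commit:

    // Hypothetical per-extent completion, for illustration only.
    struct C_LogResult : public Context {
      void finish(int r) {
        // r is the byte count read (or a negative errno) for this extent
      }
    };

    list<pair<pair<uint64_t, uint64_t>,
              pair<bufferlist*, Context*> > > to_read;
    bufferlist out0, out1;
    to_read.push_back(make_pair(make_pair(0, 4096),
                                make_pair(&out0, new C_LogResult)));
    to_read.push_back(make_pair(make_pair(8192, 4096),
                                make_pair(&out1, (Context*)NULL)));
    backend->objects_read_async(hoid, to_read, on_complete);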


@@ -347,12 +347,26 @@ struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
 };
 void ReplicatedBackend::objects_read_async(
   const hobject_t &hoid,
-  uint64_t off,
-  uint64_t len,
-  bufferlist *bl,
+  const list<pair<pair<uint64_t, uint64_t>,
+             pair<bufferlist*, Context*> > > &to_read,
   Context *on_complete)
 {
-  int r = osd->store->read(coll, hoid, off, len, *bl);
+  int r = 0;
+  for (list<pair<pair<uint64_t, uint64_t>,
+         pair<bufferlist*, Context*> > >::const_iterator i =
+         to_read.begin();
+       i != to_read.end() && r >= 0;
+       ++i) {
+    int _r = osd->store->read(coll, hoid, i->first.first,
+                              i->first.second, *(i->second.first));
+    if (i->second.second) {
+      osd->gen_wq.queue(
+        get_parent()->bless_gencontext(
+          new AsyncReadCallback(_r, i->second.second)));
+    }
+    if (_r < 0)
+      r = _r;
+  }
   osd->gen_wq.queue(
     get_parent()->bless_gencontext(
       new AsyncReadCallback(r, on_complete)));
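
In the replicated backend the reads themselves remain synchronous store reads; only completion delivery is asynchronous, bounced through osd->gen_wq so callbacks fire from the work queue rather than inline. Each per-extent Context sees its own return value; on_complete sees 0 or the first error, and once an extent fails the loop stops, so later extents are neither read nor completed. The AsyncReadCallback wrapper is defined just above this hunk and is not part of the diff; a plausible shape, offered as a sketch only:

    // Sketch: adapt a plain Context to the GenContext the work queue runs.
    struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
      int r;
      Context *c;
      AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
      void finish(ThreadPool::TPHandle &) {
        c->complete(r);  // complete() runs c->finish(r) and deletes c
      }
    };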


@@ -184,9 +184,8 @@ public:
   void objects_read_async(
     const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    bufferlist *bl,
+    const list<pair<pair<uint64_t, uint64_t>,
+               pair<bufferlist*, Context*> > > &to_read,
     Context *on_complete);
 private:


@@ -101,6 +101,41 @@ static void log_subop_stats(
   osd->logger->tinc(tag_lat, latency);
 }
+
+struct OnReadComplete : public Context {
+  ReplicatedPG *pg;
+  ReplicatedPG::OpContext *opcontext;
+  OnReadComplete(
+    ReplicatedPG *pg,
+    ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
+  void finish(int r) {
+    if (r < 0)
+      opcontext->async_read_result = r;
+    opcontext->finish_read(pg);
+  }
+  ~OnReadComplete() {}
+};
+
+// OpContext
+void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg)
+{
+  inflightreads = 1;
+  pg->pgbackend->objects_read_async(
+    obc->obs.oi.soid,
+    pending_async_reads,
+    new OnReadComplete(pg, this));
+  pending_async_reads.clear();
+}
+void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg)
+{
+  assert(inflightreads > 0);
+  --inflightreads;
+  if (async_reads_complete()) {
+    set<OpContext*>::iterator iter = pg->in_progress_async_reads.find(this);
+    assert(iter != pg->in_progress_async_reads.end());
+    pg->in_progress_async_reads.erase(iter);
+    pg->complete_read_ctx(async_read_result, this);
+  }
+}
 
 class CopyFromCallback: public ReplicatedPG::CopyCallback {
 public:
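
These helpers form the completion path for batched reads. A condensed call-flow summary, mine rather than the commit's:

    // execute_ctx(ctx), read-or-error case:
    //   pending_async_reads empty -> complete_read_ctx(result, ctx)
    //   otherwise                 -> in_progress_async_reads.insert(ctx);
    //                                ctx->start_async_reads(this);
    //
    // backend finishes and queues OnReadComplete on gen_wq, then:
    //   OnReadComplete::finish(r)
    //     -> ctx->finish_read(pg)          // --inflightreads
    //        -> pg->complete_read_ctx(ctx->async_read_result, ctx)

inflightreads is always set to 1 because the whole pending list goes to the backend in one objects_read_async() call; the counter leaves room to split the batch later.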
@@ -146,8 +181,6 @@ public:
   }
 };
 // ======================
 // PGBackend::Listener
@@ -1621,11 +1654,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   // possible to construct an operation that does a read, does a guard
   // check (e.g., CMPXATTR), and then a write.  Then we either succeed
   // with the write, or return a CMPXATTR and the read value.
-  if ((ctx->op_t->empty() && !ctx->modify) || result < 0) {
-    // read.
-    ctx->reply->claim_op_out_data(ctx->ops);
-    ctx->reply->get_header().data_off = ctx->data_off;
-  } else {
+  if (!((ctx->op_t->empty() && !ctx->modify) || result < 0)) {
     // write.  normalize the result code.
     if (result > 0) {
       dout(20) << " zeroing write result code " << result << dendl;
@@ -1636,23 +1665,12 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   // read or error?
   if (ctx->op_t->empty() || result < 0) {
-    MOSDOpReply *reply = ctx->reply;
-    ctx->reply = NULL;
-    if (result >= 0) {
-      log_op_stats(ctx);
-      publish_stats_to_osd();
-      // on read, return the current object version
-      reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
-    } else if (result == -ENOENT) {
-      // on ENOENT, set a floor for what the next user version will be.
-      reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+    if (ctx->pending_async_reads.empty()) {
+      complete_read_ctx(result, ctx);
+    } else {
+      in_progress_async_reads.insert(ctx);
+      ctx->start_async_reads(this);
     }
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    osd->send_message_osd_client(reply, m->get_connection());
-    close_op_ctx(ctx);
     return;
   }
@@ -2718,6 +2736,16 @@ static int check_offset_and_length(uint64_t offset, uint64_t length, uint64_t ma
   return 0;
 }
+
+struct FillInExtent : public Context {
+  ceph_le64 *r;
+  FillInExtent(ceph_le64 *r) : r(r) {}
+  void finish(int _r) {
+    if (_r >= 0) {
+      *r = _r;
+    }
+  }
+};
 
 int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 {
   int result = 0;
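
FillInExtent is the per-extent completion the async READ path attaches to each extent: on success it stores the byte count actually read into the op's wire-format length field (a ceph_le64, so assignment performs the little-endian conversion); on failure it leaves the field alone and lets the batch-level callback deliver the error. Behavior in isolation, illustrative only and not from the commit:

    ceph_le64 len;
    len = 4096;            // requested length
    FillInExtent fe(&len);
    fe.finish(1024);       // short read: len becomes 1024
    fe.finish(-EIO);       // error: len is left untouched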
@@ -2802,25 +2830,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_READ:
       ++ctx->num_read;
       {
-	// read into a buffer
-	bufferlist bl;
-	int r = pgbackend->objects_read_sync(
-	  soid, op.extent.offset, op.extent.length, &bl);
-	if (first_read) {
-	  first_read = false;
-	  ctx->data_off = op.extent.offset;
-	}
-	osd_op.outdata.claim_append(bl);
-	if (r >= 0)
-	  op.extent.length = r;
-	else {
-	  result = r;
-	  op.extent.length = 0;
-	}
-	ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-	ctx->delta_stats.num_rd++;
-	dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
 	__u32 seq = oi.truncate_seq;
 	// are we beyond truncate_size?
 	if ( (seq < op.extent.truncate_seq) &&
@@ -2832,15 +2841,35 @@
 	  unsigned trim = to-from;
 	  op.extent.length = op.extent.length - trim;
-	  bufferlist keep;
-	  // keep first part of osd_op.outdata; trim at truncation point
-	  dout(10) << " obj " << soid << " seq " << seq
-	           << ": trimming overlap " << from << "~" << trim << dendl;
-	  keep.substr_of(osd_op.outdata, 0, osd_op.outdata.length() - trim);
-	  osd_op.outdata.claim(keep);
 	}
+	// read into a buffer
+	bufferlist bl;
+	if (pool.info.ec_pool()) {
+	  ctx->pending_async_reads.push_back(
+	    make_pair(
+	      make_pair(op.extent.offset, op.extent.length),
+	      make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
+	  dout(10) << " async_read noted for " << soid << dendl;
+	} else {
+	  int r = pgbackend->objects_read_sync(
+	    soid, op.extent.offset, op.extent.length, &osd_op.outdata);
+	  if (r >= 0)
+	    op.extent.length = r;
+	  else {
+	    result = r;
+	    op.extent.length = 0;
+	  }
+	  dout(10) << " read got " << r << " / " << op.extent.length
+		   << " bytes from obj " << soid << dendl;
+	}
+	if (first_read) {
+	  first_read = false;
+	  ctx->data_off = op.extent.offset;
+	}
+	ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+	ctx->delta_stats.num_rd++;
       }
       break;
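
The READ case now forks: an erasure-coded pool cannot serve the read synchronously from the primary's local store (the object is striped across shards), so the extent is queued on pending_async_reads and FillInExtent patches op.extent.length once the data arrives; replicated pools keep the old synchronous path. Note that in the async branch num_rd_kb is charged using the requested (post-trim) length, since the actual byte count is not known yet. The pool.info.ec_pool() predicate is a pool-type check; its shape in osd_types.h is roughly the following, reproduced from memory as a sketch:

    // pg_pool_t (sketch, not part of this diff)
    bool ec_pool() const {
      return type == TYPE_ERASURE;
    }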
@@ -4971,6 +5000,32 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
   }
 }
+
+void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
+  assert(ctx->async_reads_complete());
+
+  ctx->reply->claim_op_out_data(ctx->ops);
+  ctx->reply->get_header().data_off = ctx->data_off;
+
+  MOSDOpReply *reply = ctx->reply;
+  ctx->reply = NULL;
+
+  if (result >= 0) {
+    log_op_stats(ctx);
+    publish_stats_to_osd();
+    // on read, return the current object version
+    reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+  } else if (result == -ENOENT) {
+    // on ENOENT, set a floor for what the next user version will be.
+    reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+  }
+
+  reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+  osd->send_message_osd_client(reply, m->get_connection());
+  close_op_ctx(ctx);
+}
 
 // ========================================================================
 // copyfrom
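
complete_read_ctx() consolidates the reply logic previously inline in execute_ctx(), so the immediate path and the async path share one exit. The reply cannot be assembled earlier: the queued extents write into each osd_op.outdata in place, and claim_op_out_data() then moves those buffers into the reply. The move semantics underneath, in isolation (illustrative):

    bufferlist src, dst;
    src.append("payload");  // 7 bytes
    dst.claim(src);         // zero-copy move into dst
    // now src.length() == 0 and dst.length() == 7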
@@ -8600,6 +8655,12 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
   context_registry_on_change();
+
+  for (set<OpContext*>::iterator i = in_progress_async_reads.begin();
+       i != in_progress_async_reads.end();
+       in_progress_async_reads.erase(i++)) {
+    close_op_ctx(*i);
+  }
   cancel_copy_ops(is_primary());
   cancel_flush_ops(is_primary());
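
On an interval change the PG must drop contexts for reads still in flight, or they would complete against a stale interval; close_op_ctx() frees each one. The erase-in-the-increment idiom used above, in isolation:

    // i++ hands the old iterator to erase() while i has already advanced,
    // so the set can be drained safely during iteration.
    std::set<int> s;
    s.insert(1); s.insert(2);
    for (std::set<int>::iterator i = s.begin();
         i != s.end();
         s.erase(i++)) {
      // use *i here, before it is erased
    }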


@@ -449,6 +449,18 @@ public:
       pending_attrs.clear();
     }
+
+    // pending async reads <off, len> -> <outbl, outr>
+    list<pair<pair<uint64_t, uint64_t>,
+              pair<bufferlist*, Context*> > > pending_async_reads;
+    int async_read_result;
+    unsigned inflightreads;
+    friend struct OnReadComplete;
+    void start_async_reads(ReplicatedPG *pg);
+    void finish_read(ReplicatedPG *pg);
+    bool async_reads_complete() {
+      return inflightreads == 0;
+    }
 
     ObjectModDesc mod_desc;
 
     enum { W_LOCK, R_LOCK, NONE } lock_to_release;
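
OpContext grows the bookkeeping for batched reads: pending_async_reads accumulates extents while do_osd_ops() walks the op vector, async_read_result latches any backend error, and inflightreads counts outstanding batched calls. Several READs in one MOSDOp each queue their own extent, and list order keeps replies lined up with ops; a sketch with hypothetical ceph_le64 lengths len0/len1:

    ctx->pending_async_reads.push_back(
      make_pair(make_pair(0, 4096),
                make_pair(&ops[0].outdata, new FillInExtent(&len0))));
    ctx->pending_async_reads.push_back(
      make_pair(make_pair(65536, 4096),
                make_pair(&ops[1].outdata, new FillInExtent(&len1))));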
@@ -469,6 +481,8 @@
       num_read(0),
       num_write(0),
       copy_cb(NULL),
+      async_read_result(0),
+      inflightreads(0),
       lock_to_release(NONE) {
       if (_ssc) {
         new_snapset = _ssc->snapset;
@@ -487,8 +501,16 @@
       assert(lock_to_release == NONE);
       if (reply)
         reply->put();
+      for (list<pair<pair<uint64_t, uint64_t>,
+             pair<bufferlist*, Context*> > >::iterator i =
+             pending_async_reads.begin();
+           i != pending_async_reads.end();
+           pending_async_reads.erase(i++)) {
+        delete i->second.second;
+      }
     }
   };
+  friend class OpContext;
 
   /*
    * State on the PG primary associated with the replicated mutation
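
The destructor addition above closes the ownership loop: a per-extent Context queued on pending_async_reads is deleted exactly once, either by the backend completing it after start_async_reads() hands the list over, or by the destructor if the op is torn down first. The rule, sketched with names from the surrounding hunks:

    Context *c = new FillInExtent(&op.extent.length);
    pending_async_reads.push_back(
      make_pair(make_pair(off, len),
                make_pair(&osd_op.outdata, c)));
    // exactly one of:
    //   start_async_reads(pg);  // backend runs c->complete(r), deleting c
    //   ~OpContext();           // runs `delete i->second.second` instead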
@@ -869,6 +891,8 @@ protected:
   bool can_skip_promote(OpRequestRef op, ObjectContextRef obc);
   int prepare_transaction(OpContext *ctx);
+  set<OpContext*> in_progress_async_reads;
+  void complete_read_ctx(int result, OpContext *ctx);
 
   // pg on-disk content
   void check_local();