osd/ReplicatedPG: add on_finish to OpContext

Add a callback hook that fires whenever an OpContext completes or is
canceled.  We are pretty sloppy about the return values here because our
initial user will not care, and it is unclear whether future users will.

Signed-off-by: Sage Weil <sage@inktank.com>
Author: Sage Weil <sage@inktank.com>
Date:   2014-02-03 22:10:07 -08:00
Parent: a57052cb7b
Commit: fc28a99f55

 2 files changed, 36 insertions(+), 18 deletions(-)
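The hook's semantics are easiest to see in isolation. Below is a minimal
standalone sketch, not Ceph code: the Context stand-in mirrors Ceph's
one-shot callback interface, and LogOnFinish/OpContextSketch are
hypothetical names. It models what this commit adds: close_op_ctx(ctx, r)
forwards r to OpContext::finish(), which fires the registered callback at
most once, on completion or cancellation alike.

#include <cassert>
#include <cerrno>
#include <cstdio>

// Stand-in for Ceph's one-shot Context callback: complete() runs
// finish(r) and then deletes the callback object.
struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) {
    finish(r);
    delete this;
  }
};

// Hypothetical user callback; the commit's initial user ignores r,
// which is why the message is unapologetic about sloppy return values.
struct LogOnFinish : public Context {
  void finish(int r) {
    printf("op context closed, r=%d\n", r);
  }
};

// Mirrors the OpContext changes in this commit: an optional hook that
// fires at most once and must be consumed before destruction.
struct OpContextSketch {
  Context *on_finish;
  OpContextSketch() : on_finish(NULL) {}
  ~OpContextSketch() { assert(on_finish == NULL); }
  void finish(int r) {
    if (on_finish) {
      on_finish->complete(r);
      on_finish = NULL;
    }
  }
};

int main() {
  OpContextSketch ctx;
  ctx.on_finish = new LogOnFinish;
  ctx.finish(-ECANCELED);  // e.g. a cancelled flush; 0 on clean completion
  ctx.finish(0);           // second call is a no-op: hook already consumed
  return 0;
}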

--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -166,7 +166,7 @@ public:
     } else if (results->should_requeue) {
       ctx->pg->requeue_op(ctx->op);
     }
-    ctx->pg->close_op_ctx(ctx);
+    ctx->pg->close_op_ctx(ctx, r);
   }
 }
@@ -1348,14 +1348,14 @@ void ReplicatedPG::do_op(OpRequestRef op)
     map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
     if (p == flush_ops.end()) {
       dout(10) << __func__ << " no flush in progress, aborting" << dendl;
-      close_op_ctx(ctx);
+      close_op_ctx(ctx, -EINVAL);
       osd->reply_op_error(op, -EINVAL);
       return;
     }
   } else if (!get_rw_locks(ctx)) {
     dout(20) << __func__ << " waiting for rw locks " << dendl;
     op->mark_delayed("waiting for rw locks");
-    close_op_ctx(ctx);
+    close_op_ctx(ctx, -EBUSY);
     return;
   }
@@ -1363,13 +1363,13 @@ void ReplicatedPG::do_op(OpRequestRef op)
     // This object is lost. Reading from it returns an error.
     dout(20) << __func__ << ": object " << obc->obs.oi.soid
             << " is lost" << dendl;
-    close_op_ctx(ctx);
+    close_op_ctx(ctx, -ENFILE);
     osd->reply_op_error(op, -ENFILE);
     return;
   }
   if (!op->may_write() && !op->may_cache() && (!obc->obs.exists ||
                                               obc->obs.oi.is_whiteout())) {
-    close_op_ctx(ctx);
+    close_op_ctx(ctx, -ENOENT);
     osd->reply_op_error(op, -ENOENT);
     return;
   }
@@ -1544,7 +1544,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
     if (already_complete(oldv)) {
       reply_ctx(ctx, 0, oldv, entry->user_version);
     } else {
-      close_op_ctx(ctx);
+      close_op_ctx(ctx, -EBUSY);
 
       if (m->wants_ack()) {
        if (already_ack(oldv)) {
@@ -1633,7 +1633,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   if (result == -EAGAIN) {
     // clean up after the ctx
-    close_op_ctx(ctx);
+    close_op_ctx(ctx, result);
     return;
   }
@@ -1716,13 +1716,13 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 void ReplicatedPG::reply_ctx(OpContext *ctx, int r)
 {
   osd->reply_op_error(ctx->op, r);
-  close_op_ctx(ctx);
+  close_op_ctx(ctx, r);
 }
 
 void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
 {
   osd->reply_op_error(ctx->op, r, v, uv);
-  close_op_ctx(ctx);
+  close_op_ctx(ctx, r);
 }
 
 void ReplicatedPG::log_op_stats(OpContext *ctx)
@@ -5025,7 +5025,7 @@ void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
   reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
   osd->send_message_osd_client(reply, m->get_connection());
-  close_op_ctx(ctx);
+  close_op_ctx(ctx, 0);
 }
 
 // ========================================================================
@@ -5728,7 +5728,8 @@ int ReplicatedPG::start_flush(OpContext *ctx, bool blocking)
     if (fop->ctx->op == ctx->op) {
       // we couldn't take the write lock on a cache-try-flush before;
       // now we are trying again for the lock.
-      close_op_ctx(fop->ctx); // clean up the previous ctx and use the new one.
+      // clean up the previous ctx and use the new one.
+      close_op_ctx(fop->ctx, -EAGAIN);
       fop->ctx = ctx;
       return try_flush_mark_clean(fop);
     }
@@ -5869,10 +5870,15 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
   if (!fop->blocking) {
     // non-blocking: try to take the lock manually, since we don't
     // have a ctx yet.
-    dout(20) << __func__ << " taking write lock" << dendl;
-    if (!obc->get_write(fop->ctx->op)) {
-      dout(10) << __func__ << " waiting on lock" << dendl;
+    if (obc->get_write(fop->ctx->op)) {
+      dout(20) << __func__ << " took write lock" << dendl;
+    } else if (fop->ctx->op) {
+      dout(10) << __func__ << " waiting on write lock" << dendl;
       return -EINPROGRESS; // will retry. this ctx is still alive!
+    } else {
+      dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
+      cancel_flush(fop, false);
+      return -ECANCELED;
     }
   } else {
     dout(20) << __func__ << " already holding write lock: "
@@ -5929,7 +5935,7 @@ void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
     kick_object_context_blocked(fop->ctx->obc);
   }
   flush_ops.erase(fop->ctx->obc->obs.oi.soid);
-  close_op_ctx(fop->ctx);
+  close_op_ctx(fop->ctx, -ECANCELED);
 }
 
 void ReplicatedPG::cancel_flush_ops(bool requeue)
@@ -6322,6 +6328,7 @@ void ReplicatedPG::remove_repop(RepGather *repop)
 {
   dout(20) << __func__ << " " << *repop << dendl;
   release_op_ctx_locks(repop->ctx);
+  repop->ctx->finish(0);  // FIXME: return value here is sloppy
 
   repop_map.erase(repop->rep_tid);
   repop->put();
@@ -8717,7 +8724,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
         in_progress_async_reads.begin();
        i != in_progress_async_reads.end();
        in_progress_async_reads.erase(i++)) {
-    close_op_ctx(i->second);
+    close_op_ctx(i->second, -ECANCELED);
     requeue_op(i->first);
   }

--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -465,6 +465,8 @@ public:
     enum { W_LOCK, R_LOCK, NONE } lock_to_release;
 
+    Context *on_finish;
+
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
@@ -483,7 +485,8 @@ public:
       copy_cb(NULL),
       async_read_result(0),
       inflightreads(0),
-      lock_to_release(NONE) {
+      lock_to_release(NONE),
+      on_finish(NULL) {
       if (_ssc) {
        new_snapset = _ssc->snapset;
        snapset = &_ssc->snapset;
@@ -508,6 +511,13 @@ public:
          pending_async_reads.erase(i++)) {
        delete i->second.second;
       }
+      assert(on_finish == NULL);
     }
+    void finish(int r) {
+      if (on_finish) {
+        on_finish->complete(r);
+        on_finish = NULL;
+      }
+    }
   };
   friend struct OpContext;
@@ -602,10 +612,11 @@ protected:
    *
    * @param ctx [in] ctx to clean up
    */
-  void close_op_ctx(OpContext *ctx) {
+  void close_op_ctx(OpContext *ctx, int r) {
     release_op_ctx_locks(ctx);
     delete ctx->op_t;
     ctx->op_t = NULL;
+    ctx->finish(r);
     delete ctx;
   }
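Taken together, every close_op_ctx() caller now says why the ctx is going
away: 0 when a read completes cleanly, -ECANCELED when a flush is cancelled
or the PG changes, -EBUSY for a dup or lock-blocked op, -EAGAIN when
start_flush() discards a previous ctx to retry, and the error being returned
to the client (-EINVAL, -ENFILE, -ENOENT) on the early do_op() failures.
remove_repop() calls finish(0) directly, with a FIXME acknowledging that,
per the commit message, the value is not yet meaningful to anyone.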