ReplicatedPG::get_rw_locks: use excl lock for read & write_ordered

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2015-01-13 14:34:23 -08:00
parent a8e041f065
commit a81f3e6e61
2 changed files with 44 additions and 34 deletions

View File

@ -1709,7 +1709,7 @@ void ReplicatedPG::do_op(OpRequestRef& op)
reply_ctx(ctx, -EINVAL);
return;
}
} else if (!get_rw_locks(ctx)) {
} else if (!get_rw_locks(write_ordered, ctx)) {
dout(20) << __func__ << " waiting for rw locks " << dendl;
op->mark_delayed("waiting for rw locks");
close_op_ctx(ctx, -EBUSY);

View File

@ -597,7 +597,7 @@ public:
ObjectModDesc mod_desc;
enum { W_LOCK, R_LOCK, NONE } lock_to_release;
enum { W_LOCK, R_LOCK, E_LOCK, NONE } lock_to_release;
Context *on_finish;
@ -752,49 +752,41 @@ protected:
* @param ctx [in,out] ctx to get locks for
* @return true on success, false if we are queued
*/
bool get_rw_locks(OpContext *ctx) {
bool get_rw_locks(bool write_ordered, OpContext *ctx) {
/* If snapset_obc, !obc->obs->exists and we will always take the
* snapdir lock *before* the head lock. Since all callers will do
* this (read or write) if we get the first we will be guaranteed
* to get the second.
*/
if (ctx->snapset_obc) {
assert(!ctx->obc->obs.exists);
if (ctx->op->may_write() || ctx->op->may_cache()) {
if (ctx->snapset_obc->get_write(ctx->op)) {
ctx->release_snapset_obc = true;
ctx->lock_to_release = OpContext::W_LOCK;
} else {
return false;
}
} else {
assert(ctx->op->may_read());
if (ctx->snapset_obc->get_read(ctx->op)) {
ctx->release_snapset_obc = true;
ctx->lock_to_release = OpContext::R_LOCK;
} else {
return false;
}
}
}
if (ctx->op->may_write() || ctx->op->may_cache()) {
if (ctx->obc->get_write(ctx->op)) {
ctx->lock_to_release = OpContext::W_LOCK;
return true;
} else {
assert(!ctx->snapset_obc);
return false;
}
ObjectContext::RWState::State type = ObjectContext::RWState::RWNONE;
if (write_ordered && ctx->op->may_read()) {
type = ObjectContext::RWState::RWEXCL;
ctx->lock_to_release = OpContext::E_LOCK;
} else if (write_ordered) {
type = ObjectContext::RWState::RWWRITE;
ctx->lock_to_release = OpContext::W_LOCK;
} else {
assert(ctx->op->may_read());
if (ctx->obc->get_read(ctx->op)) {
ctx->lock_to_release = OpContext::R_LOCK;
return true;
type = ObjectContext::RWState::RWREAD;
ctx->lock_to_release = OpContext::R_LOCK;
}
if (ctx->snapset_obc) {
assert(!ctx->obc->obs.exists);
if (ctx->snapset_obc->get_lock_type(ctx->op, type)) {
ctx->release_snapset_obc = true;
} else {
assert(!ctx->snapset_obc);
ctx->lock_to_release = OpContext::NONE;
return false;
}
}
if (ctx->obc->get_lock_type(ctx->op, type)) {
return true;
} else {
assert(!ctx->snapset_obc);
ctx->lock_to_release = OpContext::NONE;
return false;
}
}
/**
@ -842,6 +834,24 @@ protected:
&requeue_recovery_clone,
&requeue_snaptrimmer_clone);
break;
case OpContext::E_LOCK:
if (ctx->snapset_obc && ctx->release_snapset_obc) {
ctx->snapset_obc->put_excl(
&to_req,
&requeue_recovery_snapset,
&requeue_snaptrimmer_snapset);
ctx->release_snapset_obc = false;
}
ctx->obc->put_excl(
&to_req,
&requeue_recovery,
&requeue_snaptrimmer);
if (ctx->clone_obc)
ctx->clone_obc->put_write(
&to_req,
&requeue_recovery_clone,
&requeue_snaptrimmer_clone);
break;
case OpContext::R_LOCK:
if (ctx->snapset_obc && ctx->release_snapset_obc) {
ctx->snapset_obc->put_read(&to_req);