diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 20fbfec1d9a..112ae72b2c9 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1709,7 +1709,7 @@ void ReplicatedPG::do_op(OpRequestRef& op) reply_ctx(ctx, -EINVAL); return; } - } else if (!get_rw_locks(ctx)) { + } else if (!get_rw_locks(write_ordered, ctx)) { dout(20) << __func__ << " waiting for rw locks " << dendl; op->mark_delayed("waiting for rw locks"); close_op_ctx(ctx, -EBUSY); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 49ef4b81826..728fbc5fdf4 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -597,7 +597,7 @@ public: ObjectModDesc mod_desc; - enum { W_LOCK, R_LOCK, NONE } lock_to_release; + enum { W_LOCK, R_LOCK, E_LOCK, NONE } lock_to_release; Context *on_finish; @@ -752,49 +752,41 @@ protected: * @param ctx [in,out] ctx to get locks for * @return true on success, false if we are queued */ - bool get_rw_locks(OpContext *ctx) { + bool get_rw_locks(bool write_ordered, OpContext *ctx) { /* If snapset_obc, !obc->obs->exists and we will always take the * snapdir lock *before* the head lock. Since all callers will do * this (read or write) if we get the first we will be guaranteed * to get the second. */ - if (ctx->snapset_obc) { - assert(!ctx->obc->obs.exists); - if (ctx->op->may_write() || ctx->op->may_cache()) { - if (ctx->snapset_obc->get_write(ctx->op)) { - ctx->release_snapset_obc = true; - ctx->lock_to_release = OpContext::W_LOCK; - } else { - return false; - } - } else { - assert(ctx->op->may_read()); - if (ctx->snapset_obc->get_read(ctx->op)) { - ctx->release_snapset_obc = true; - ctx->lock_to_release = OpContext::R_LOCK; - } else { - return false; - } - } - } - if (ctx->op->may_write() || ctx->op->may_cache()) { - if (ctx->obc->get_write(ctx->op)) { - ctx->lock_to_release = OpContext::W_LOCK; - return true; - } else { - assert(!ctx->snapset_obc); - return false; - } + ObjectContext::RWState::State type = ObjectContext::RWState::RWNONE; + if (write_ordered && ctx->op->may_read()) { + type = ObjectContext::RWState::RWEXCL; + ctx->lock_to_release = OpContext::E_LOCK; + } else if (write_ordered) { + type = ObjectContext::RWState::RWWRITE; + ctx->lock_to_release = OpContext::W_LOCK; } else { assert(ctx->op->may_read()); - if (ctx->obc->get_read(ctx->op)) { - ctx->lock_to_release = OpContext::R_LOCK; - return true; + type = ObjectContext::RWState::RWREAD; + ctx->lock_to_release = OpContext::R_LOCK; + } + + if (ctx->snapset_obc) { + assert(!ctx->obc->obs.exists); + if (ctx->snapset_obc->get_lock_type(ctx->op, type)) { + ctx->release_snapset_obc = true; } else { - assert(!ctx->snapset_obc); + ctx->lock_to_release = OpContext::NONE; return false; } } + if (ctx->obc->get_lock_type(ctx->op, type)) { + return true; + } else { + assert(!ctx->snapset_obc); + ctx->lock_to_release = OpContext::NONE; + return false; + } } /** @@ -842,6 +834,24 @@ protected: &requeue_recovery_clone, &requeue_snaptrimmer_clone); break; + case OpContext::E_LOCK: + if (ctx->snapset_obc && ctx->release_snapset_obc) { + ctx->snapset_obc->put_excl( + &to_req, + &requeue_recovery_snapset, + &requeue_snaptrimmer_snapset); + ctx->release_snapset_obc = false; + } + ctx->obc->put_excl( + &to_req, + &requeue_recovery, + &requeue_snaptrimmer); + if (ctx->clone_obc) + ctx->clone_obc->put_write( + &to_req, + &requeue_recovery_clone, + &requeue_snaptrimmer_clone); + break; case OpContext::R_LOCK: if (ctx->snapset_obc && ctx->release_snapset_obc) { ctx->snapset_obc->put_read(&to_req);