ReplicatedPG: take snapset_obc write lock where appropriate

Otherwise, we might read it for backfill before it's fully created
on a peer.

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2014-02-15 17:44:57 -08:00
parent da4652c92d
commit fd9da00edc
2 changed files with 17 additions and 3 deletions

View File

@ -4889,6 +4889,9 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
ctx->snapset_obc = get_object_context(snapoid, false);
if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
bool got = ctx->snapset_obc->get_write(ctx->op);
assert(got);
ctx->release_snapset_obc = true;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid,
ctx->at_version,
ctx->obs->oi.version,
@ -4922,6 +4925,9 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
0, osd_reqid_t(), ctx->mtime));
ctx->snapset_obc = get_object_context(snapoid, true);
bool got = ctx->snapset_obc->get_write(ctx->op);
assert(got);
ctx->release_snapset_obc = true;
if (pool.info.require_rollback() && !ctx->snapset_obc->obs.exists) {
ctx->log.back().mod_desc.create();
} else if (!pool.info.require_rollback()) {

View File

@ -471,6 +471,8 @@ public:
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
bool release_snapset_obc;
OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
ObjectState *_obs, SnapSetContext *_ssc,
ReplicatedPG *_pg) :
@ -487,7 +489,8 @@ public:
async_read_result(0),
inflightreads(0),
lock_to_release(NONE),
on_finish(NULL) {
on_finish(NULL),
release_snapset_obc(false) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@ -630,13 +633,16 @@ protected:
list<OpRequestRef> to_req;
bool requeue_recovery = false;
bool requeue_recovery_clone = false;
bool requeue_recovery_snapset = false;
if (ctx->snapset_obc && ctx->release_snapset_obc) {
ctx->snapset_obc->put_write(&to_req, &requeue_recovery_snapset);
ctx->release_snapset_obc = false;
}
switch (ctx->lock_to_release) {
case OpContext::W_LOCK:
ctx->obc->put_write(&to_req, &requeue_recovery);
if (ctx->clone_obc)
ctx->clone_obc->put_write(&to_req, &requeue_recovery_clone);
if (requeue_recovery || requeue_recovery_clone)
osd->recovery_wq.queue(this);
break;
case OpContext::R_LOCK:
ctx->obc->put_read(&to_req);
@ -647,6 +653,8 @@ protected:
assert(0);
};
ctx->lock_to_release = OpContext::NONE;
if (requeue_recovery || requeue_recovery_clone || requeue_recovery_snapset)
osd->recovery_wq.queue(this);
requeue_ops(to_req);
}