From 5110214337c8cc73676b30f15abc9f262064274f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 11 Nov 2015 12:40:16 -0800 Subject: [PATCH] osd/: replace simple_repop_.* with simple_opc_.* This way, we don't expose the RepGather structure up into the users (which pretty much exclusively use the repop->ctx member anyway). This will pave the way to removing RepGather::ctx. Part of this involves generalizing the repop callback members (queue_snap_trimmer and on_applied) to on_applied, on_committed, on_success, and on_finish on both OpContext and RepGather. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 272 ++++++++++++++++++---------------------- src/osd/ReplicatedPG.h | 59 +++++++-- 2 files changed, 170 insertions(+), 161 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6f5a91dac12..d383058997f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3231,7 +3231,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op) } } -ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) +ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid) { // load clone info bufferlist bl; @@ -3298,12 +3298,12 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) } } - RepGather *repop = simple_repop_create(obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); ctx->snapset_obc = snapset_obc; ctx->lock_to_release = OpContext::W_LOCK; ctx->release_snapset_obc = true; ctx->at_version = get_next_version(); + PGBackend::PGTransaction *t = ctx->op_t; if (new_snaps.empty()) { @@ -3386,7 +3386,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) coi.version = ctx->at_version; bl.clear(); ::encode(coi, bl); - setattr_maybe_cache(ctx->obc, ctx, t, OI_ATTR, bl); + setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl); ctx->log.push_back( pg_log_entry_t( @@ -3470,7 +3470,7 @@ ReplicatedPG::RepGather 
*ReplicatedPG::trim_object(const hobject_t &coid) bl.clear(); ::encode(ctx->snapset_obc->obs.oi, bl); attrs[OI_ATTR].claim(bl); - setattrs_maybe_cache(ctx->snapset_obc, ctx, t, attrs); + setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs); if (pool.info.require_rollback()) { set changing; @@ -3482,7 +3482,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) } } - return repop; + return ctx; } void ReplicatedPG::snap_trimmer(epoch_t queued) @@ -7145,12 +7145,12 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) dout(20) << __func__ << " using temp " << cop->results.temp_oid << dendl; } ObjectContextRef tempobc = get_object_context(cop->results.temp_oid, true); - RepGather *repop = simple_repop_create(tempobc); + OpContextUPtr ctx = simple_opc_create(tempobc); if (cop->temp_cursor.is_initial()) { - repop->ctx->new_temp_oid = cop->results.temp_oid; + ctx->new_temp_oid = cop->results.temp_oid; } - _write_copy_chunk(cop, repop->ctx->op_t); - simple_repop_submit(repop); + _write_copy_chunk(cop, ctx->op_t); + simple_opc_submit(std::move(ctx)); dout(10) << __func__ << " fetching more" << dendl; _copy_some(cobc, cop); return; @@ -7453,9 +7453,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results, dout(10) << __func__ << " abort; will clean up partial work" << dendl; ObjectContextRef tempobc = get_object_context(results->temp_oid, false); assert(tempobc); - RepGather *repop = simple_repop_create(tempobc); - repop->ctx->op_t->remove(results->temp_oid); - simple_repop_submit(repop); + OpContextUPtr ctx = simple_opc_create(tempobc); + ctx->op_t->remove(results->temp_oid); + simple_opc_submit(std::move(ctx)); results->started_temp_obj = false; } @@ -7467,8 +7467,8 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results, hobject_t head(soid.get_head()); ObjectContextRef obc = get_object_context(head, false); assert(obc); - RepGather *repop = simple_repop_create(obc); - OpContext *tctx = 
repop->ctx; + + OpContextUPtr tctx = simple_opc_create(obc); tctx->at_version = get_next_version(); filter_snapc(tctx->new_snapset.snaps); vector new_clones; @@ -7489,9 +7489,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results, tctx->lock_to_release = OpContext::W_LOCK; dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl; - finish_ctx(tctx, pg_log_entry_t::PROMOTE); + finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE); - simple_repop_submit(repop); + simple_opc_submit(std::move(tctx)); return; } @@ -7519,8 +7519,7 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results, return; } - RepGather *repop = simple_repop_create(obc); - OpContext *tctx = repop->ctx; + OpContextUPtr tctx = simple_opc_create(obc); tctx->at_version = get_next_version(); ++tctx->delta_stats.num_objects; @@ -7588,9 +7587,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results, tctx->lock_to_release = OpContext::W_LOCK; dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl; - finish_ctx(tctx, pg_log_entry_t::PROMOTE); + finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE); - simple_repop_submit(repop); + simple_opc_submit(std::move(tctx)); osd->logger->inc(l_osd_tier_promote); @@ -8020,8 +8019,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop) } dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl; - RepGather *repop = simple_repop_create(fop->obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(fop->obc); ctx->on_finish = fop->on_flush; fop->on_flush = NULL; @@ -8033,7 +8031,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop) ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY); --ctx->delta_stats.num_objects_dirty; - finish_ctx(ctx, pg_log_entry_t::CLEAN); + finish_ctx(ctx.get(), pg_log_entry_t::CLEAN); osd->logger->inc(l_osd_tier_clean); @@ -8046,7 +8044,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop) requeue_ops(ls); } - simple_repop_submit(repop); + 
simple_opc_submit(std::move(ctx)); flush_ops.erase(oid); @@ -8124,10 +8122,6 @@ void ReplicatedPG::repop_all_applied(RepGather *repop) repop->all_applied = true; if (!repop->rep_aborted) { eval_repop(repop); - if (repop->on_applied) { - repop->on_applied->complete(0); - repop->on_applied = NULL; - } } } @@ -8208,6 +8202,12 @@ void ReplicatedPG::eval_repop(RepGather *repop) // ondisk? if (repop->all_committed) { + for (auto p = repop->on_committed.begin(); + p != repop->on_committed.end(); + repop->on_committed.erase(p++)) { + (*p)(); + } + if (repop->ctx->op && !repop->log_op_stat) { log_op_stats(repop->ctx); repop->log_op_stat = true; @@ -8253,6 +8253,12 @@ void ReplicatedPG::eval_repop(RepGather *repop) // applied? if (repop->all_applied) { + dout(10) << " applied: " << *repop << " " << dendl; + for (auto p = repop->on_applied.begin(); + p != repop->on_applied.end(); + repop->on_applied.erase(p++)) { + (*p)(); + } // send dup acks, in order if (waiting_for_ack.count(repop->v)) { @@ -8306,9 +8312,10 @@ void ReplicatedPG::eval_repop(RepGather *repop) calc_min_last_complete_ondisk(); - // kick snap_trimmer if necessary - if (repop->queue_snap_trimmer) { - queue_snap_trim(); + for (auto p = repop->on_success.begin(); + p != repop->on_success.end(); + repop->on_success.erase(p++)) { + (*p)(); } dout(10) << " removing " << *repop << dendl; @@ -8397,8 +8404,9 @@ void ReplicatedPG::issue_repop(RepGather *repop) repop->ctx->op_t = NULL; } -ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc, - ceph_tid_t rep_tid) +ReplicatedPG::RepGather *ReplicatedPG::new_repop( + OpContext *ctx, ObjectContextRef obc, + ceph_tid_t rep_tid) { if (ctx->op) dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl; @@ -8426,6 +8434,13 @@ void ReplicatedPG::remove_repop(RepGather *repop) dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl; if (repop->ctx->snapset_obc) dout(20) << " snapset_obc " << 
*repop->ctx->snapset_obc << dendl; + + for (auto p = repop->on_finish.begin(); + p != repop->on_finish.end(); + repop->on_finish.erase(p++)) { + (*p)(); + } + release_op_ctx_locks(repop->ctx); repop->ctx->finish(0); // FIXME: return value here is sloppy repop->put(); @@ -8433,21 +8448,22 @@ void ReplicatedPG::remove_repop(RepGather *repop) osd->logger->dec(l_osd_op_wip); } -ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc) +ReplicatedPG::OpContextUPtr ReplicatedPG::simple_opc_create(ObjectContextRef obc) { dout(20) << __func__ << " " << obc->obs.oi.soid << dendl; vector ops; ceph_tid_t rep_tid = osd->get_tid(); osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); - OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, obc, this); + OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this)); ctx->op_t = pgbackend->get_transaction(); ctx->mtime = ceph_clock_now(g_ceph_context); - RepGather *repop = new_repop(ctx, obc, rep_tid); - return repop; + return ctx; } -void ReplicatedPG::simple_repop_submit(RepGather *repop) +void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx) { + RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid); + (void)ctx.release(); // repop owns the OpContext from here on: RepGather::put() deletes it, so the unique_ptr must not dout(20) << __func__ << " " << repop << dendl; issue_repop(repop); eval_repop(repop); @@ -8575,8 +8590,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) return; } - RepGather *repop = simple_repop_create(obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); ctx->at_version = get_next_version(); object_info_t& oi = ctx->new_obs.oi; @@ -8594,11 +8608,11 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) 0, osd_reqid_t(), ctx->mtime)); - oi.prior_version = repop->obc->obs.oi.version; + oi.prior_version = obc->obs.oi.version; oi.version = ctx->at_version; bufferlist bl; ::encode(oi, bl); - setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl); + setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl); if (pool.info.require_rollback()) 
{ map > to_set; @@ -8609,9 +8623,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) } // no ctx->delta_stats - - // obc ref swallowed by repop! - simple_repop_submit(repop); - - // apply new object state. - ctx->obc->obs = ctx->new_obs; + // apply new object state while ctx is still owned here; + // simple_opc_submit() takes ownership and leaves ctx null. + ctx->obc->obs = ctx->new_obs; + simple_opc_submit(std::move(ctx)); @@ -9569,10 +9581,9 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) repop_queue.pop_front(); dout(10) << " canceling repop tid " << repop->rep_tid << dendl; repop->rep_aborted = true; - if (repop->on_applied) { - delete repop->on_applied; - repop->on_applied = NULL; - } + repop->on_applied.clear(); + repop->on_committed.clear(); + repop->on_success.clear(); if (requeue) { if (repop->ctx->op) { @@ -11164,15 +11175,14 @@ void ReplicatedPG::hit_set_remove_all() ObjectContextRef obc = get_object_context(oid, false); assert(obc); - RepGather *repop = simple_repop_create(obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); ctx->at_version = get_next_version(); ctx->updated_hset_history = info.hit_set; utime_t now = ceph_clock_now(cct); ctx->mtime = now; - hit_set_trim(repop, 0); - apply_ctx_stats(ctx); - simple_repop_submit(repop); + hit_set_trim(ctx, 0); + apply_ctx_stats(ctx.get()); + simple_opc_submit(std::move(ctx)); } info.hit_set = pg_hit_set_history_t(); @@ -11252,15 +11262,6 @@ bool ReplicatedPG::hit_set_apply_log() return true; } -struct C_HitSetFlushing : public Context { - ReplicatedPGRef pg; - time_t hit_set_name; - C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { } - void finish(int r) { - pg->hit_set_flushing.erase(hit_set_name); - } -}; - void ReplicatedPG::hit_set_persist() { dout(10) << __func__ << dendl; @@ -11268,7 +11269,6 @@ void ReplicatedPG::hit_set_persist() unsigned max = pool.info.hit_set_count; utime_t now = ceph_clock_now(cct); - RepGather *repop; hobject_t oid; time_t flush_time = 0; @@ -11337,10 +11337,15 @@ void ReplicatedPG::hit_set_persist() flush_time = new_hset.begin; ObjectContextRef obc = 
get_object_context(oid, true); - repop = simple_repop_create(obc); - if (flush_time != 0) - repop->on_applied = new C_HitSetFlushing(this, flush_time); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); + if (flush_time != 0) { + ReplicatedPGRef pg(this); + ctx->register_on_applied( + [pg, flush_time]() { + pg->hit_set_flushing.erase(flush_time); + }); + } + ctx->at_version = get_next_version(); ctx->updated_hset_history = info.hit_set; pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history); @@ -11377,7 +11382,7 @@ void ReplicatedPG::hit_set_persist() map attrs; attrs[OI_ATTR].claim(boi); attrs[SS_ATTR].claim(bss); - setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t, attrs); + setattrs_maybe_cache(ctx->obc, ctx.get(), ctx->op_t, attrs); ctx->log.push_back( pg_log_entry_t( pg_log_entry_t::MODIFY, @@ -11394,17 +11399,17 @@ void ReplicatedPG::hit_set_persist() ctx->log.back().mod_desc.mark_unrollbackable(); } - hit_set_trim(repop, max); + hit_set_trim(ctx, max); - apply_ctx_stats(ctx); - simple_repop_submit(repop); + apply_ctx_stats(ctx.get()); + simple_opc_submit(std::move(ctx)); } -void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) +void ReplicatedPG::hit_set_trim(OpContextUPtr &ctx, unsigned max) { - assert(repop->ctx->updated_hset_history); + assert(ctx->updated_hset_history); pg_hit_set_history_t &updated_hit_set_hist = - *(repop->ctx->updated_hset_history); + *(ctx->updated_hset_history); for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) { list::iterator p = updated_hit_set_hist.history.begin(); assert(p != updated_hit_set_hist.history.end()); @@ -11413,34 +11418,34 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) assert(!is_degraded_or_backfilling_object(oid)); dout(20) << __func__ << " removing " << oid << dendl; - ++repop->ctx->at_version.version; - repop->ctx->log.push_back( + ++ctx->at_version.version; + ctx->log.push_back( pg_log_entry_t(pg_log_entry_t::DELETE, 
oid, - repop->ctx->at_version, + ctx->at_version, p->version, 0, osd_reqid_t(), - repop->ctx->mtime)); + ctx->mtime)); if (pool.info.require_rollback()) { - if (repop->ctx->log.back().mod_desc.rmobject( - repop->ctx->at_version.version)) { - repop->ctx->op_t->stash(oid, repop->ctx->at_version.version); + if (ctx->log.back().mod_desc.rmobject( + ctx->at_version.version)) { + ctx->op_t->stash(oid, ctx->at_version.version); } else { - repop->ctx->op_t->remove(oid); + ctx->op_t->remove(oid); } } else { - repop->ctx->op_t->remove(oid); - repop->ctx->log.back().mod_desc.mark_unrollbackable(); + ctx->op_t->remove(oid); + ctx->log.back().mod_desc.mark_unrollbackable(); } updated_hit_set_hist.history.pop_front(); ObjectContextRef obc = get_object_context(oid, false); assert(obc); - --repop->ctx->delta_stats.num_objects; - --repop->ctx->delta_stats.num_objects_hit_set_archive; - repop->ctx->delta_stats.num_bytes -= obc->obs.oi.size; - repop->ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size; + --ctx->delta_stats.num_objects; + --ctx->delta_stats.num_objects_hit_set_archive; + ctx->delta_stats.num_bytes -= obc->obs.oi.size; + ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size; } } @@ -11868,14 +11873,13 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush) } dout(10) << __func__ << " evicting " << obc->obs.oi << dendl; - RepGather *repop = simple_repop_create(obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); Context *on_evict = new C_AgentEvictStartStop(this); ctx->on_finish = on_evict; ctx->lock_to_release = OpContext::W_LOCK; ctx->at_version = get_next_version(); assert(ctx->new_obs.exists); - int r = _delete_oid(ctx, true); + int r = _delete_oid(ctx.get(), true); if (obc->obs.oi.is_omap()) ctx->delta_stats.num_objects_omap--; ctx->delta_stats.num_evict++; @@ -11883,8 +11887,8 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush) if (obc->obs.oi.is_dirty()) 
--ctx->delta_stats.num_objects_dirty; assert(r == 0); - finish_ctx(ctx, pg_log_entry_t::DELETE, false); - simple_repop_submit(repop); + finish_ctx(ctx.get(), pg_log_entry_t::DELETE, false); + simple_opc_submit(std::move(ctx)); osd->logger->inc(l_osd_tier_evict); osd->logger->inc(l_osd_agent_evict); return true; @@ -12583,15 +12587,14 @@ void ReplicatedPG::_scrub( dout(10) << __func__ << " recording digests for " << p->first << dendl; ObjectContextRef obc = get_object_context(p->first, false); assert(obc); - RepGather *repop = simple_repop_create(obc); - OpContext *ctx = repop->ctx; + OpContextUPtr ctx = simple_opc_create(obc); ctx->at_version = get_next_version(); ctx->mtime = utime_t(); // do not update mtime ctx->new_obs.oi.set_data_digest(p->second.first); ctx->new_obs.oi.set_omap_digest(p->second.second); - finish_ctx(ctx, pg_log_entry_t::MODIFY, true, true); + finish_ctx(ctx.get(), pg_log_entry_t::MODIFY, true, true); ctx->on_finish = new C_ScrubDigestUpdated(this); - simple_repop_submit(repop); + simple_opc_submit(std::move(ctx)); ++scrubber.num_digest_updates_pending; } @@ -12674,10 +12677,7 @@ void ReplicatedPG::_scrub_finish() ReplicatedPG::SnapTrimmer::~SnapTrimmer() { - while (!repops.empty()) { - (*repops.begin())->put(); - repops.erase(repops.begin()); - } + in_flight.clear(); } void ReplicatedPG::SnapTrimmer::log_enter(const char *state_name) @@ -12746,36 +12746,19 @@ ReplicatedPG::TrimmingObjects::TrimmingObjects(my_context ctx) void ReplicatedPG::TrimmingObjects::exit() { context< SnapTrimmer >().log_exit(state_name, enter_time); - // Clean up repops in case of reset - set &repops = context().repops; - for (set::iterator i = repops.begin(); - i != repops.end(); - repops.erase(i++)) { - (*i)->put(); - } + context().in_flight.clear(); } boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) { dout(10) << "TrimmingObjects react" << dendl; - ReplicatedPG *pg = context< SnapTrimmer >().pg; + ReplicatedPGRef pg = context< 
SnapTrimmer >().pg; snapid_t snap_to_trim = context().snap_to_trim; - set &repops = context().repops; + auto &in_flight = context().in_flight; dout(10) << "TrimmingObjects: trimming snap " << snap_to_trim << dendl; - for (set::iterator i = repops.begin(); - i != repops.end(); - ) { - if ((*i)->all_applied && (*i)->all_committed) { - (*i)->put(); - repops.erase(i++); - } else { - ++i; - } - } - - while (repops.size() < g_conf->osd_pg_max_concurrent_snap_trims) { + while (in_flight.size() < g_conf->osd_pg_max_concurrent_snap_trims) { // Get next hobject_t old_pos = pos; int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos); @@ -12790,20 +12773,25 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) } dout(10) << "TrimmingObjects react trimming " << pos << dendl; - RepGather *repop = pg->trim_object(pos); - if (!repop) { + OpContextUPtr ctx = pg->trim_object(pos); + if (!ctx) { dout(10) << __func__ << " could not get write lock on obj " << pos << dendl; pos = old_pos; return discard_event(); } - assert(repop); - repop->queue_snap_trimmer = true; + assert(ctx); + hobject_t to_remove = pos; + ctx->register_on_success( + [pg, to_remove, &in_flight]() { + in_flight.erase(to_remove); + pg->queue_snap_trim(); + }); - pg->apply_ctx_stats(repop->ctx); + pg->apply_ctx_stats(ctx.get()); - repops.insert(repop->get()); - pg->simple_repop_submit(repop); + in_flight.insert(pos); + pg->simple_opc_submit(std::move(ctx)); } return discard_event(); } @@ -12819,30 +12807,16 @@ ReplicatedPG::WaitingOnReplicas::WaitingOnReplicas(my_context ctx) void ReplicatedPG::WaitingOnReplicas::exit() { context< SnapTrimmer >().log_exit(state_name, enter_time); - - // Clean up repops in case of reset - set &repops = context().repops; - for (set::iterator i = repops.begin(); - i != repops.end(); - repops.erase(i++)) { - (*i)->put(); - } + context().in_flight.clear(); } boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&) { - // 
Have all the repops applied? + // Have all the trims finished? dout(10) << "Waiting on Replicas react" << dendl; ReplicatedPG *pg = context< SnapTrimmer >().pg; - set &repops = context().repops; - for (set::iterator i = repops.begin(); - i != repops.end(); - repops.erase(i++)) { - if (!(*i)->all_applied || !(*i)->all_committed) { - return discard_event(); - } else { - (*i)->put(); - } + if (!context().in_flight.empty()) { + return discard_event(); } snapid_t &sn = context().snap_to_trim; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index f0211168f12..0f053749349 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -557,6 +557,29 @@ public: // pending xattr updates map > > pending_attrs; + // completion callbacks; the RepGather constructor moves these lists into the repop + list<std::function<void()>> on_applied; // run by eval_repop once all_applied + list<std::function<void()>> on_committed; // run by eval_repop once all_committed + list<std::function<void()>> on_finish; // run by remove_repop / close_op_ctx + list<std::function<void()>> on_success; // run by eval_repop just before the repop is removed + template <typename F> + void register_on_finish(F &&f) { + on_finish.emplace_back(std::forward<F>(f)); + } + template <typename F> + void register_on_success(F &&f) { + on_success.emplace_back(std::forward<F>(f)); + } + template <typename F> + void register_on_applied(F &&f) { + on_applied.emplace_back(std::forward<F>(f)); + } + template <typename F> + void register_on_commit(F &&f) { + on_committed.emplace_back(std::forward<F>(f)); + } + + void apply_pending_attrs() { for (map > >::iterator i = @@ -675,6 +698,7 @@ public: } } }; + using OpContextUPtr = std::unique_ptr<OpContext>; friend struct OpContext; /* @@ -705,9 +729,10 @@ public: eversion_t pg_local_last_complete; - bool queue_snap_trimmer; - - Context *on_applied; + list<std::function<void()>> on_applied; + list<std::function<void()>> on_committed; + list<std::function<void()>> on_success; + list<std::function<void()>> on_finish; bool log_op_stat; RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt, @@ -721,8 +746,10 @@ public: //sent_nvram(false), sent_disk(false), pg_local_last_complete(lc), - queue_snap_trimmer(false), - on_applied(NULL), + on_applied(std::move(c->on_applied)), + on_committed(std::move(c->on_committed)), + on_success(std::move(c->on_success)), + on_finish(std::move(c->on_finish)), log_op_stat(false) { } RepGather *get() { @@ -733,7 +760,7 @@ public: 
assert(nref > 0); if (--nref == 0) { delete ctx; // must already be unlocked - assert(on_applied == NULL); + assert(on_applied.empty()); delete this; //generic_dout(0) << "deleting " << this << dendl; } @@ -795,6 +822,11 @@ protected: release_op_ctx_locks(ctx); delete ctx->op_t; ctx->op_t = NULL; + for (auto p = ctx->on_finish.begin(); + p != ctx->on_finish.end(); + ctx->on_finish.erase(p++)) { + (*p)(); + } ctx->finish(r); delete ctx; } @@ -896,11 +928,14 @@ protected: void repop_all_committed(RepGather *repop); void eval_repop(RepGather*); void issue_repop(RepGather *repop); - RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, ceph_tid_t rep_tid); + RepGather *new_repop( + OpContext *ctx, + ObjectContextRef obc, + ceph_tid_t rep_tid); void remove_repop(RepGather *repop); - RepGather *simple_repop_create(ObjectContextRef obc); - void simple_repop_submit(RepGather *repop); + OpContextUPtr simple_opc_create(ObjectContextRef obc); + void simple_opc_submit(OpContextUPtr ctx); // hot/cold tracking HitSetRef hit_set; ///< currently accumulating HitSet @@ -913,7 +948,7 @@ protected: void hit_set_create(); ///< create a new HitSet void hit_set_persist(); ///< persist hit info bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet - void hit_set_trim(RepGather *repop, unsigned max); ///< discard old HitSets + void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets void hit_set_remove_all(); @@ -1474,7 +1509,7 @@ public: ThreadPool::TPHandle &handle); void do_backfill(OpRequestRef op); - RepGather *trim_object(const hobject_t &coid); + OpContextUPtr trim_object(const hobject_t &coid); void snap_trimmer(epoch_t e); int do_osd_ops(OpContext *ctx, vector& ops); @@ -1544,7 +1579,7 @@ private: }; struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > { ReplicatedPG *pg; - set repops; + set in_flight; snapid_t 
snap_to_trim; bool need_share_pg_info; explicit SnapTrimmer(ReplicatedPG *pg) : pg(pg), need_share_pg_info(false) {}