osd/: replace simple_repop_.* with simple_opc_.*

This way, we don't expose the RepGather structure
up into the users (which pretty much exclusively
use the repop->ctx member anyway).  This will pave
the way to removing RepGather::ctx.

Part of this involves generalizing the repop callback
members (queue_snap_trimmer and on_applied) to
on_applied, on_committed, on_success, and on_finish
on both OpContext and RepGather.

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2015-11-11 12:40:16 -08:00
parent 6db7fe7de3
commit 5110214337
2 changed files with 170 additions and 161 deletions

View File

@ -3231,7 +3231,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
}
}
ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
{
// load clone info
bufferlist bl;
@ -3298,12 +3298,12 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
}
}
RepGather *repop = simple_repop_create(obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
ctx->snapset_obc = snapset_obc;
ctx->lock_to_release = OpContext::W_LOCK;
ctx->release_snapset_obc = true;
ctx->at_version = get_next_version();
PGBackend::PGTransaction *t = ctx->op_t;
if (new_snaps.empty()) {
@ -3386,7 +3386,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
coi.version = ctx->at_version;
bl.clear();
::encode(coi, bl);
setattr_maybe_cache(ctx->obc, ctx, t, OI_ATTR, bl);
setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl);
ctx->log.push_back(
pg_log_entry_t(
@ -3470,7 +3470,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
bl.clear();
::encode(ctx->snapset_obc->obs.oi, bl);
attrs[OI_ATTR].claim(bl);
setattrs_maybe_cache(ctx->snapset_obc, ctx, t, attrs);
setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs);
if (pool.info.require_rollback()) {
set<string> changing;
@ -3482,7 +3482,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
}
}
return repop;
return ctx;
}
void ReplicatedPG::snap_trimmer(epoch_t queued)
@ -7145,12 +7145,12 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
dout(20) << __func__ << " using temp " << cop->results.temp_oid << dendl;
}
ObjectContextRef tempobc = get_object_context(cop->results.temp_oid, true);
RepGather *repop = simple_repop_create(tempobc);
OpContextUPtr ctx = simple_opc_create(tempobc);
if (cop->temp_cursor.is_initial()) {
repop->ctx->new_temp_oid = cop->results.temp_oid;
ctx->new_temp_oid = cop->results.temp_oid;
}
_write_copy_chunk(cop, repop->ctx->op_t);
simple_repop_submit(repop);
_write_copy_chunk(cop, ctx->op_t);
simple_opc_submit(std::move(ctx));
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(cobc, cop);
return;
@ -7453,9 +7453,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
dout(10) << __func__ << " abort; will clean up partial work" << dendl;
ObjectContextRef tempobc = get_object_context(results->temp_oid, false);
assert(tempobc);
RepGather *repop = simple_repop_create(tempobc);
repop->ctx->op_t->remove(results->temp_oid);
simple_repop_submit(repop);
OpContextUPtr ctx = simple_opc_create(tempobc);
ctx->op_t->remove(results->temp_oid);
simple_opc_submit(std::move(ctx));
results->started_temp_obj = false;
}
@ -7467,8 +7467,8 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
hobject_t head(soid.get_head());
ObjectContextRef obc = get_object_context(head, false);
assert(obc);
RepGather *repop = simple_repop_create(obc);
OpContext *tctx = repop->ctx;
OpContextUPtr tctx = simple_opc_create(obc);
tctx->at_version = get_next_version();
filter_snapc(tctx->new_snapset.snaps);
vector<snapid_t> new_clones;
@ -7489,9 +7489,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
tctx->lock_to_release = OpContext::W_LOCK;
dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
finish_ctx(tctx, pg_log_entry_t::PROMOTE);
finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
simple_repop_submit(repop);
simple_opc_submit(std::move(tctx));
return;
}
@ -7519,8 +7519,7 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
return;
}
RepGather *repop = simple_repop_create(obc);
OpContext *tctx = repop->ctx;
OpContextUPtr tctx = simple_opc_create(obc);
tctx->at_version = get_next_version();
++tctx->delta_stats.num_objects;
@ -7588,9 +7587,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
tctx->lock_to_release = OpContext::W_LOCK;
dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
finish_ctx(tctx, pg_log_entry_t::PROMOTE);
finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
simple_repop_submit(repop);
simple_opc_submit(std::move(tctx));
osd->logger->inc(l_osd_tier_promote);
@ -8020,8 +8019,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
}
dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
RepGather *repop = simple_repop_create(fop->obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(fop->obc);
ctx->on_finish = fop->on_flush;
fop->on_flush = NULL;
@ -8033,7 +8031,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
--ctx->delta_stats.num_objects_dirty;
finish_ctx(ctx, pg_log_entry_t::CLEAN);
finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
osd->logger->inc(l_osd_tier_clean);
@ -8046,7 +8044,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
requeue_ops(ls);
}
simple_repop_submit(repop);
simple_opc_submit(std::move(ctx));
flush_ops.erase(oid);
@ -8124,10 +8122,6 @@ void ReplicatedPG::repop_all_applied(RepGather *repop)
repop->all_applied = true;
if (!repop->rep_aborted) {
eval_repop(repop);
if (repop->on_applied) {
repop->on_applied->complete(0);
repop->on_applied = NULL;
}
}
}
@ -8208,6 +8202,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// ondisk?
if (repop->all_committed) {
for (auto p = repop->on_committed.begin();
p != repop->on_committed.end();
repop->on_committed.erase(p++)) {
(*p)();
}
if (repop->ctx->op && !repop->log_op_stat) {
log_op_stats(repop->ctx);
repop->log_op_stat = true;
@ -8253,6 +8253,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// applied?
if (repop->all_applied) {
dout(10) << " applied: " << *repop << " " << dendl;
for (auto p = repop->on_applied.begin();
p != repop->on_applied.end();
repop->on_applied.erase(p++)) {
(*p)();
}
// send dup acks, in order
if (waiting_for_ack.count(repop->v)) {
@ -8306,9 +8312,10 @@ void ReplicatedPG::eval_repop(RepGather *repop)
calc_min_last_complete_ondisk();
// kick snap_trimmer if necessary
if (repop->queue_snap_trimmer) {
queue_snap_trim();
for (auto p = repop->on_success.begin();
p != repop->on_success.end();
repop->on_success.erase(p++)) {
(*p)();
}
dout(10) << " removing " << *repop << dendl;
@ -8397,8 +8404,9 @@ void ReplicatedPG::issue_repop(RepGather *repop)
repop->ctx->op_t = NULL;
}
ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
ceph_tid_t rep_tid)
ReplicatedPG::RepGather *ReplicatedPG::new_repop(
OpContext *ctx, ObjectContextRef obc,
ceph_tid_t rep_tid)
{
if (ctx->op)
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl;
@ -8426,6 +8434,13 @@ void ReplicatedPG::remove_repop(RepGather *repop)
dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl;
if (repop->ctx->snapset_obc)
dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl;
for (auto p = repop->on_finish.begin();
p != repop->on_finish.end();
repop->on_finish.erase(p++)) {
(*p)();
}
release_op_ctx_locks(repop->ctx);
repop->ctx->finish(0); // FIXME: return value here is sloppy
repop->put();
@ -8433,21 +8448,21 @@ void ReplicatedPG::remove_repop(RepGather *repop)
osd->logger->dec(l_osd_op_wip);
}
ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
ReplicatedPG::OpContextUPtr ReplicatedPG::simple_opc_create(ObjectContextRef obc)
{
dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
vector<OSDOp> ops;
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, obc, this);
OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
ctx->op_t = pgbackend->get_transaction();
ctx->mtime = ceph_clock_now(g_ceph_context);
RepGather *repop = new_repop(ctx, obc, rep_tid);
return repop;
return ctx;
}
void ReplicatedPG::simple_repop_submit(RepGather *repop)
void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
{
RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid);
dout(20) << __func__ << " " << repop << dendl;
issue_repop(repop);
eval_repop(repop);
@ -8575,8 +8590,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
return;
}
RepGather *repop = simple_repop_create(obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
object_info_t& oi = ctx->new_obs.oi;
@ -8594,11 +8608,11 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
0,
osd_reqid_t(), ctx->mtime));
oi.prior_version = repop->obc->obs.oi.version;
oi.prior_version = obc->obs.oi.version;
oi.version = ctx->at_version;
bufferlist bl;
::encode(oi, bl);
setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl);
if (pool.info.require_rollback()) {
map<string, boost::optional<bufferlist> > to_set;
@ -8609,9 +8623,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
}
// no ctx->delta_stats
// obc ref swallowed by repop!
simple_repop_submit(repop);
simple_opc_submit(std::move(ctx));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
@ -9569,10 +9581,9 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
repop_queue.pop_front();
dout(10) << " canceling repop tid " << repop->rep_tid << dendl;
repop->rep_aborted = true;
if (repop->on_applied) {
delete repop->on_applied;
repop->on_applied = NULL;
}
repop->on_applied.clear();
repop->on_committed.clear();
repop->on_success.clear();
if (requeue) {
if (repop->ctx->op) {
@ -11164,15 +11175,14 @@ void ReplicatedPG::hit_set_remove_all()
ObjectContextRef obc = get_object_context(oid, false);
assert(obc);
RepGather *repop = simple_repop_create(obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
ctx->updated_hset_history = info.hit_set;
utime_t now = ceph_clock_now(cct);
ctx->mtime = now;
hit_set_trim(repop, 0);
apply_ctx_stats(ctx);
simple_repop_submit(repop);
hit_set_trim(ctx, 0);
apply_ctx_stats(ctx.get());
simple_opc_submit(std::move(ctx));
}
info.hit_set = pg_hit_set_history_t();
@ -11252,15 +11262,6 @@ bool ReplicatedPG::hit_set_apply_log()
return true;
}
struct C_HitSetFlushing : public Context {
ReplicatedPGRef pg;
time_t hit_set_name;
C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { }
void finish(int r) {
pg->hit_set_flushing.erase(hit_set_name);
}
};
void ReplicatedPG::hit_set_persist()
{
dout(10) << __func__ << dendl;
@ -11268,7 +11269,6 @@ void ReplicatedPG::hit_set_persist()
unsigned max = pool.info.hit_set_count;
utime_t now = ceph_clock_now(cct);
RepGather *repop;
hobject_t oid;
time_t flush_time = 0;
@ -11337,10 +11337,15 @@ void ReplicatedPG::hit_set_persist()
flush_time = new_hset.begin;
ObjectContextRef obc = get_object_context(oid, true);
repop = simple_repop_create(obc);
if (flush_time != 0)
repop->on_applied = new C_HitSetFlushing(this, flush_time);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
if (flush_time != 0) {
ReplicatedPGRef pg(this);
ctx->register_on_applied(
[pg, flush_time]() {
pg->hit_set_flushing.erase(flush_time);
});
}
ctx->at_version = get_next_version();
ctx->updated_hset_history = info.hit_set;
pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history);
@ -11377,7 +11382,7 @@ void ReplicatedPG::hit_set_persist()
map <string, bufferlist> attrs;
attrs[OI_ATTR].claim(boi);
attrs[SS_ATTR].claim(bss);
setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t, attrs);
setattrs_maybe_cache(ctx->obc, ctx.get(), ctx->op_t, attrs);
ctx->log.push_back(
pg_log_entry_t(
pg_log_entry_t::MODIFY,
@ -11394,17 +11399,17 @@ void ReplicatedPG::hit_set_persist()
ctx->log.back().mod_desc.mark_unrollbackable();
}
hit_set_trim(repop, max);
hit_set_trim(ctx, max);
apply_ctx_stats(ctx);
simple_repop_submit(repop);
apply_ctx_stats(ctx.get());
simple_opc_submit(std::move(ctx));
}
void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
void ReplicatedPG::hit_set_trim(OpContextUPtr &ctx, unsigned max)
{
assert(repop->ctx->updated_hset_history);
assert(ctx->updated_hset_history);
pg_hit_set_history_t &updated_hit_set_hist =
*(repop->ctx->updated_hset_history);
*(ctx->updated_hset_history);
for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) {
list<pg_hit_set_info_t>::iterator p = updated_hit_set_hist.history.begin();
assert(p != updated_hit_set_hist.history.end());
@ -11413,34 +11418,34 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
assert(!is_degraded_or_backfilling_object(oid));
dout(20) << __func__ << " removing " << oid << dendl;
++repop->ctx->at_version.version;
repop->ctx->log.push_back(
++ctx->at_version.version;
ctx->log.push_back(
pg_log_entry_t(pg_log_entry_t::DELETE,
oid,
repop->ctx->at_version,
ctx->at_version,
p->version,
0,
osd_reqid_t(),
repop->ctx->mtime));
ctx->mtime));
if (pool.info.require_rollback()) {
if (repop->ctx->log.back().mod_desc.rmobject(
repop->ctx->at_version.version)) {
repop->ctx->op_t->stash(oid, repop->ctx->at_version.version);
if (ctx->log.back().mod_desc.rmobject(
ctx->at_version.version)) {
ctx->op_t->stash(oid, ctx->at_version.version);
} else {
repop->ctx->op_t->remove(oid);
ctx->op_t->remove(oid);
}
} else {
repop->ctx->op_t->remove(oid);
repop->ctx->log.back().mod_desc.mark_unrollbackable();
ctx->op_t->remove(oid);
ctx->log.back().mod_desc.mark_unrollbackable();
}
updated_hit_set_hist.history.pop_front();
ObjectContextRef obc = get_object_context(oid, false);
assert(obc);
--repop->ctx->delta_stats.num_objects;
--repop->ctx->delta_stats.num_objects_hit_set_archive;
repop->ctx->delta_stats.num_bytes -= obc->obs.oi.size;
repop->ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
--ctx->delta_stats.num_objects;
--ctx->delta_stats.num_objects_hit_set_archive;
ctx->delta_stats.num_bytes -= obc->obs.oi.size;
ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
}
}
@ -11868,14 +11873,13 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush)
}
dout(10) << __func__ << " evicting " << obc->obs.oi << dendl;
RepGather *repop = simple_repop_create(obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
Context *on_evict = new C_AgentEvictStartStop(this);
ctx->on_finish = on_evict;
ctx->lock_to_release = OpContext::W_LOCK;
ctx->at_version = get_next_version();
assert(ctx->new_obs.exists);
int r = _delete_oid(ctx, true);
int r = _delete_oid(ctx.get(), true);
if (obc->obs.oi.is_omap())
ctx->delta_stats.num_objects_omap--;
ctx->delta_stats.num_evict++;
@ -11883,8 +11887,8 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush)
if (obc->obs.oi.is_dirty())
--ctx->delta_stats.num_objects_dirty;
assert(r == 0);
finish_ctx(ctx, pg_log_entry_t::DELETE, false);
simple_repop_submit(repop);
finish_ctx(ctx.get(), pg_log_entry_t::DELETE, false);
simple_opc_submit(std::move(ctx));
osd->logger->inc(l_osd_tier_evict);
osd->logger->inc(l_osd_agent_evict);
return true;
@ -12583,15 +12587,14 @@ void ReplicatedPG::_scrub(
dout(10) << __func__ << " recording digests for " << p->first << dendl;
ObjectContextRef obc = get_object_context(p->first, false);
assert(obc);
RepGather *repop = simple_repop_create(obc);
OpContext *ctx = repop->ctx;
OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
ctx->mtime = utime_t(); // do not update mtime
ctx->new_obs.oi.set_data_digest(p->second.first);
ctx->new_obs.oi.set_omap_digest(p->second.second);
finish_ctx(ctx, pg_log_entry_t::MODIFY, true, true);
finish_ctx(ctx.get(), pg_log_entry_t::MODIFY, true, true);
ctx->on_finish = new C_ScrubDigestUpdated(this);
simple_repop_submit(repop);
simple_opc_submit(std::move(ctx));
++scrubber.num_digest_updates_pending;
}
@ -12674,10 +12677,7 @@ void ReplicatedPG::_scrub_finish()
ReplicatedPG::SnapTrimmer::~SnapTrimmer()
{
while (!repops.empty()) {
(*repops.begin())->put();
repops.erase(repops.begin());
}
in_flight.clear();
}
void ReplicatedPG::SnapTrimmer::log_enter(const char *state_name)
@ -12746,36 +12746,19 @@ ReplicatedPG::TrimmingObjects::TrimmingObjects(my_context ctx)
void ReplicatedPG::TrimmingObjects::exit()
{
context< SnapTrimmer >().log_exit(state_name, enter_time);
// Clean up repops in case of reset
set<RepGather *> &repops = context<SnapTrimmer>().repops;
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
repops.erase(i++)) {
(*i)->put();
}
context<SnapTrimmer>().in_flight.clear();
}
boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
{
dout(10) << "TrimmingObjects react" << dendl;
ReplicatedPG *pg = context< SnapTrimmer >().pg;
ReplicatedPGRef pg = context< SnapTrimmer >().pg;
snapid_t snap_to_trim = context<SnapTrimmer>().snap_to_trim;
set<RepGather *> &repops = context<SnapTrimmer>().repops;
auto &in_flight = context<SnapTrimmer>().in_flight;
dout(10) << "TrimmingObjects: trimming snap " << snap_to_trim << dendl;
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
) {
if ((*i)->all_applied && (*i)->all_committed) {
(*i)->put();
repops.erase(i++);
} else {
++i;
}
}
while (repops.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
while (in_flight.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
// Get next
hobject_t old_pos = pos;
int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos);
@ -12790,20 +12773,25 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
}
dout(10) << "TrimmingObjects react trimming " << pos << dendl;
RepGather *repop = pg->trim_object(pos);
if (!repop) {
OpContextUPtr ctx = pg->trim_object(pos);
if (!ctx) {
dout(10) << __func__ << " could not get write lock on obj "
<< pos << dendl;
pos = old_pos;
return discard_event();
}
assert(repop);
repop->queue_snap_trimmer = true;
assert(ctx);
hobject_t to_remove = pos;
ctx->register_on_success(
[pg, to_remove, &in_flight]() {
in_flight.erase(to_remove);
pg->queue_snap_trim();
});
pg->apply_ctx_stats(repop->ctx);
pg->apply_ctx_stats(ctx.get());
repops.insert(repop->get());
pg->simple_repop_submit(repop);
in_flight.insert(pos);
pg->simple_opc_submit(std::move(ctx));
}
return discard_event();
}
@ -12819,30 +12807,16 @@ ReplicatedPG::WaitingOnReplicas::WaitingOnReplicas(my_context ctx)
void ReplicatedPG::WaitingOnReplicas::exit()
{
context< SnapTrimmer >().log_exit(state_name, enter_time);
// Clean up repops in case of reset
set<RepGather *> &repops = context<SnapTrimmer>().repops;
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
repops.erase(i++)) {
(*i)->put();
}
context<SnapTrimmer>().in_flight.clear();
}
boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&)
{
// Have all the repops applied?
// Have all the trims finished?
dout(10) << "Waiting on Replicas react" << dendl;
ReplicatedPG *pg = context< SnapTrimmer >().pg;
set<RepGather *> &repops = context<SnapTrimmer>().repops;
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
repops.erase(i++)) {
if (!(*i)->all_applied || !(*i)->all_committed) {
return discard_event();
} else {
(*i)->put();
}
if (!context<SnapTrimmer>().in_flight.empty()) {
return discard_event();
}
snapid_t &sn = context<SnapTrimmer>().snap_to_trim;

View File

@ -557,6 +557,29 @@ public:
// pending xattr updates
map<ObjectContextRef,
map<string, boost::optional<bufferlist> > > pending_attrs;
list<std::function<void()>> on_applied;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_finish;
list<std::function<void()>> on_success;
template <typename F>
void register_on_finish(F &&f) {
on_finish.emplace_back(std::move(f));
}
template <typename F>
void register_on_success(F &&f) {
on_finish.emplace_back(std::move(f));
}
template <typename F>
void register_on_applied(F &&f) {
on_applied.emplace_back(std::move(f));
}
template <typename F>
void register_on_commit(F &&f) {
on_committed.emplace_back(std::move(f));
}
void apply_pending_attrs() {
for (map<ObjectContextRef,
map<string, boost::optional<bufferlist> > >::iterator i =
@ -675,6 +698,7 @@ public:
}
}
};
using OpContextUPtr = std::unique_ptr<OpContext>;
friend struct OpContext;
/*
@ -705,9 +729,10 @@ public:
eversion_t pg_local_last_complete;
bool queue_snap_trimmer;
Context *on_applied;
list<std::function<void()>> on_applied;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_success;
list<std::function<void()>> on_finish;
bool log_op_stat;
RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
@ -721,8 +746,10 @@ public:
//sent_nvram(false),
sent_disk(false),
pg_local_last_complete(lc),
queue_snap_trimmer(false),
on_applied(NULL),
on_applied(std::move(c->on_applied)),
on_committed(std::move(c->on_committed)),
on_success(std::move(c->on_success)),
on_finish(std::move(c->on_finish)),
log_op_stat(false) { }
RepGather *get() {
@ -733,7 +760,7 @@ public:
assert(nref > 0);
if (--nref == 0) {
delete ctx; // must already be unlocked
assert(on_applied == NULL);
assert(on_applied.empty());
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
@ -795,6 +822,11 @@ protected:
release_op_ctx_locks(ctx);
delete ctx->op_t;
ctx->op_t = NULL;
for (auto p = ctx->on_finish.begin();
p != ctx->on_finish.end();
ctx->on_finish.erase(p++)) {
(*p)();
}
ctx->finish(r);
delete ctx;
}
@ -896,11 +928,14 @@ protected:
void repop_all_committed(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop);
RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, ceph_tid_t rep_tid);
RepGather *new_repop(
OpContext *ctx,
ObjectContextRef obc,
ceph_tid_t rep_tid);
void remove_repop(RepGather *repop);
RepGather *simple_repop_create(ObjectContextRef obc);
void simple_repop_submit(RepGather *repop);
OpContextUPtr simple_opc_create(ObjectContextRef obc);
void simple_opc_submit(OpContextUPtr ctx);
// hot/cold tracking
HitSetRef hit_set; ///< currently accumulating HitSet
@ -913,7 +948,7 @@ protected:
void hit_set_create(); ///< create a new HitSet
void hit_set_persist(); ///< persist hit info
bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
void hit_set_trim(RepGather *repop, unsigned max); ///< discard old HitSets
void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
void hit_set_remove_all();
@ -1474,7 +1509,7 @@ public:
ThreadPool::TPHandle &handle);
void do_backfill(OpRequestRef op);
RepGather *trim_object(const hobject_t &coid);
OpContextUPtr trim_object(const hobject_t &coid);
void snap_trimmer(epoch_t e);
int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
@ -1544,7 +1579,7 @@ private:
};
struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
ReplicatedPG *pg;
set<RepGather *> repops;
set<hobject_t, hobject_t::BitwiseComparator> in_flight;
snapid_t snap_to_trim;
bool need_share_pg_info;
explicit SnapTrimmer(ReplicatedPG *pg) : pg(pg), need_share_pg_info(false) {}