From 53c26bc335c4856c926ac9a812fef4736669bbdc Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 20 Nov 2013 16:17:36 -0800 Subject: [PATCH] osd/: move client op handling into ReplicatedBackend Signed-off-by: Samuel Just --- src/osd/PGBackend.h | 18 +- src/osd/ReplicatedBackend.cc | 211 +++++++++- src/osd/ReplicatedBackend.h | 105 ++++- src/osd/ReplicatedPG.cc | 776 ++++++++++++++--------------------- src/osd/ReplicatedPG.h | 124 +++--- 5 files changed, 683 insertions(+), 551 deletions(-) diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 30804fdbc41..1d7a682fdac 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -114,13 +114,29 @@ const hobject_t &hoid, map &attrs) = 0; - virtual void op_applied_replica( + virtual void op_applied( const eversion_t &applied_version) = 0; virtual bool should_send_op( int peer, const hobject_t &hoid) = 0; + virtual void log_operation( + vector &logv, + const eversion_t &trim_to, + bool update_snaps, + ObjectStore::Transaction *t) = 0; + + virtual void update_peer_last_complete_ondisk( + int fromosd, + eversion_t lcod) = 0; + + virtual void update_last_complete_ondisk( + eversion_t lcod) = 0; + + virtual void update_stats( + const pg_stat_t &stat) = 0; + virtual ~Listener() {} }; Listener *parent; diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 6d893b4e3e8..a7fdeb0e555 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -12,6 +12,7 @@ * */ #include "ReplicatedBackend.h" +#include "messages/MOSDOp.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" #include "messages/MOSDPGPush.h" @@ -148,6 +149,8 @@ bool ReplicatedBackend::handle_message( default: break; } + } else { + sub_op_modify(op); } break; } @@ -162,6 +165,8 @@ bool ReplicatedBackend::handle_message( sub_op_push_reply(op); return true; } + } else { + sub_op_modify_reply(op); } break; } @@ -192,6 +197,14 @@ void ReplicatedBackend::on_change(ObjectStore::Transaction *t) t->remove(get_temp_coll(t), *i); } temp_contents.clear(); + for (map::iterator i = in_progress_ops.begin(); + i != in_progress_ops.end(); + in_progress_ops.erase(i++)) { + if (i->second.on_commit) + delete i->second.on_commit; + if (i->second.on_applied) + delete i->second.on_applied; + } clear_state(); } @@ -480,6 +493,9 @@ public: bool empty() const { return t->empty(); } + uint64_t get_bytes_written() const { + return t->get_encoded_bytes(); + } ~RPGTransaction() { delete t; } }; @@ -488,13 +504,202 @@ PGBackend::PGTransaction *ReplicatedBackend::get_transaction() return new RPGTransaction(coll, get_temp_coll()); } +class C_OSD_OnOpCommit : public Context { + ReplicatedBackend *pg; + ReplicatedBackend::InProgressOp *op; +public: + C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) + : pg(pg), op(op) {} + void finish(int) { + pg->op_commit(op); + } +}; + +class C_OSD_OnOpApplied : public Context { + ReplicatedBackend *pg; + ReplicatedBackend::InProgressOp *op; +public: + C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) + : pg(pg), op(op) {} + void finish(int) { + pg->op_applied(op); + } +}; + void ReplicatedBackend::submit_transaction( + const hobject_t &soid, + const eversion_t &at_version, PGTransaction *_t, + const eversion_t &trim_to, vector &log_entries, + Context *on_local_applied_sync, Context *on_all_acked, Context *on_all_commit, - tid_t tid) + tid_t tid, + osd_reqid_t reqid, + OpRequestRef orig_op) { - //RPGTransaction *t = dynamic_cast(_t); - return; + RPGTransaction *t = dynamic_cast(_t); + ObjectStore::Transaction *op_t = t->get_transaction(); + + assert(t->get_temp_added().size() <= 1); + assert(t->get_temp_cleared().size() <= 1); + + assert(!in_progress_ops.count(tid)); + InProgressOp &op = in_progress_ops.insert( + make_pair( + tid, + InProgressOp( + tid, on_all_commit, on_all_acked, + orig_op, at_version) + ) + ).first->second; + + issue_op( + soid, + at_version, + tid, + reqid, + trim_to, + t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(), + t->get_temp_cleared().size() ? + *(t->get_temp_cleared().begin()) :hobject_t(), + log_entries, + &op, + op_t); + + // add myself to gather set + op.waiting_for_applied.insert(osd->whoami); + op.waiting_for_commit.insert(osd->whoami); + + ObjectStore::Transaction local_t; + if (t->get_temp_added().size()) { + get_temp_coll(&local_t); + temp_contents.insert(t->get_temp_added().begin(), t->get_temp_added().end()); + } + for (set::const_iterator i = t->get_temp_cleared().begin(); + i != t->get_temp_cleared().end(); + ++i) { + temp_contents.erase(*i); + } + parent->log_operation(log_entries, trim_to, true, &local_t); + local_t.append(*op_t); + local_t.swap(*op_t); + + op_t->register_on_applied_sync(on_local_applied_sync); + op_t->register_on_applied( + parent->bless_context( + new C_OSD_OnOpApplied(this, &op))); + op_t->register_on_applied( + new ObjectStore::C_DeleteTransaction(op_t)); + op_t->register_on_commit( + parent->bless_context( + new C_OSD_OnOpCommit(this, &op))); + + parent->queue_transaction(op_t, op.op); + delete t; +} + +void ReplicatedBackend::op_applied( + InProgressOp *op) +{ + dout(10) << __func__ << ": " << op->tid << dendl; + if (op->op) + op->op->mark_event("op_applied"); + + op->waiting_for_applied.erase(osd->whoami); + parent->op_applied(op->v); + + if (op->waiting_for_applied.empty()) { + op->on_applied->complete(0); + op->on_applied = 0; + } + if (op->done()) { + assert(!op->on_commit && !op->on_applied); + in_progress_ops.erase(op->tid); + } +} + +void ReplicatedBackend::op_commit( + InProgressOp *op) +{ + dout(10) << __func__ << ": " << op->tid << dendl; + if (op->op) + op->op->mark_event("op_commit"); + + op->waiting_for_commit.erase(osd->whoami); + + if (op->waiting_for_commit.empty()) { + op->on_commit->complete(0); + op->on_commit = 0; + } + if (op->done()) { + assert(!op->on_commit && !op->on_applied); + in_progress_ops.erase(op->tid); + } +} + +void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op) +{ + MOSDSubOpReply *r = static_cast(op->get_req()); + assert(r->get_header().type == MSG_OSD_SUBOPREPLY); + + op->mark_started(); + + // must be replication. + tid_t rep_tid = r->get_tid(); + int fromosd = r->get_source().num(); + + if (in_progress_ops.count(rep_tid)) { + map::iterator iter = + in_progress_ops.find(rep_tid); + InProgressOp &ip_op = iter->second; + MOSDOp *m; + if (ip_op.op) + m = static_cast(ip_op.op->get_req()); + + if (m) + dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m + << " ack_type " << r->ack_type + << " from osd." << fromosd + << dendl; + else + dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) " + << " ack_type " << r->ack_type + << " from osd." << fromosd + << dendl; + + // oh, good. + + if (r->ack_type & CEPH_OSD_FLAG_ONDISK) { + assert(ip_op.waiting_for_commit.count(fromosd)); + ip_op.waiting_for_commit.erase(fromosd); + if (ip_op.op) + ip_op.op->mark_event("sub_op_commit_rec"); + } else { + assert(ip_op.waiting_for_applied.count(fromosd)); + if (ip_op.op) + ip_op.op->mark_event("sub_op_applied_rec"); + } + ip_op.waiting_for_applied.erase(fromosd); + + parent->update_peer_last_complete_ondisk( + fromosd, + r->get_last_complete_ondisk()); + + if (ip_op.waiting_for_applied.empty() && + ip_op.on_applied) { + ip_op.on_applied->complete(0); + ip_op.on_applied = 0; + } + if (ip_op.waiting_for_commit.empty() && + ip_op.on_commit) { + ip_op.on_commit->complete(0); + ip_op.on_commit= 0; + } + if (ip_op.done()) { + assert(!ip_op.on_commit && !ip_op.on_applied); + in_progress_ops.erase(iter); + } + } } diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 6960d223e2a..166826251e3 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -172,18 +172,6 @@ public: const string &attr, bufferlist *out); - /** - * Client IO - */ - PGTransaction *get_transaction(); - void submit_transaction( - PGTransaction *t, - vector &log_entries, - Context *on_all_acked, - Context *on_all_commit, - tid_t tid - ); - private: // push struct PushInfo { @@ -340,6 +328,99 @@ private: const ObjectRecoveryInfo& recovery_info, SnapSetContext *ssc ); + + /** + * Client IO + */ + struct InProgressOp { + tid_t tid; + set waiting_for_commit; + set waiting_for_applied; + Context *on_commit; + Context *on_applied; + OpRequestRef op; + eversion_t v; + InProgressOp( + tid_t tid, Context *on_commit, Context *on_applied, + OpRequestRef op, eversion_t v) + : tid(tid), on_commit(on_commit), on_applied(on_applied), + op(op), v(v) {} + bool done() const { + return waiting_for_commit.empty() && + waiting_for_applied.empty(); + } + }; + map in_progress_ops; +public: + PGTransaction *get_transaction(); + friend class C_OSD_OnOpCommit; + friend class C_OSD_OnOpApplied; + void submit_transaction( + const hobject_t &hoid, + const eversion_t &at_version, + PGTransaction *t, + const eversion_t &trim_to, + vector &log_entries, + Context *on_local_applied_sync, + Context *on_all_applied, + Context *on_all_commit, + tid_t tid, + osd_reqid_t reqid, + OpRequestRef op + ); +private: + void issue_op( + const hobject_t &soid, + const eversion_t &at_version, + tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + vector &log_entries, + InProgressOp *op, + ObjectStore::Transaction *op_t); + void op_applied(InProgressOp *op); + void op_commit(InProgressOp *op); + void sub_op_modify_reply(OpRequestRef op); + void sub_op_modify(OpRequestRef op); + + struct RepModify { + OpRequestRef op; + bool applied, committed; + int ackerosd; + eversion_t last_complete; + epoch_t epoch_started; + + uint64_t bytes_written; + + ObjectStore::Transaction opt, localt; + + RepModify() : applied(false), committed(false), ackerosd(-1), + epoch_started(0), bytes_written(0) {} + }; + typedef std::tr1::shared_ptr RepModifyRef; + + struct C_OSD_RepModifyApply : public Context { + ReplicatedBackend *pg; + RepModifyRef rm; + C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r) + : pg(pg), rm(r) {} + void finish(int r) { + pg->sub_op_modify_applied(rm); + } + }; + struct C_OSD_RepModifyCommit : public Context { + ReplicatedBackend *pg; + RepModifyRef rm; + C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r) + : pg(pg), rm(r) {} + void finish(int r) { + pg->sub_op_modify_commit(rm); + } + }; + void sub_op_modify_applied(RepModifyRef rm); + void sub_op_modify_commit(RepModifyRef rm); }; #endif diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 332fbdcd067..52667289de0 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1301,6 +1301,7 @@ void ReplicatedPG::do_op(OpRequestRef op) OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); + ctx->op_t = pgbackend->get_transaction(); ctx->obc = obc; if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) { dout(20) << __func__ << ": skipping rw locks" << dendl; @@ -1496,8 +1497,8 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // this method must be idempotent since we may call it several times // before we finally apply the resulting transaction. - ctx->op_t = ObjectStore::Transaction(); - ctx->local_t = ObjectStore::Transaction(); + delete ctx->op_t; + ctx->op_t = pgbackend->get_transaction(); if (op->may_write() || op->may_cache()) { // dup/replay? @@ -1618,7 +1619,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // possible to construct an operation that does a read, does a guard // check (e.g., CMPXATTR), and then a write. Then we either succeed // with the write, or return a CMPXATTR and the read value. - if ((ctx->op_t.empty() && !ctx->modify) || result < 0) { + if ((ctx->op_t->empty() && !ctx->modify) || result < 0) { // read. ctx->reply->claim_op_out_data(ctx->ops); ctx->reply->get_header().data_off = ctx->data_off; @@ -1632,7 +1633,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) ctx->reply->set_result(result); // read or error? - if (ctx->op_t.empty() || result < 0) { + if (ctx->op_t->empty() || result < 0) { MOSDOpReply *reply = ctx->reply; ctx->reply = NULL; @@ -1660,8 +1661,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // trim log? calc_trim_to(); - append_log(ctx->log, pg_trim_to, ctx->local_t); - // verify that we are doing this in order? if (cct->_conf->osd_debug_op_order && m->get_source().is_client()) { map& cm = debug_op_order[obc->obs.oi.soid]; @@ -1798,8 +1797,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) return; } } - - sub_op_modify(op); } void ReplicatedPG::do_sub_op_reply(OpRequestRef op) @@ -1814,8 +1811,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) return; } } - - sub_op_modify_reply(op); } void ReplicatedPG::do_scan( @@ -2134,7 +2129,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) OpContext *ctx = repop->ctx; ctx->at_version = get_next_version(); - ObjectStore::Transaction *t = &ctx->op_t; + PGBackend::PGTransaction *t = ctx->op_t; set new_snaps; for (set::iterator i = old_snaps.begin(); i != old_snaps.end(); @@ -2147,7 +2142,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) // remove clone dout(10) << coid << " snaps " << old_snaps << " -> " << new_snaps << " ... deleting" << dendl; - t->remove(coll, coid); + t->remove(coid); // ...from snapset snapid_t last = coid.snap; @@ -2207,7 +2202,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) coi.version = ctx->at_version; bl.clear(); ::encode(coi, bl); - t->setattr(coll, coid, OI_ATTR, bl); + t->setattr(coid, OI_ATTR, bl); ctx->log.push_back( pg_log_entry_t( @@ -2246,7 +2241,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) ); ctx->snapset_obc->obs.exists = false; - t->remove(coll, snapoid); + t->remove(snapoid); } else { dout(10) << coid << " updating snapset on " << snapoid << dendl; ctx->log.push_back( @@ -2266,11 +2261,11 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) bl.clear(); ::encode(snapset, bl); - t->setattr(coll, snapoid, SS_ATTR, bl); + t->setattr(snapoid, SS_ATTR, bl); bl.clear(); ::encode(ctx->snapset_obc->obs.oi, bl); - t->setattr(coll, snapoid, OI_ATTR, bl); + t->setattr(snapoid, OI_ATTR, bl); } return repop; @@ -2687,7 +2682,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) bool first_read = true; - ObjectStore::Transaction& t = ctx->op_t; + PGBackend::PGTransaction* t = ctx->op_t; dout(10) << "do_osd_op " << soid << " " << ops << dendl; @@ -3393,7 +3388,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) if (obs.exists && !oi.is_whiteout()) { dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq << ", truncating to " << op.extent.truncate_size << dendl; - t.truncate(coll, soid, op.extent.truncate_size); + t->truncate(soid, op.extent.truncate_size); oi.truncate_seq = op.extent.truncate_seq; oi.truncate_size = op.extent.truncate_size; if (op.extent.truncate_size != oi.size) { @@ -3411,7 +3406,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size); if (result < 0) break; - t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata); + t->write(soid, op.extent.offset, op.extent.length, osd_op.indata); write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges, op.extent.offset, op.extent.length, true); if (!obs.exists) { @@ -3432,12 +3427,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) if (result < 0) break; if (obs.exists) { - t.truncate(coll, soid, 0); + t->truncate(soid, 0); } else { ctx->delta_stats.num_objects++; obs.exists = true; } - t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata); + t->write(soid, op.extent.offset, op.extent.length, osd_op.indata); interval_set ch; if (oi.size > 0) ch.insert(0, oi.size); @@ -3465,7 +3460,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; assert(op.extent.length); if (obs.exists && !oi.is_whiteout()) { - t.zero(coll, soid, op.extent.offset, op.extent.length); + t->zero(soid, op.extent.offset, op.extent.length); interval_set ch; ch.insert(op.extent.offset, op.extent.length); ctx->modified_ranges.union_of(ch); @@ -3503,7 +3498,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } } if (result >= 0 && !obs.exists) { - t.touch(coll, soid); + t->touch(soid); ctx->delta_stats.num_objects++; obs.exists = true; } @@ -3542,7 +3537,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) oi.truncate_size = op.extent.truncate_size; } - t.truncate(coll, soid, op.extent.offset); + t->truncate(soid, op.extent.offset); if (oi.size > op.extent.offset) { interval_set trim; trim.insert(op.extent.offset, oi.size-op.extent.offset); @@ -3573,7 +3568,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ++ctx->num_write; { if (!obs.exists) { - t.touch(coll, obs.oi.soid); + t->touch(obs.oi.soid); ctx->delta_stats.num_objects++; obs.exists = true; } @@ -3584,7 +3579,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) result = -EINVAL; break; } - t.clone_range(coll, src_obc->obs.oi.soid, + t->clone_range(src_obc->obs.oi.soid, obs.oi.soid, op.clonerange.src_offset, op.clonerange.length, op.clonerange.offset); @@ -3616,7 +3611,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } else { dout(10) << " registered new watch " << w << " by " << entity << dendl; oi.watchers[make_pair(cookie, entity)] = w; - t.nop(); // make sure update the object_info on disk! + t->nop(); // make sure update the object_info on disk! } ctx->watch_connects.push_back(w); } else { @@ -3626,7 +3621,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl; oi.watchers.erase(oi_iter); - t.nop(); // update oi on disk + t->nop(); // update oi on disk ctx->watch_disconnects.push_back(w); } else { dout(10) << " can't remove: no watch by " << entity << dendl; @@ -3647,7 +3642,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; } if (!obs.exists) { - t.touch(coll, soid); + t->touch(soid); ctx->delta_stats.num_objects++; obs.exists = true; } @@ -3656,7 +3651,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) string name = "_" + aname; bufferlist bl; bp.copy(op.xattr.value_len, bl); - t.setattr(coll, soid, name, bl); + t->setattr(soid, name, bl); ctx->delta_stats.num_wr++; } break; @@ -3667,7 +3662,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) string aname; bp.copy(op.xattr.name_len, aname); string name = "_" + aname; - t.rmattr(coll, soid, name); + t->rmattr(soid, name); ctx->delta_stats.num_wr++; } break; @@ -3692,7 +3687,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_STARTSYNC: - t.start_sync(); + // TODOSAM: either nop this or fix it + //t.start_sync(); break; @@ -3951,7 +3947,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ctx->delta_stats.num_objects++; obs.exists = true; } - t.touch(coll, soid); + t->touch(soid); map to_set; try { ::decode(to_set, bp); @@ -3966,7 +3962,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ++i) { dout(20) << "\t" << i->first << dendl; } - t.omap_setkeys(coll, soid, to_set); + t->omap_setkeys(soid, to_set); ctx->delta_stats.num_wr++; } break; @@ -3978,8 +3974,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ctx->delta_stats.num_objects++; obs.exists = true; } - t.touch(coll, soid); - t.omap_setheader(coll, soid, osd_op.indata); + t->touch(soid); + t->omap_setheader(soid, osd_op.indata); ctx->delta_stats.num_wr++; } break; @@ -3991,8 +3987,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) result = -ENOENT; break; } - t.touch(coll, soid); - t.omap_clear(coll, soid); + t->touch(soid); + t->omap_clear(soid); ctx->delta_stats.num_wr++; } break; @@ -4004,7 +4000,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) result = -ENOENT; break; } - t.touch(coll, soid); + t->touch(soid); set to_rm; try { ::decode(to_rm, bp); @@ -4013,7 +4009,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) result = -EINVAL; goto fail; } - t.omap_rmkeys(coll, soid, to_rm); + t->omap_rmkeys(soid, to_rm); ctx->delta_stats.num_wr++; } break; @@ -4123,11 +4119,13 @@ inline int ReplicatedPG::_delete_head(OpContext *ctx, bool no_whiteout) ObjectState& obs = ctx->new_obs; object_info_t& oi = obs.oi; const hobject_t& soid = oi.soid; - ObjectStore::Transaction& t = ctx->op_t; + PGBackend::PGTransaction* t = ctx->op_t; if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout)) return -ENOENT; + t->remove(soid); + if (oi.size > 0) { interval_set ch; ch.insert(0, oi.size); @@ -4143,13 +4141,10 @@ inline int ReplicatedPG::_delete_head(OpContext *ctx, bool no_whiteout) dout(20) << __func__ << " setting whiteout on " << soid << dendl; oi.set_flag(object_info_t::FLAG_WHITEOUT); ctx->delta_stats.num_whiteouts++; - t.truncate(coll, soid, 0); - t.omap_clear(coll, soid); - t.rmattrs(coll, soid); + t->touch(soid); return 0; } - t.remove(coll, soid); ctx->delta_stats.num_objects--; if (oi.is_dirty()) ctx->delta_stats.num_objects_dirty--; @@ -4166,7 +4161,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) ObjectState& obs = ctx->new_obs; object_info_t& oi = obs.oi; const hobject_t& soid = oi.soid; - ObjectStore::Transaction& t = ctx->op_t; + PGBackend::PGTransaction* t = ctx->op_t; snapid_t snapid = (uint64_t)op.snap.snapid; hobject_t missing_oid; @@ -4225,10 +4220,9 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) << " and rolling back to old snap" << dendl; if (obs.exists) - t.remove(coll, soid); + t->remove(soid); - t.clone(coll, - rollback_to_sobject, soid); + t->clone(rollback_to_sobject, soid); snapset.head_exists = true; map >::iterator iter = @@ -4263,23 +4257,23 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) return ret; } -void ReplicatedPG::_make_clone(ObjectStore::Transaction& t, +void ReplicatedPG::_make_clone(PGBackend::PGTransaction* t, const hobject_t& head, const hobject_t& coid, object_info_t *poi) { bufferlist bv; ::encode(*poi, bv); - t.clone(coll, head, coid); - t.setattr(coll, coid, OI_ATTR, bv); - t.rmattr(coll, coid, SS_ATTR); + t->clone(head, coid); + t->setattr(coid, OI_ATTR, bv); + t->rmattr(coid, SS_ATTR); } void ReplicatedPG::make_writeable(OpContext *ctx) { const hobject_t& soid = ctx->obs->oi.soid; SnapContext& snapc = ctx->snapc; - ObjectStore::Transaction t; + PGBackend::PGTransaction *t = pgbackend->get_transaction(); // clone? assert(soid.snap == CEPH_NOSNAP); @@ -4384,8 +4378,9 @@ void ReplicatedPG::make_writeable(OpContext *ctx) } // prepend transaction to op_t - t.append(ctx->op_t); - t.swap(ctx->op_t); + t->append(ctx->op_t); + delete ctx->op_t; + ctx->op_t = t; // update snapset with latest snap context ctx->new_snapset.seq = snapc.seq; @@ -4524,8 +4519,6 @@ hobject_t ReplicatedPG::generate_temp_object() ostringstream ss; ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq); hobject_t hoid = hobject_t::make_temp(ss.str()); - // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend - pgbackend->add_temp_obj(hoid); dout(20) << __func__ << " " << hoid << dendl; return hoid; } @@ -4552,7 +4545,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) do_osd_op_effects(ctx); // read-op? done? - if (ctx->op_t.empty() && !ctx->modify) { + if (ctx->op_t->empty() && !ctx->modify) { unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category); return result; } @@ -4600,7 +4593,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type) ctx->snapset_obc = get_object_context(snapoid, false); if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) { - ctx->op_t.remove(coll, snapoid); + ctx->op_t->remove(snapoid); dout(10) << " removing old " << snapoid << dendl; ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid, @@ -4631,9 +4624,9 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type) bufferlist bv(sizeof(ctx->new_obs.oi)); ::encode(ctx->snapset_obc->obs.oi, bv); - ctx->op_t.touch(coll, snapoid); - ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv); - ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss); + ctx->op_t->touch(snapoid); + ctx->op_t->setattr(snapoid, OI_ATTR, bv); + ctx->op_t->setattr(snapoid, SS_ATTR, bss); ctx->at_version.version++; } } @@ -4650,7 +4643,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type) ctx->user_at_version = ctx->at_version.version; ctx->new_obs.oi.user_version = ctx->user_at_version; } - ctx->bytes_written = ctx->op_t.get_encoded_bytes(); + ctx->bytes_written = ctx->op_t->get_bytes_written(); if (ctx->new_obs.exists) { // on the head object @@ -4666,12 +4659,12 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type) bufferlist bv(sizeof(ctx->new_obs.oi)); ::encode(ctx->new_obs.oi, bv); - ctx->op_t.setattr(coll, soid, OI_ATTR, bv); + ctx->op_t->setattr(soid, OI_ATTR, bv); if (soid.snap == CEPH_NOSNAP) { dout(10) << " final snapset " << ctx->new_snapset << " in " << soid << dendl; - ctx->op_t.setattr(coll, soid, SS_ATTR, bss); + ctx->op_t->setattr(soid, SS_ATTR, bss); } else { dout(10) << " no snapset (this is a clone)" << dendl; } @@ -4989,6 +4982,8 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) } } + assert(cop->rval >= 0); + if (!cop->cursor.is_complete()) { // write out what we have so far if (cop->temp_cursor.is_initial()) { @@ -5002,7 +4997,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) if (cop->temp_cursor.is_initial()) { repop->ctx->new_temp_oid = cop->results.temp_oid; } - _write_copy_chunk(cop, &repop->ctx->op_t); + _write_copy_chunk(cop, repop->ctx->op_t); simple_repop_submit(repop); dout(10) << __func__ << " fetching more" << dendl; _copy_some(cobc, cop); @@ -5010,6 +5005,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) } dout(20) << __func__ << " success; committing" << dendl; + cop->results.final_tx = pgbackend->get_transaction(); _build_finish_copy_transaction(cop, cop->results.final_tx); out: @@ -5022,7 +5018,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) kick_object_context_blocked(cobc); } -void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) +void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t) { dout(20) << __func__ << " " << cop << " " << cop->attrs.size() << " attrs" @@ -5030,51 +5026,51 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) << " " << cop->omap.size() << " keys" << dendl; if (!cop->temp_cursor.attr_complete) { - t->touch(cop->results.temp_coll, cop->results.temp_oid); + t->touch(cop->results.temp_oid); for (map::iterator p = cop->attrs.begin(); - p != cop->attrs.end(); ++p) - t->setattr(cop->results.temp_coll, cop->results.temp_oid, - string("_") + p->first, p->second); + p != cop->attrs.end(); + ++p) + t->setattr( + cop->results.temp_oid, + string("_") + p->first, p->second); cop->attrs.clear(); } if (!cop->temp_cursor.data_complete) { - t->write(cop->results.temp_coll, cop->results.temp_oid, - cop->temp_cursor.data_offset, cop->data.length(), cop->data); + t->write( + cop->results.temp_oid, + cop->temp_cursor.data_offset, cop->data.length(), cop->data); cop->data.clear(); } if (!cop->temp_cursor.omap_complete) { if (cop->omap_header.length()) { - t->omap_setheader(cop->results.temp_coll, cop->results.temp_oid, - cop->omap_header); + t->omap_setheader( + cop->results.temp_oid, + cop->omap_header); cop->omap_header.clear(); } - t->omap_setkeys(cop->results.temp_coll, cop->results.temp_oid, cop->omap); + t->omap_setkeys(cop->results.temp_oid, cop->omap); cop->omap.clear(); } cop->temp_cursor = cop->cursor; } void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop, - ObjectStore::Transaction& t) + PGBackend::PGTransaction* t) { ObjectState& obs = cop->obc->obs; if (obs.exists) { - t.remove(coll, obs.oi.soid); + t->remove(obs.oi.soid); } if (cop->temp_cursor.is_initial()) { // write directly to final object cop->results.temp_oid = obs.oi.soid; - _write_copy_chunk(cop, &t); + _write_copy_chunk(cop, t); } else { // finish writing to temp object, then move into place - _write_copy_chunk(cop, &t); - t.collection_move_rename( - cop->results.temp_coll, cop->results.temp_oid, coll, obs.oi.soid); - - // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend - pgbackend->clear_temp_obj(cop->results.temp_oid); + _write_copy_chunk(cop, t); + t->rename(cop->results.temp_oid, obs.oi.soid); } } @@ -5091,8 +5087,9 @@ void ReplicatedPG::finish_copyfrom(OpContext *ctx) if (cb->is_temp_obj_used()) { ctx->discard_temp_oid = cb->results->temp_oid; } - ctx->op_t.swap(cb->results->final_tx); - ctx->op_t.append(cb->results->final_tx); + ctx->op_t->append(cb->results->final_tx); + delete cb->results->final_tx; + cb->results->final_tx = NULL; // CopyFromCallback fills this in for us obs.oi.user_version = ctx->user_at_version; @@ -5172,12 +5169,14 @@ void ReplicatedPG::finish_promote(int r, OpRequestRef op, if (whiteout) { // create a whiteout - tctx->op_t.touch(coll, soid); + tctx->op_t->touch(soid); tctx->new_obs.oi.set_flag(object_info_t::FLAG_WHITEOUT); ++tctx->delta_stats.num_whiteouts; dout(20) << __func__ << " creating whiteout" << dendl; } else { - tctx->op_t.swap(results->final_tx); + tctx->op_t->append(results->final_tx); + delete results->final_tx; + results->final_tx = NULL; if (results->started_temp_obj) { tctx->discard_temp_oid = results->temp_oid; } @@ -5577,100 +5576,67 @@ void ReplicatedPG::cancel_flush_ops(bool requeue) // ======================================================================== // rep op gather -class C_OSD_OpApplied : public Context { -public: +class C_OSD_RepopApplied : public Context { ReplicatedPGRef pg; - ReplicatedPG::RepGather *repop; - - C_OSD_OpApplied(ReplicatedPG *p, ReplicatedPG::RepGather *rg) : - pg(p), repop(rg) { - repop->get(); - } - void finish(int r) { - pg->op_applied(repop); + boost::intrusive_ptr repop; +public: + C_OSD_RepopApplied(ReplicatedPG *pg, ReplicatedPG::RepGather *repop) + : pg(pg), repop(repop) {} + void finish(int) { + pg->repop_all_applied(repop.get()); } }; -class C_OSD_OpCommit : public Context { -public: - ReplicatedPGRef pg; - ReplicatedPG::RepGather *repop; - C_OSD_OpCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) : - pg(p), repop(rg) { - repop->get(); - } - void finish(int r) { - pg->op_commit(repop); - } -}; - -void ReplicatedPG::apply_repop(RepGather *repop) +void ReplicatedPG::repop_all_applied(RepGather *repop) { - dout(10) << "apply_repop applying update on " << *repop << dendl; - assert(!repop->applying); - assert(!repop->applied); - - repop->applying = true; - - repop->tls.push_back(&repop->ctx->local_t); - repop->tls.push_back(&repop->ctx->op_t); - - repop->obc->ondisk_write_lock(); - if (repop->ctx->clone_obc) - repop->ctx->clone_obc->ondisk_write_lock(); - - bool unlock_snapset_obc = false; - if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid != - repop->obc->obs.oi.soid) { - repop->ctx->snapset_obc->ondisk_write_lock(); - unlock_snapset_obc = true; - } - - Context *oncommit = new C_OSD_OpCommit(this, repop); - Context *onapplied = new C_OSD_OpApplied(this, repop); - Context *onapplied_sync = new C_OSD_OndiskWriteUnlock( - repop->obc, - repop->ctx->clone_obc, - unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef()); - int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op); - if (r) { - derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl; - assert(0); + dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied " + << dendl; + repop->all_applied = true; + if (!repop->rep_aborted) { + eval_repop(repop); } } -void ReplicatedPG::op_applied(RepGather *repop) +class C_OSD_RepopCommit : public Context { + ReplicatedPGRef pg; + boost::intrusive_ptr repop; +public: + C_OSD_RepopCommit(ReplicatedPG *pg, ReplicatedPG::RepGather *repop) + : pg(pg), repop(repop) {} + void finish(int) { + pg->repop_all_committed(repop.get()); + } +}; + +void ReplicatedPG::repop_all_committed(RepGather *repop) { - lock(); - dout(10) << "op_applied " << *repop << dendl; - if (repop->ctx->op) - repop->ctx->op->mark_event("op_applied"); - - repop->applying = false; - repop->applied = true; - - // (logical) local ack. - int whoami = osd->get_nodeid(); - - if (!repop->aborted) { - assert(repop->waitfor_ack.count(whoami) || - repop->waitfor_disk.count(whoami) == 0); // commit before ondisk - repop->waitfor_ack.erase(whoami); + dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed " + << dendl; + repop->all_committed = true; + if (!repop->rep_aborted) { if (repop->v != eversion_t()) { - assert(info.last_update >= repop->v); - assert(last_update_applied < repop->v); - last_update_applied = repop->v; + last_update_ondisk = repop->v; + last_complete_ondisk = repop->pg_local_last_complete; } + eval_repop(repop); + } +} - // chunky scrub +void ReplicatedPG::op_applied(const eversion_t &applied_version) +{ + dout(10) << "op_applied on primary on version " << applied_version << dendl; + if (applied_version == eversion_t()) + return; + assert(applied_version > last_update_applied); + assert(applied_version <= info.last_update); + last_update_applied = applied_version; + if (is_primary()) { if (scrubber.active && scrubber.is_chunky) { if (last_update_applied == scrubber.subset_last_update) { osd->scrub_wq.queue(this); } - - // classic scrub } else if (last_update_applied == info.last_update && scrubber.block_writes) { dout(10) << "requeueing scrub for cleanup" << dendl; scrubber.finalizing = true; @@ -5679,49 +5645,17 @@ void ReplicatedPG::op_applied(RepGather *repop) scrubber.waiting_on_whom.insert(osd->whoami); osd->scrub_wq.queue(this); } - } - - if (!repop->aborted) - eval_repop(repop); - - repop->put(); - unlock(); -} - -void ReplicatedPG::op_commit(RepGather *repop) -{ - lock(); - if (repop->ctx->op) - repop->ctx->op->mark_event("op_commit"); - - if (repop->aborted) { - dout(10) << "op_commit " << *repop << " -- aborted" << dendl; - } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) { - dout(10) << "op_commit " << *repop << " -- already marked ondisk" << dendl; } else { - dout(10) << "op_commit " << *repop << dendl; - int whoami = osd->get_nodeid(); - - repop->waitfor_disk.erase(whoami); - - // remove from ack waitfor list too. sub_op_modify_commit() - // behaves the same in that the COMMIT implies and ACK and there - // is no separate reply sent. - repop->waitfor_ack.erase(whoami); - - if (repop->v != eversion_t()) { - last_update_ondisk = repop->v; - last_complete_ondisk = repop->pg_local_last_complete; + dout(10) << "op_applied on replica on version " << applied_version << dendl; + if (scrubber.active_rep_scrub) { + if (last_update_applied == scrubber.active_rep_scrub->scrub_to) { + osd->rep_scrub_wq.queue(scrubber.active_rep_scrub); + scrubber.active_rep_scrub = 0; + } } - eval_repop(repop); } - - repop->put(); - unlock(); } - - void ReplicatedPG::eval_repop(RepGather *repop) { MOSDOp *m = NULL; @@ -5731,27 +5665,23 @@ void ReplicatedPG::eval_repop(RepGather *repop) if (m) dout(10) << "eval_repop " << *repop << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"") - << (repop->done() ? " DONE" : "") + << (repop->rep_done ? " DONE" : "") << dendl; else dout(10) << "eval_repop " << *repop << " (no op)" - << (repop->done() ? " DONE" : "") + << (repop->rep_done ? " DONE" : "") << dendl; - if (repop->done()) + if (repop->rep_done) return; - // apply? - if (!repop->applied && !repop->applying) - apply_repop(repop); - if (m) { // an 'ondisk' reply implies 'ack'. so, prefer to send just one // ondisk instead of ack followed by ondisk. // ondisk? - if (repop->waitfor_disk.empty()) { + if (repop->all_committed) { release_op_ctx_locks(repop->ctx); @@ -5795,7 +5725,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) } // applied? - if (repop->waitfor_ack.empty()) { + if (repop->all_applied) { // send dup acks, in order if (waiting_for_ack.count(repop->v)) { @@ -5840,9 +5770,8 @@ void ReplicatedPG::eval_repop(RepGather *repop) } // done. - if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty() && - repop->applied) { - repop->mark_done(); + if (repop->all_applied && repop->all_committed) { + repop->rep_done = true; calc_min_last_complete_ondisk(); @@ -5868,74 +5797,124 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) { OpContext *ctx = repop->ctx; const hobject_t& soid = ctx->obs->oi.soid; - + if (ctx->op && + ((static_cast( + ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { + // replicate original op for parallel execution on replica + assert(0 == "broken implementation, do not use"); + } dout(7) << "issue_repop rep_tid " << repop->rep_tid << " o " << soid << dendl; repop->v = ctx->at_version; - // add myself to gather set - repop->waitfor_ack.insert(acting[0]); - repop->waitfor_disk.insert(acting[0]); + for (vector::iterator i = actingbackfill.begin() + 1; + i != actingbackfill.end(); + ++i) { + pg_info_t &pinfo = peer_info[*i]; + // keep peer_info up to date + if (pinfo.last_complete == pinfo.last_update) + pinfo.last_complete = ctx->at_version; + pinfo.last_update = ctx->at_version; + } + repop->obc->ondisk_write_lock(); + if (repop->ctx->clone_obc) + repop->ctx->clone_obc->ondisk_write_lock(); + + bool unlock_snapset_obc = false; + if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid != + repop->obc->obs.oi.soid) { + repop->ctx->snapset_obc->ondisk_write_lock(); + unlock_snapset_obc = true; + } + + Context *on_all_commit = new C_OSD_RepopCommit(this, repop); + Context *on_all_applied = new C_OSD_RepopApplied(this, repop); + Context *onapplied_sync = new C_OSD_OndiskWriteUnlock( + repop->obc, + repop->ctx->clone_obc, + unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef()); + pgbackend->submit_transaction( + soid, + repop->ctx->at_version, + repop->ctx->op_t, + pg_trim_to, + repop->ctx->log, + onapplied_sync, + on_all_applied, + on_all_commit, + repop->rep_tid, + repop->ctx->reqid, + repop->ctx->op); + repop->ctx->op_t = NULL; +} + +void ReplicatedBackend::issue_op( + const hobject_t &soid, + const eversion_t &at_version, + tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + vector &log_entries, + InProgressOp *op, + ObjectStore::Transaction *op_t) +{ int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; - assert(actingbackfill.size() > 0); - if (ctx->op && actingbackfill.size() > 1) { + if (parent->get_actingbackfill().size() > 1) { ostringstream ss; - ss << "waiting for subops from " << vector(actingbackfill.begin() + 1, actingbackfill.end()); - ctx->op->mark_sub_op_sent(ss.str()); + ss << "waiting for subops from " << + vector( + parent->get_actingbackfill().begin() + 1, + parent->get_actingbackfill().end()); + if (op->op) + op->op->mark_sub_op_sent(ss.str()); } - for (unsigned i=1; iget_actingbackfill().size(); i++) { + int peer = parent->get_actingbackfill()[i]; + const pg_info_t &pinfo = parent->get_peer_info().find(peer)->second; - repop->waitfor_ack.insert(peer); - repop->waitfor_disk.insert(peer); + op->waiting_for_applied.insert(peer); + op->waiting_for_commit.insert(peer); // forward the write/update/whatever - MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid, - false, acks_wanted, - get_osdmap()->get_epoch(), - repop->rep_tid, repop->ctx->at_version); - if (ctx->op && - ((static_cast(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { - // replicate original op for parallel execution on replica - assert(0 == "broken implementation, do not use"); - } + MOSDSubOp *wr = new MOSDSubOp( + reqid, get_info().pgid, soid, + false, acks_wanted, + get_osdmap()->get_epoch(), + tid, at_version); // ship resulting transaction, log entries, and pg_stats - if (!should_send_op(peer, soid)) { + if (!parent->should_send_op(peer, soid)) { dout(10) << "issue_repop shipping empty opt to osd." << peer <<", object " << soid << " beyond MAX(last_backfill_started " - << last_backfill_started << ", pinfo.last_backfill " + << ", pinfo.last_backfill " << pinfo.last_backfill << ")" << dendl; ObjectStore::Transaction t; ::encode(t, wr->get_data()); } else { - ::encode(repop->ctx->op_t, wr->get_data()); + ::encode(*op_t, wr->get_data()); } - ::encode(repop->ctx->log, wr->logbl); + ::encode(log_entries, wr->logbl); - if (is_backfill_targets(peer)) + if (pinfo.is_incomplete()) wr->pg_stats = pinfo.stats; // reflects backfill progress else - wr->pg_stats = info.stats; + wr->pg_stats = get_info().stats; wr->pg_trim_to = pg_trim_to; - wr->new_temp_oid = repop->ctx->new_temp_oid; - wr->discard_temp_oid = repop->ctx->discard_temp_oid; + wr->new_temp_oid = new_temp_oid; + wr->discard_temp_oid = discard_temp_oid; osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch()); - // keep peer_info up to date - if (pinfo.last_complete == pinfo.last_update) - pinfo.last_update = ctx->at_version; - pinfo.last_update = ctx->at_version; } } @@ -5970,53 +5949,6 @@ void ReplicatedPG::remove_repop(RepGather *repop) osd->logger->set(l_osd_op_wip, repop_map.size()); } -void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, - int fromosd, eversion_t peer_lcod) -{ - MOSDOp *m = NULL; - - if (repop->ctx->op) - m = static_cast(repop->ctx->op->get_req()); - - if (m) - dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m - << " result " << result - << " ack_type " << ack_type - << " from osd." << fromosd - << dendl; - else - dout(7) << "repop_ack rep_tid " << repop->rep_tid << " (no op) " - << " result " << result - << " ack_type " << ack_type - << " from osd." << fromosd - << dendl; - - if (ack_type & CEPH_OSD_FLAG_ONDISK) { - if (repop->ctx->op) - repop->ctx->op->mark_event("sub_op_commit_rec"); - // disk - if (repop->waitfor_disk.count(fromosd)) { - repop->waitfor_disk.erase(fromosd); - //repop->waitfor_nvram.erase(fromosd); - repop->waitfor_ack.erase(fromosd); - peer_last_complete_ondisk[fromosd] = peer_lcod; - } -/*} else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) { - // nvram - repop->waitfor_nvram.erase(fromosd); - repop->waitfor_ack.erase(fromosd);*/ - } else { - // ack - if (repop->ctx->op) - repop->ctx->op->mark_event("sub_op_applied_rec"); - repop->waitfor_ack.erase(fromosd); - } - - if (!repop->aborted) - eval_repop(repop); -} - - ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc) { dout(20) << __func__ << " " << obc->obs.oi.soid << dendl; @@ -6025,6 +5957,7 @@ ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc) osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); + ctx->op_t = pgbackend->get_transaction(); ctx->mtime = ceph_clock_now(g_ceph_context); ctx->obc = obc; RepGather *repop = new_repop(ctx, obc, rep_tid); @@ -6034,17 +5967,11 @@ ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc) void ReplicatedPG::simple_repop_submit(RepGather *repop) { dout(20) << __func__ << " " << repop << dendl; - if (!repop->ctx->log.empty()) - append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t); issue_repop(repop, repop->ctx->mtime); eval_repop(repop); repop->put(); } - - - - // ------------------------------------------------------- void ReplicatedPG::get_watchers(list &pg_watchers) @@ -6174,6 +6101,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); + ctx->op_t = pgbackend->get_transaction(); ctx->mtime = ceph_clock_now(cct); ctx->at_version = get_next_version(); @@ -6181,7 +6109,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) RepGather *repop = new_repop(ctx, obc, rep_tid); - ObjectStore::Transaction *t = &ctx->op_t; + PGBackend::PGTransaction *t = ctx->op_t; ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid, ctx->at_version, @@ -6193,9 +6121,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) obc->obs.oi.version = ctx->at_version; bufferlist bl; ::encode(obc->obs.oi, bl); - t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl); - - append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t); + t->setattr(obc->obs.oi.soid, OI_ATTR, bl); // obc ref swallowed by repop! issue_repop(repop, repop->ctx->mtime); @@ -6587,7 +6513,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) // sub op modify -void ReplicatedPG::sub_op_modify(OpRequestRef op) +void ReplicatedBackend::sub_op_modify(OpRequestRef op) { MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); @@ -6611,23 +6537,19 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) << dendl; // sanity checks - assert(m->map_epoch >= info.history.same_interval_since); - assert(is_active()); + assert(m->map_epoch >= get_info().history.same_interval_since); // we better not be missing this. - assert(!pg_log.get_missing().is_missing(soid)); + assert(!parent->get_log().get_missing().is_missing(soid)); - int ackerosd = acting[0]; + int ackerosd = m->get_source().num(); op->mark_started(); - RepModify *rm = new RepModify; - rm->pg = this; - get("RepModify"); + RepModifyRef rm(new RepModify); rm->op = op; - rm->ctx = 0; rm->ackerosd = ackerosd; - rm->last_complete = info.last_complete; + rm->last_complete = get_info().last_complete; rm->epoch_started = get_osdmap()->get_epoch(); if (!m->noop) { @@ -6639,14 +6561,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) if (m->new_temp_oid != hobject_t()) { dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; - // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend - pgbackend->add_temp_obj(m->new_temp_oid); + add_temp_obj(m->new_temp_oid); get_temp_coll(&rm->localt); } if (m->discard_temp_oid != hobject_t()) { dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; - // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend - pgbackend->clear_temp_obj(m->discard_temp_oid); + clear_temp_obj(m->discard_temp_oid); } ::decode(rm->opt, p); @@ -6659,13 +6579,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) i != log.end(); ++i) { if (!i->soid.is_max() && i->soid.pool == -1) - i->soid.pool = info.pgid.pool(); + i->soid.pool = get_info().pgid.pool(); } - rm->opt.set_pool_override(info.pgid.pool()); + rm->opt.set_pool_override(get_info().pgid.pool()); } rm->opt.set_replica(); - info.stats = m->pg_stats; bool update_snaps = false; if (!rm->opt.empty()) { // If the opt is non-empty, we infer we are before @@ -6674,149 +6593,80 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) // collections now. Otherwise, we do it later on push. update_snaps = true; } - append_log(log, m->pg_trim_to, rm->localt, update_snaps); - - rm->tls.push_back(&rm->localt); - rm->tls.push_back(&rm->opt); - + parent->update_stats(m->pg_stats); + parent->log_operation( + log, + m->pg_trim_to, + update_snaps, + &(rm->localt)); + rm->bytes_written = rm->opt.get_encoded_bytes(); } else { + assert(0); + #if 0 // just trim the log if (m->pg_trim_to != eversion_t()) { pg_log.trim(m->pg_trim_to, info); dirty_info = true; write_if_dirty(rm->localt); - rm->tls.push_back(&rm->localt); } + #endif } op->mark_started(); - Context *oncommit = new C_OSD_RepModifyCommit(rm); - Context *onapply = new C_OSD_RepModifyApply(rm); - int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op); - if (r) { - dout(0) << "error applying transaction: r = " << r << dendl; - assert(0); - } + rm->localt.append(rm->opt); + rm->localt.register_on_commit( + parent->bless_context( + new C_OSD_RepModifyCommit(this, rm))); + rm->localt.register_on_applied( + parent->bless_context( + new C_OSD_RepModifyApply(this, rm))); + parent->queue_transaction(&(rm->localt), op); // op is cleaned up by oncommit/onapply when both are executed } -void ReplicatedPG::op_applied_replica( - const eversion_t &applied_version) +void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) { - dout(10) << "op_applied_replica on version " << applied_version << dendl; - if (applied_version != eversion_t()) { - assert(info.last_update >= applied_version); - assert(last_update_applied < applied_version); - last_update_applied = applied_version; - } - if (scrubber.active_rep_scrub) { - if (last_update_applied == scrubber.active_rep_scrub->scrub_to) { - osd->rep_scrub_wq.queue(scrubber.active_rep_scrub); - scrubber.active_rep_scrub = 0; - } - } -} - -void ReplicatedPG::sub_op_modify_applied(RepModify *rm) -{ - lock(); rm->op->mark_event("sub_op_applied"); rm->applied = true; - if (!pg_has_reset_since(rm->epoch_started)) { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl; - MOSDSubOp *m = static_cast(rm->op->get_req()); - assert(m->get_header().type == MSG_OSD_SUBOP); - - if (!rm->committed) { - // send ack to acker only if we haven't sent a commit already - MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch()); - } - - op_applied_replica(m->version); - } else { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() - << " from epoch " << rm->epoch_started << " < last_peering_reset " - << last_peering_reset << dendl; - } - - bool done = rm->applied && rm->committed; - unlock(); - if (done) { - delete rm->ctx; - delete rm; - put("RepModify"); + dout(10) << "sub_op_modify_applied on " << rm << " op " + << *rm->op->get_req() << dendl; + MOSDSubOp *m = static_cast(rm->op->get_req()); + assert(m->get_header().type == MSG_OSD_SUBOP); + + if (!rm->committed) { + // send ack to acker only if we haven't sent a commit already + MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! + osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch()); } + + parent->op_applied(m->version); } -void ReplicatedPG::sub_op_modify_commit(RepModify *rm) +void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) { - lock(); rm->op->mark_commit_sent(); rm->committed = true; - if (!pg_has_reset_since(rm->epoch_started)) { - // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() - << ", sending commit to osd." << rm->ackerosd - << dendl; - - if (get_osdmap()->is_up(rm->ackerosd)) { - last_complete_ondisk = rm->last_complete; - MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); - commit->set_last_complete_ondisk(rm->last_complete); - commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); - } - } else { - dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req() - << " from epoch " << rm->epoch_started << " < last_peering_reset " - << last_peering_reset << dendl; - } + // send commit. + dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() + << ", sending commit to osd." << rm->ackerosd + << dendl; + + assert(get_osdmap()->is_up(rm->ackerosd)); + get_parent()->update_last_complete_ondisk(rm->last_complete); + MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + commit->set_last_complete_ondisk(rm->last_complete); + commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! + osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); - bool done = rm->applied && rm->committed; - unlock(); - if (done) { - delete rm->ctx; - delete rm; - put("RepModify"); - } } -void ReplicatedPG::sub_op_modify_reply(OpRequestRef op) -{ - MOSDSubOpReply *r = static_cast(op->get_req()); - assert(r->get_header().type == MSG_OSD_SUBOPREPLY); - - op->mark_started(); - - // must be replication. - tid_t rep_tid = r->get_tid(); - int fromosd = r->get_source().num(); - - if (repop_map.count(rep_tid)) { - // oh, good. - repop_ack(repop_map[rep_tid], - r->get_result(), r->ack_type, - fromosd, - r->get_last_complete_ondisk()); - } -} - - - - - - - - - // =========================================================== @@ -8080,6 +7930,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) t->register_on_complete(new C_OSD_SendMessageOnConn( osd, reply, m->get_connection())); } + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); get_parent()->queue_transaction(t); return; } @@ -8332,9 +8184,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) RepGather *repop = repop_queue.front(); repop_queue.pop_front(); dout(10) << " applying repop tid " << repop->rep_tid << dendl; - if (!repop->applied && !repop->applying) - apply_repop(repop); - repop->aborted = true; + repop->rep_aborted = true; if (requeue) { if (repop->ctx->op) { @@ -9763,15 +9613,15 @@ void ReplicatedPG::hit_set_persist() // the deleted object over this period. hobject_t old_obj = get_hit_set_current_object(info.hit_set.current_last_stamp); - ctx->op_t.remove(coll, old_obj); + ctx->op_t->remove(old_obj); ctx->log.push_back( - pg_log_entry_t(pg_log_entry_t::DELETE, - old_obj, - ctx->at_version, - info.hit_set.current_last_update, - 0, - osd_reqid_t(), - ctx->mtime)); + pg_log_entry_t(pg_log_entry_t::DELETE, + old_obj, + ctx->at_version, + info.hit_set.current_last_update, + 0, + osd_reqid_t(), + ctx->mtime)); ++ctx->at_version.version; struct stat st; @@ -9809,19 +9659,19 @@ void ReplicatedPG::hit_set_persist() bufferlist boi(sizeof(ctx->new_obs.oi)); ::encode(ctx->new_obs.oi, boi); - ctx->op_t.write(coll, oid, 0, bl.length(), bl); - ctx->op_t.setattr(coll, oid, OI_ATTR, boi); - ctx->op_t.setattr(coll, oid, SS_ATTR, bss); + ctx->op_t->write(oid, 0, bl.length(), bl); + ctx->op_t->setattr(oid, OI_ATTR, boi); + ctx->op_t->setattr(oid, SS_ATTR, bss); ctx->log.push_back( - pg_log_entry_t( - pg_log_entry_t::MODIFY, - oid, - ctx->at_version, - ctx->obs->oi.version, - 0, - osd_reqid_t(), - ctx->mtime) - ); + pg_log_entry_t( + pg_log_entry_t::MODIFY, + oid, + ctx->at_version, + ctx->obs->oi.version, + 0, + osd_reqid_t(), + ctx->mtime) + ); hit_set_trim(repop, pool.info.hit_set_count); @@ -9837,7 +9687,7 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) assert(p != info.hit_set.history.end()); hobject_t oid = get_hit_set_archive_object(p->begin, p->end); dout(20) << __func__ << " removing " << oid << dendl; - repop->ctx->op_t.remove(coll, oid); + repop->ctx->op_t->remove(oid); ++repop->ctx->at_version.version; repop->ctx->log.push_back( pg_log_entry_t(pg_log_entry_t::DELETE, @@ -10172,6 +10022,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) RepGather *repop = pg->trim_object(pos); assert(repop); repop->queue_snap_trimmer = true; + repops.insert(repop->get()); pg->simple_repop_submit(repop); return discard_event(); @@ -10207,7 +10058,7 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim& for (set::iterator i = repops.begin(); i != repops.end(); repops.erase(i++)) { - if (!(*i)->applied || !(*i)->waitfor_ack.empty()) { + if (!(*i)->all_applied) { return discard_event(); } else { (*i)->put(); @@ -10243,3 +10094,6 @@ void intrusive_ptr_release(ReplicatedPG *pg) { pg->put("intptr"); } uint64_t get_with_id(ReplicatedPG *pg) { return pg->get_with_id(); } void put_with_id(ReplicatedPG *pg, uint64_t id) { return pg->put_with_id(id); } #endif + +void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop) { repop->get(); } +void intrusive_ptr_release(ReplicatedPG::RepGather *repop) { repop->put(); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 0626008dea0..1b56b02b751 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -112,7 +112,7 @@ public: * Final transaction; if non-empty the callback must execute it before any * other accesses to the object (in order to complete the copy). */ - ObjectStore::Transaction final_tx; + PGBackend::PGTransaction *final_tx; string category; ///< The copy source's category version_t user_version; ///< The copy source's user version bool should_requeue; ///< op should be requeued on cancel @@ -324,8 +324,15 @@ public: map &attrs) { return get_object_context(hoid, true, &attrs); } + void log_operation( + vector &logv, + const eversion_t &trim_to, + bool update_snaps, + ObjectStore::Transaction *t) { + append_log(logv, trim_to, *t, update_snaps); + } - void op_applied_replica( + void op_applied( const eversion_t &applied_version); bool should_send_op( @@ -338,6 +345,22 @@ public: assert(is_backfill_targets(peer)); return should_send; } + + void update_peer_last_complete_ondisk( + int fromosd, + eversion_t lcod) { + peer_last_complete_ondisk[fromosd] = lcod; + } + + void update_last_complete_ondisk( + eversion_t lcod) { + last_complete_ondisk = lcod; + } + + void update_stats( + const pg_stat_t &stat) { + info.stats = stat; + } /* * Capture all object state associated with an in-progress read or write. @@ -381,7 +404,7 @@ public: int current_osd_subop_num; - ObjectStore::Transaction op_t, local_t; + PGBackend::PGTransaction *op_t; vector log; interval_set modified_ranges; @@ -417,6 +440,7 @@ public: modify(false), user_modify(false), undirty(false), bytes_written(0), bytes_read(0), user_at_version(0), current_osd_subop_num(0), + op_t(NULL), data_off(0), reply(NULL), pg(_pg), num_read(0), num_write(0), @@ -435,6 +459,7 @@ public: } } ~OpContext() { + assert(!op_t); assert(lock_to_release == NONE); if (reply) reply->put(); @@ -445,7 +470,6 @@ public: * State on the PG primary associated with the replicated mutation */ class RepGather { - bool is_done; public: xlist::item queue_item; int nref; @@ -458,11 +482,10 @@ public: tid_t rep_tid; - bool applying, applied, aborted; + bool rep_aborted, rep_done; - set waitfor_ack; - //set waitfor_nvram; - set waitfor_disk; + bool all_applied; + bool all_committed; bool sent_ack; //bool sent_nvram; bool sent_disk; @@ -471,18 +494,16 @@ public: eversion_t pg_local_last_complete; - list tls; bool queue_snap_trimmer; RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, eversion_t lc) : - is_done(false), queue_item(this), nref(1), ctx(c), obc(pi), rep_tid(rt), - applying(false), applied(false), aborted(false), - sent_ack(false), + rep_aborted(false), rep_done(false), + all_applied(false), all_committed(false), sent_ack(false), //sent_nvram(false), sent_disk(false), pg_local_last_complete(lc), @@ -500,16 +521,9 @@ public: //generic_dout(0) << "deleting " << this << dendl; } } - void mark_done() { - is_done = true; - } - bool done() { - return is_done; - } }; - protected: /** @@ -544,6 +558,8 @@ protected: */ void close_op_ctx(OpContext *ctx) { release_op_ctx_locks(ctx); + delete ctx->op_t; + ctx->op_t = NULL; delete ctx; } @@ -578,16 +594,14 @@ protected: xlist repop_queue; map repop_map; - void apply_repop(RepGather *repop); - void op_applied(RepGather *repop); - void op_commit(RepGather *repop); + friend class C_OSD_RepopApplied; + friend class C_OSD_RepopCommit; + void repop_all_applied(RepGather *repop); + void repop_all_committed(RepGather *repop); void eval_repop(RepGather*); void issue_repop(RepGather *repop, utime_t now); RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid); void remove_repop(RepGather *repop); - void repop_ack(RepGather *repop, - int result, int ack_type, - int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); RepGather *simple_repop_create(ObjectContextRef obc); void simple_repop_submit(RepGather *repop); @@ -613,7 +627,7 @@ protected: ++i) { if ((*i)->v > v) break; - if (!(*i)->waitfor_disk.empty()) + if (!(*i)->all_committed) return false; } return true; @@ -625,14 +639,12 @@ protected: ++i) { if ((*i)->v > v) break; - if (!(*i)->waitfor_ack.empty()) + if (!(*i)->all_applied) return false; } return true; } - friend class C_OSD_OpCommit; - friend class C_OSD_OpApplied; friend struct C_OnPushCommit; // projected object info @@ -792,7 +804,7 @@ protected: // low level ops - void _make_clone(ObjectStore::Transaction& t, + void _make_clone(PGBackend::PGTransaction* t, const hobject_t& head, const hobject_t& coid, object_info_t *poi); void execute_ctx(OpContext *ctx); @@ -878,38 +890,6 @@ protected: void send_remove_op(const hobject_t& oid, eversion_t v, int peer); - struct RepModify { - ReplicatedPG *pg; - OpRequestRef op; - OpContext *ctx; - bool applied, committed; - int ackerosd; - eversion_t last_complete; - epoch_t epoch_started; - - uint64_t bytes_written; - - ObjectStore::Transaction opt, localt; - list tls; - - RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1), - epoch_started(0), bytes_written(0) {} - }; - - struct C_OSD_RepModifyApply : public Context { - RepModify *rm; - C_OSD_RepModifyApply(RepModify *r) : rm(r) { } - void finish(int r) { - rm->pg->sub_op_modify_applied(rm); - } - }; - struct C_OSD_RepModifyCommit : public Context { - RepModify *rm; - C_OSD_RepModifyCommit(RepModify *r) : rm(r) { } - void finish(int r) { - rm->pg->sub_op_modify_commit(rm); - } - }; struct C_OSD_OndiskWriteUnlock : public Context { ObjectContextRef obc, obc2, obc3; C_OSD_OndiskWriteUnlock( @@ -964,11 +944,6 @@ protected: void sub_op_remove(OpRequestRef op); - void sub_op_modify(OpRequestRef op); - void sub_op_modify_applied(RepModify *rm); - void sub_op_modify_commit(RepModify *rm); - - void sub_op_modify_reply(OpRequestRef op); void _applied_recovered_object(ObjectContextRef obc); void _applied_recovered_object_replica(); void _committed_pushed_object(epoch_t epoch, eversion_t lc); @@ -993,10 +968,10 @@ protected: object_locator_t oloc, version_t version, unsigned flags, bool mirror_snapset); void process_copy_chunk(hobject_t oid, tid_t tid, int r); - void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t); + void _write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t); void _copy_some(ObjectContextRef obc, CopyOpRef cop); void _build_finish_copy_transaction(CopyOpRef cop, - ObjectStore::Transaction& t); + PGBackend::PGTransaction *t); void finish_copyfrom(OpContext *ctx); void finish_promote(int r, OpRequestRef op, CopyResults *results, ObjectContextRef obc); @@ -1177,13 +1152,10 @@ public: inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { out << "repgather(" << &repop - << (repop.applying ? " applying" : "") - << (repop.applied ? " applied" : "") << " " << repop.v << " rep_tid=" << repop.rep_tid - << " wfack=" << repop.waitfor_ack - //<< " wfnvram=" << repop.waitfor_nvram - << " wfdisk=" << repop.waitfor_disk; + << " committed?=" << repop.all_committed + << " applied?=" << repop.all_applied; if (repop.ctx->lock_to_release != ReplicatedPG::OpContext::NONE) out << " lock=" << (int)repop.ctx->lock_to_release; if (repop.ctx->op) @@ -1192,4 +1164,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) return out; } +void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop); +void intrusive_ptr_release(ReplicatedPG::RepGather *repop); + + #endif