From 5fa2083f49d3e2d8d741a85da58c509a410d8df4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 2 Mar 2010 15:09:24 -0800 Subject: [PATCH] osd: use per-PG ObjectStore::Sequencer when possible Otherwise, when it doesn't matter, use the default. This serializes io submission to btrfs at the lower layer within a single PG. In general this is not significant, since we have lots of PGs. We could potentially do per object in some cases, but we'd have to be careful about snapshots and such. Keep it simple for now. --- src/osd/OSD.cc | 30 +++++++++++++++--------------- src/osd/PG.cc | 4 ++-- src/osd/PG.h | 3 +++ src/osd/ReplicatedPG.cc | 18 +++++++++--------- src/streamtest.cc | 2 +- 5 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a3955785e35..d7733c89c71 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -181,7 +181,7 @@ int OSD::mkfs(const char *dev, const char *jdev, ceph_fsid_t fsid, int whoami) for (int i=0; i<1000; i++) { ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(meta_coll, sobject_t(oid, 0), i*bl.length(), bl.length(), bl); - store->queue_transaction(t); + store->queue_transaction(NULL, t); } store->sync(); utime_t end = g_clock.now(); @@ -2052,7 +2052,7 @@ void OSD::handle_osd_map(MOSDMap *m) dout(10) << "handle_osd_map got full map epoch " << p->first << dendl; ObjectStore::Transaction *ft = new ObjectStore::Transaction; ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it. 
- int r = store->queue_transaction(ft); + int r = store->queue_transaction(NULL, ft); assert(r == 0); if (p->first > superblock.newest_map) @@ -2079,7 +2079,7 @@ void OSD::handle_osd_map(MOSDMap *m) dout(10) << "handle_osd_map got incremental map epoch " << p->first << dendl; ObjectStore::Transaction *ft = new ObjectStore::Transaction; ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it. - int r = store->queue_transaction(ft); + int r = store->queue_transaction(NULL, ft); assert(r == 0); if (p->first > superblock.newest_map) @@ -2818,7 +2818,7 @@ void OSD::kick_pg_split_queue() pg->unlock(); created++; } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); // remove from queue @@ -3035,7 +3035,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m) pg->unlock(); } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); do_queries(query_map); @@ -3211,7 +3211,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) pg->unlock(); } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); do_queries(query_map); @@ -3323,7 +3323,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from, } } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); pg->unlock(); @@ -3402,7 +3402,7 @@ void OSD::handle_pg_trim(MOSDPGTrim *m) ObjectStore::Transaction *t = new ObjectStore::Transaction; pg->trim(*t, m->trim_to); pg->write_info(*t); - int tr = store->queue_transaction(t, new 
ObjectStore::C_DeleteTransaction(t)); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t)); assert(tr == 0); } pg->unlock(); @@ -3474,7 +3474,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m) pg->info.history = history; pg->write_info(*t); pg->write_log(*t); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); created++; @@ -3611,7 +3611,7 @@ void OSD::_remove_pg(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; pg->write_info(*t); t->remove(meta_coll, pg->log_oid); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); } @@ -3632,7 +3632,7 @@ void OSD::_remove_pg(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; t->remove(coll_t::build_snap_pg_coll(pgid, *p), *q); t->remove(coll_t::build_pg_coll(pgid), *q); // we may hit this twice, but it's harmless - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); if ((++n & 0xff) == 0) { @@ -3657,7 +3657,7 @@ void OSD::_remove_pg(PG *pg) p++) { ObjectStore::Transaction *t = new ObjectStore::Transaction; t->remove(coll_t::build_pg_coll(pgid), *p); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); if ((++n & 0xff) == 0) { @@ -3690,7 +3690,7 @@ void OSD::_remove_pg(PG *pg) { rmt->remove_collection(coll_t::build_pg_coll(pgid)); - int tr = store->queue_transaction(rmt); + int tr = store->queue_transaction(NULL, rmt); assert(tr == 0); } @@ -3794,7 +3794,7 @@ void OSD::generate_backlog(PG *pg) pg->write_info(*t); if (pg->dirty_log) pg->write_log(*t); - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } @@ -3841,7 +3841,7 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at) ObjectStore::Transaction *t 
= new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; pg->activate(*t, fin->contexts); - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } pg->unlock(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 9f18cccd0ab..d240173e60d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1803,7 +1803,7 @@ void PG::_finish_recovery(Context *c) drop_backlog(); write_info(*t); write_log(*t); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); } } else { @@ -2255,7 +2255,7 @@ void PG::read_log(ObjectStore *store) ::encode(oi, bl); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->setattr(coll_t::build_pg_coll(info.pgid), i->soid, OI_ATTR, bl); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); stringstream ss; diff --git a/src/osd/PG.h b/src/osd/PG.h index 1d45245b619..8c7d230ff6b 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -760,6 +760,9 @@ public: bool pg_stats_valid; pg_stat_t pg_stats_stable; + // for ordering writes + ObjectStore::Sequencer osr; + void update_stats(); void clear_stats(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3e076cc7103..6988efc7972 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -835,7 +835,7 @@ bool ReplicatedPG::snap_trimmer() t->setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bl); } - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); // give other threads a chance at this pg @@ -849,7 +849,7 @@ bool ReplicatedPG::snap_trimmer() snap_collections.erase(sn); write_info(*t); t->remove_collection(c); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); info.snap_trimq.erase(sn); @@ 
-860,7 +860,7 @@ bool ReplicatedPG::snap_trimmer() ObjectStore::Transaction *t = new ObjectStore::Transaction; write_info(*t); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); unlock(); return true; @@ -1773,7 +1773,7 @@ void ReplicatedPG::apply_repop(RepGather *repop) Context *oncommit = new C_OSD_OpCommit(this, repop); Context *onapplied = new C_OSD_OpApplied(this, repop); Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc); - int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit, onapplied_sync); + int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync); if (r) { dout(-10) << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl; assert(0); @@ -2444,7 +2444,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) Context *oncommit = new C_OSD_RepModifyCommit(rm); Context *onapply = new C_OSD_RepModifyApply(rm); - int r = osd->store->queue_transactions(rm->tls, onapply, oncommit); + int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit); if (r) { derr(0) << "error applying transaction: r = " << r << dendl; assert(0); @@ -3146,7 +3146,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // apply to disk! write_info(*t); - int r = osd->store->queue_transaction(t, + int r = osd->store->queue_transaction(&osr, t, onreadable, new C_OSD_Commit(this, info.history.same_acting_since, info.last_complete), @@ -3380,7 +3380,7 @@ int ReplicatedPG::recover_primary(int max) put_object_context(headobc); // XXX: track objectcontext! 
- int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); missing.got(latest->soid, latest->version); missing_loc.erase(latest->soid); @@ -3429,7 +3429,7 @@ int ReplicatedPG::recover_primary(int max) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; finish_recovery(*t, fin->contexts); - int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } else { dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl; @@ -3494,7 +3494,7 @@ int ReplicatedPG::recover_replicas(int max) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; finish_recovery(*t, fin->contexts); - int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } else { dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl; diff --git a/src/streamtest.cc b/src/streamtest.cc index c3f5b8571f9..d46ffa8af98 100644 --- a/src/streamtest.cc +++ b/src/streamtest.cc @@ -144,7 +144,7 @@ int main(int argc, const char **argv) set_start(pos, g_clock.now()); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(coll_t(), poid, pos, bytes, bl); - fs->queue_transaction(t, new C_Ack(pos), new C_Commit(pos)); + fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos)); pos += bytes; throttle();