diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a3955785e35..d7733c89c71 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -181,7 +181,7 @@ int OSD::mkfs(const char *dev, const char *jdev, ceph_fsid_t fsid, int whoami) for (int i=0; i<1000; i++) { ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(meta_coll, sobject_t(oid, 0), i*bl.length(), bl.length(), bl); - store->queue_transaction(t); + store->queue_transaction(NULL, t); } store->sync(); utime_t end = g_clock.now(); @@ -2052,7 +2052,7 @@ void OSD::handle_osd_map(MOSDMap *m) dout(10) << "handle_osd_map got full map epoch " << p->first << dendl; ObjectStore::Transaction *ft = new ObjectStore::Transaction; ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it. - int r = store->queue_transaction(ft); + int r = store->queue_transaction(NULL, ft); assert(r == 0); if (p->first > superblock.newest_map) @@ -2079,7 +2079,7 @@ void OSD::handle_osd_map(MOSDMap *m) dout(10) << "handle_osd_map got incremental map epoch " << p->first << dendl; ObjectStore::Transaction *ft = new ObjectStore::Transaction; ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it. - int r = store->queue_transaction(ft); + int r = store->queue_transaction(NULL, ft); assert(r == 0); if (p->first > superblock.newest_map) @@ -2818,7 +2818,7 @@ void OSD::kick_pg_split_queue() pg->unlock(); created++; } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); // remove from queue @@ -3035,7 +3035,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m) pg->unlock(); } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); do_queries(query_map); @@ -3211,7 +3211,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) pg->unlock(); } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); do_queries(query_map); @@ -3323,7 +3323,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from, } } - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); pg->unlock(); @@ -3402,7 +3402,7 @@ void OSD::handle_pg_trim(MOSDPGTrim *m) ObjectStore::Transaction *t = new ObjectStore::Transaction; pg->trim(*t, m->trim_to); pg->write_info(*t); - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t)); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t)); assert(tr == 0); } pg->unlock(); @@ -3474,7 +3474,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m) pg->info.history = history; pg->write_info(*t); pg->write_log(*t); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); created++; @@ -3611,7 +3611,7 @@ void OSD::_remove_pg(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; pg->write_info(*t); t->remove(meta_coll, pg->log_oid); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); } @@ -3632,7 +3632,7 @@ void OSD::_remove_pg(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; t->remove(coll_t::build_snap_pg_coll(pgid, *p), *q); t->remove(coll_t::build_pg_coll(pgid), *q); // we may hit this twice, but it's harmless - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); if ((++n & 0xff) == 0) { @@ -3657,7 +3657,7 @@ void OSD::_remove_pg(PG *pg) p++) { ObjectStore::Transaction *t = new ObjectStore::Transaction; t->remove(coll_t::build_pg_coll(pgid), *p); - int tr = store->queue_transaction(t); + int tr = store->queue_transaction(&pg->osr, t); assert(tr == 0); if ((++n & 0xff) == 0) { @@ -3690,7 +3690,7 @@ void OSD::_remove_pg(PG *pg) { rmt->remove_collection(coll_t::build_pg_coll(pgid)); - int tr = store->queue_transaction(rmt); + int tr = store->queue_transaction(NULL, rmt); assert(tr == 0); } @@ -3794,7 +3794,7 @@ void OSD::generate_backlog(PG *pg) pg->write_info(*t); if (pg->dirty_log) pg->write_log(*t); - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } @@ -3841,7 +3841,7 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; pg->activate(*t, fin->contexts); - int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } pg->unlock(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 9f18cccd0ab..d240173e60d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1803,7 +1803,7 @@ void PG::_finish_recovery(Context *c) drop_backlog(); write_info(*t); write_log(*t); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); } } else { @@ -2255,7 +2255,7 @@ void PG::read_log(ObjectStore *store) ::encode(oi, bl); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->setattr(coll_t::build_pg_coll(info.pgid), i->soid, OI_ATTR, bl); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); stringstream ss; diff --git a/src/osd/PG.h b/src/osd/PG.h index 1d45245b619..8c7d230ff6b 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -760,6 +760,9 @@ public: bool pg_stats_valid; pg_stat_t pg_stats_stable; + // for ordering writes + ObjectStore::Sequencer osr; + void update_stats(); void clear_stats(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3e076cc7103..6988efc7972 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -835,7 +835,7 @@ bool ReplicatedPG::snap_trimmer() t->setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bl); } - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); // give other threads a chance at this pg @@ -849,7 +849,7 @@ bool ReplicatedPG::snap_trimmer() snap_collections.erase(sn); write_info(*t); t->remove_collection(c); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); info.snap_trimq.erase(sn); @@ -860,7 +860,7 @@ bool ReplicatedPG::snap_trimmer() ObjectStore::Transaction *t = new ObjectStore::Transaction; write_info(*t); - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); unlock(); return true; @@ -1773,7 +1773,7 @@ void ReplicatedPG::apply_repop(RepGather *repop) Context *oncommit = new C_OSD_OpCommit(this, repop); Context *onapplied = new C_OSD_OpApplied(this, repop); Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc); - int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit, onapplied_sync); + int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync); if (r) { dout(-10) << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl; assert(0); @@ -2444,7 +2444,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) Context *oncommit = new C_OSD_RepModifyCommit(rm); Context *onapply = new C_OSD_RepModifyApply(rm); - int r = osd->store->queue_transactions(rm->tls, onapply, oncommit); + int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit); if (r) { derr(0) << "error applying transaction: r = " << r << dendl; assert(0); @@ -3146,7 +3146,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // apply to disk! write_info(*t); - int r = osd->store->queue_transaction(t, + int r = osd->store->queue_transaction(&osr, t, onreadable, new C_OSD_Commit(this, info.history.same_acting_since, info.last_complete), @@ -3380,7 +3380,7 @@ int ReplicatedPG::recover_primary(int max) put_object_context(headobc); // XXX: track objectcontext! - int tr = osd->store->queue_transaction(t); + int tr = osd->store->queue_transaction(&osr, t); assert(tr == 0); missing.got(latest->soid, latest->version); missing_loc.erase(latest->soid); @@ -3429,7 +3429,7 @@ int ReplicatedPG::recover_primary(int max) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; finish_recovery(*t, fin->contexts); - int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } else { dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl; @@ -3494,7 +3494,7 @@ int ReplicatedPG::recover_replicas(int max) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts; finish_recovery(*t, fin->contexts); - int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin); + int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); } else { dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl; diff --git a/src/streamtest.cc b/src/streamtest.cc index c3f5b8571f9..d46ffa8af98 100644 --- a/src/streamtest.cc +++ b/src/streamtest.cc @@ -144,7 +144,7 @@ int main(int argc, const char **argv) set_start(pos, g_clock.now()); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(coll_t(), poid, pos, bytes, bl); - fs->queue_transaction(t, new C_Ack(pos), new C_Commit(pos)); + fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos)); pos += bytes; throttle();