osd: use per-PG ObjectStore::Sequencer when possible

Otherwise, when it doesn't matter, use the default.  This
serializes I/O submission to btrfs at the lower layer within a
single PG.  In general this is not significant, since we have
lots of PGs.  We could potentially do per object in some cases,
but we'd have to be careful about snapshots and such.  Keep it
simple for now.
This commit is contained in:
Sage Weil 2010-03-02 15:09:24 -08:00
parent 0ca85bcdbf
commit 5fa2083f49
5 changed files with 30 additions and 27 deletions

View File

@ -181,7 +181,7 @@ int OSD::mkfs(const char *dev, const char *jdev, ceph_fsid_t fsid, int whoami)
for (int i=0; i<1000; i++) {
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->write(meta_coll, sobject_t(oid, 0), i*bl.length(), bl.length(), bl);
store->queue_transaction(t);
store->queue_transaction(NULL, t);
}
store->sync();
utime_t end = g_clock.now();
@ -2052,7 +2052,7 @@ void OSD::handle_osd_map(MOSDMap *m)
dout(10) << "handle_osd_map got full map epoch " << p->first << dendl;
ObjectStore::Transaction *ft = new ObjectStore::Transaction;
ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it.
int r = store->queue_transaction(ft);
int r = store->queue_transaction(NULL, ft);
assert(r == 0);
if (p->first > superblock.newest_map)
@ -2079,7 +2079,7 @@ void OSD::handle_osd_map(MOSDMap *m)
dout(10) << "handle_osd_map got incremental map epoch " << p->first << dendl;
ObjectStore::Transaction *ft = new ObjectStore::Transaction;
ft->write(meta_coll, poid, 0, p->second.length(), p->second); // store _outside_ transaction; activate_map reads it.
int r = store->queue_transaction(ft);
int r = store->queue_transaction(NULL, ft);
assert(r == 0);
if (p->first > superblock.newest_map)
@ -2818,7 +2818,7 @@ void OSD::kick_pg_split_queue()
pg->unlock();
created++;
}
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
// remove from queue
@ -3035,7 +3035,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
pg->unlock();
}
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
do_queries(query_map);
@ -3211,7 +3211,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
pg->unlock();
}
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
do_queries(query_map);
@ -3323,7 +3323,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
}
}
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(NULL, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
pg->unlock();
@ -3402,7 +3402,7 @@ void OSD::handle_pg_trim(MOSDPGTrim *m)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
pg->trim(*t, m->trim_to);
pg->write_info(*t);
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t));
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t));
assert(tr == 0);
}
pg->unlock();
@ -3474,7 +3474,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
pg->info.history = history;
pg->write_info(*t);
pg->write_log(*t);
int tr = store->queue_transaction(t);
int tr = store->queue_transaction(&pg->osr, t);
assert(tr == 0);
created++;
@ -3611,7 +3611,7 @@ void OSD::_remove_pg(PG *pg)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
pg->write_info(*t);
t->remove(meta_coll, pg->log_oid);
int tr = store->queue_transaction(t);
int tr = store->queue_transaction(&pg->osr, t);
assert(tr == 0);
}
@ -3632,7 +3632,7 @@ void OSD::_remove_pg(PG *pg)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->remove(coll_t::build_snap_pg_coll(pgid, *p), *q);
t->remove(coll_t::build_pg_coll(pgid), *q); // we may hit this twice, but it's harmless
int tr = store->queue_transaction(t);
int tr = store->queue_transaction(&pg->osr, t);
assert(tr == 0);
if ((++n & 0xff) == 0) {
@ -3657,7 +3657,7 @@ void OSD::_remove_pg(PG *pg)
p++) {
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->remove(coll_t::build_pg_coll(pgid), *p);
int tr = store->queue_transaction(t);
int tr = store->queue_transaction(&pg->osr, t);
assert(tr == 0);
if ((++n & 0xff) == 0) {
@ -3690,7 +3690,7 @@ void OSD::_remove_pg(PG *pg)
{
rmt->remove_collection(coll_t::build_pg_coll(pgid));
int tr = store->queue_transaction(rmt);
int tr = store->queue_transaction(NULL, rmt);
assert(tr == 0);
}
@ -3794,7 +3794,7 @@ void OSD::generate_backlog(PG *pg)
pg->write_info(*t);
if (pg->dirty_log)
pg->write_log(*t);
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
}
@ -3841,7 +3841,7 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_Contexts *fin = new C_Contexts;
pg->activate(*t, fin->contexts);
int tr = store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
}
pg->unlock();

View File

@ -1803,7 +1803,7 @@ void PG::_finish_recovery(Context *c)
drop_backlog();
write_info(*t);
write_log(*t);
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
}
} else {
@ -2255,7 +2255,7 @@ void PG::read_log(ObjectStore *store)
::encode(oi, bl);
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->setattr(coll_t::build_pg_coll(info.pgid), i->soid, OI_ATTR, bl);
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
stringstream ss;

View File

@ -760,6 +760,9 @@ public:
bool pg_stats_valid;
pg_stat_t pg_stats_stable;
// for ordering writes
ObjectStore::Sequencer osr;
void update_stats();
void clear_stats();

View File

@ -835,7 +835,7 @@ bool ReplicatedPG::snap_trimmer()
t->setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bl);
}
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
// give other threads a chance at this pg
@ -849,7 +849,7 @@ bool ReplicatedPG::snap_trimmer()
snap_collections.erase(sn);
write_info(*t);
t->remove_collection(c);
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
info.snap_trimq.erase(sn);
@ -860,7 +860,7 @@ bool ReplicatedPG::snap_trimmer()
ObjectStore::Transaction *t = new ObjectStore::Transaction;
write_info(*t);
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
unlock();
return true;
@ -1773,7 +1773,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
Context *oncommit = new C_OSD_OpCommit(this, repop);
Context *onapplied = new C_OSD_OpApplied(this, repop);
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc);
int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit, onapplied_sync);
int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync);
if (r) {
dout(-10) << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
assert(0);
@ -2444,7 +2444,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
Context *oncommit = new C_OSD_RepModifyCommit(rm);
Context *onapply = new C_OSD_RepModifyApply(rm);
int r = osd->store->queue_transactions(rm->tls, onapply, oncommit);
int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit);
if (r) {
derr(0) << "error applying transaction: r = " << r << dendl;
assert(0);
@ -3146,7 +3146,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
// apply to disk!
write_info(*t);
int r = osd->store->queue_transaction(t,
int r = osd->store->queue_transaction(&osr, t,
onreadable,
new C_OSD_Commit(this, info.history.same_acting_since,
info.last_complete),
@ -3380,7 +3380,7 @@ int ReplicatedPG::recover_primary(int max)
put_object_context(headobc);
// XXX: track objectcontext!
int tr = osd->store->queue_transaction(t);
int tr = osd->store->queue_transaction(&osr, t);
assert(tr == 0);
missing.got(latest->soid, latest->version);
missing_loc.erase(latest->soid);
@ -3429,7 +3429,7 @@ int ReplicatedPG::recover_primary(int max)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_Contexts *fin = new C_Contexts;
finish_recovery(*t, fin->contexts);
int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
} else {
dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
@ -3494,7 +3494,7 @@ int ReplicatedPG::recover_replicas(int max)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_Contexts *fin = new C_Contexts;
finish_recovery(*t, fin->contexts);
int tr = osd->store->queue_transaction(t, new ObjectStore::C_DeleteTransaction(t), fin);
int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
} else {
dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;

View File

@ -144,7 +144,7 @@ int main(int argc, const char **argv)
set_start(pos, g_clock.now());
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->write(coll_t(), poid, pos, bytes, bl);
fs->queue_transaction(t, new C_Ack(pos), new C_Commit(pos));
fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos));
pos += bytes;
throttle();