From ef41218dd1de8ba92d97c3a06d60f83b09dfd7e4 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 15 Sep 2006 23:36:00 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@859 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/config.cc | 7 +++++++ ceph/config.h | 4 ++++ ceph/osdc/Objecter.cc | 28 +++++++++++++++++++++++++--- ceph/osdc/Objecter.h | 5 ++++- 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/ceph/config.cc b/ceph/config.cc index 3e5368b7943..7a6017ff0b9 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -137,6 +137,9 @@ md_config_t g_conf = { client_trace: 0, fuse_direct_io: 0, + // --- objecter --- + objecter_buffer_uncommitted: true, + // --- mds --- mds_cache_size: MDS_CACHE_SIZE, mds_cache_mid: .7, @@ -465,6 +468,10 @@ void parse_config_options(vector& args) else if (strcmp(args[i], "--fakemessenger_serialize") == 0) g_conf.fakemessenger_serialize = atoi(args[++i]); + + else if (strcmp(args[i], "--objecter_buffer_uncommitted") == 0) + g_conf.objecter_buffer_uncommitted = atoi(args[++i]); + else if (strcmp(args[i], "--mds_cache_size") == 0) g_conf.mds_cache_size = atoi(args[++i]); diff --git a/ceph/config.h b/ceph/config.h index ebd0dfdd49b..160c817e860 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -98,6 +98,7 @@ struct md_config_t { int client_oc_max_dirty; size_t client_oc_max_sync_write; + /* bool client_bcache; @@ -113,6 +114,9 @@ struct md_config_t { int client_trace; int fuse_direct_io; + // objecter + bool objecter_buffer_uncommitted; + // mds int mds_cache_size; float mds_cache_mid; diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index 95a961587e7..23c17544403 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -15,6 +15,7 @@ #include "config.h" #undef dout #define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter " +#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << messenger->get_myaddr() << ".objecter " // messages ------------------------------ @@ -171,9 +172,14 @@ void Objecter::kick_requests(set& changed_pgs) // WRITE if (wr->tid_version.count(tid)) { - dout(0) << "kick_requests missing commit, replay write " << tid - << " v " << wr->tid_version[tid] << endl; - modifyx_submit(wr, wr->waitfor_commit[tid], tid); + if (wr->op == OSD_OP_WRITE && + !g_conf.objecter_buffer_uncommitted) { + derr(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << endl; + } else { + dout(0) << "kick_requests missing commit, replay write " << tid + << " v " << wr->tid_version[tid] << endl; + modifyx_submit(wr, wr->waitfor_commit[tid], tid); + } } else if (wr->waitfor_ack.count(tid)) { dout(0) << "kick_requests missing ack, resub write " << tid << endl; @@ -554,6 +560,9 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) op_modify[tid] = wr; pg.active_tids.insert(tid); + ++num_unacked; + ++num_uncommitted; + // send dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid << " oid " << hex << ex.oid << dec @@ -564,6 +573,8 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) if (pg.primary() >= 0) messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0); + dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl; + return tid; } @@ -618,6 +629,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) wr->waitfor_ack.erase(tid); wr->waitfor_commit.erase(tid); + num_uncommitted--; + if (wr->waitfor_commit.empty()) { onack = wr->onack; oncommit = wr->oncommit; @@ -628,6 +641,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) //dout(15) << " handle_osd_write_reply ack on " << tid << endl; assert(wr->waitfor_ack.count(tid)); wr->waitfor_ack.erase(tid); + + num_unacked--; if (wr->tid_version.count(tid) && wr->tid_version[tid].version != m->get_version().version) { @@ -639,6 +654,13 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) if (wr->waitfor_ack.empty()) { onack = wr->onack; wr->onack = 0; // only do callback once + + // buffer uncommitted? + if (!g_conf.objecter_buffer_uncommitted && + wr->op == OSD_OP_WRITE) { + // discard buffer! + ((OSDWrite*)wr)->bl.clear(); + } } } diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h index 140bb58dc7e..450f5701159 100644 --- a/ceph/osdc/Objecter.h +++ b/ceph/osdc/Objecter.h @@ -25,6 +25,8 @@ class Objecter { private: tid_t last_tid; + int num_unacked; + int num_uncommitted; /*** track pending operations ***/ // read @@ -121,7 +123,8 @@ class Objecter { public: Objecter(Messenger *m, OSDMap *om) : messenger(m), osdmap(om), - last_tid(0) + last_tid(0), + num_unacked(0), num_uncommitted(0) {} ~Objecter() { // clean up op_*