mirror of
https://github.com/ceph/ceph
synced 2024-12-17 08:57:28 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@859 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
ee14d9841c
commit
ef41218dd1
@ -137,6 +137,9 @@ md_config_t g_conf = {
|
||||
client_trace: 0,
|
||||
fuse_direct_io: 0,
|
||||
|
||||
// --- objecter ---
|
||||
objecter_buffer_uncommitted: true,
|
||||
|
||||
// --- mds ---
|
||||
mds_cache_size: MDS_CACHE_SIZE,
|
||||
mds_cache_mid: .7,
|
||||
@ -465,6 +468,10 @@ void parse_config_options(vector<char*>& args)
|
||||
else if (strcmp(args[i], "--fakemessenger_serialize") == 0)
|
||||
g_conf.fakemessenger_serialize = atoi(args[++i]);
|
||||
|
||||
|
||||
else if (strcmp(args[i], "--objecter_buffer_uncommitted") == 0)
|
||||
g_conf.objecter_buffer_uncommitted = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--mds_cache_size") == 0)
|
||||
g_conf.mds_cache_size = atoi(args[++i]);
|
||||
|
||||
|
@ -98,6 +98,7 @@ struct md_config_t {
|
||||
int client_oc_max_dirty;
|
||||
size_t client_oc_max_sync_write;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
bool client_bcache;
|
||||
@ -113,6 +114,9 @@ struct md_config_t {
|
||||
int client_trace;
|
||||
int fuse_direct_io;
|
||||
|
||||
// objecter
|
||||
bool objecter_buffer_uncommitted;
|
||||
|
||||
// mds
|
||||
int mds_cache_size;
|
||||
float mds_cache_mid;
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "config.h"
|
||||
#undef dout
|
||||
#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter "
|
||||
#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << messenger->get_myaddr() << ".objecter "
|
||||
|
||||
|
||||
// messages ------------------------------
|
||||
@ -171,9 +172,14 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
|
||||
|
||||
// WRITE
|
||||
if (wr->tid_version.count(tid)) {
|
||||
dout(0) << "kick_requests missing commit, replay write " << tid
|
||||
<< " v " << wr->tid_version[tid] << endl;
|
||||
modifyx_submit(wr, wr->waitfor_commit[tid], tid);
|
||||
if (wr->op == OSD_OP_WRITE &&
|
||||
!g_conf.objecter_buffer_uncommitted) {
|
||||
derr(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << endl;
|
||||
} else {
|
||||
dout(0) << "kick_requests missing commit, replay write " << tid
|
||||
<< " v " << wr->tid_version[tid] << endl;
|
||||
modifyx_submit(wr, wr->waitfor_commit[tid], tid);
|
||||
}
|
||||
}
|
||||
else if (wr->waitfor_ack.count(tid)) {
|
||||
dout(0) << "kick_requests missing ack, resub write " << tid << endl;
|
||||
@ -554,6 +560,9 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
|
||||
op_modify[tid] = wr;
|
||||
pg.active_tids.insert(tid);
|
||||
|
||||
++num_unacked;
|
||||
++num_uncommitted;
|
||||
|
||||
// send
|
||||
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
|
||||
<< " oid " << hex << ex.oid << dec
|
||||
@ -564,6 +573,8 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
|
||||
if (pg.primary() >= 0)
|
||||
messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0);
|
||||
|
||||
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;
|
||||
|
||||
return tid;
|
||||
}
|
||||
|
||||
@ -618,6 +629,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
|
||||
wr->waitfor_ack.erase(tid);
|
||||
wr->waitfor_commit.erase(tid);
|
||||
|
||||
num_uncommitted--;
|
||||
|
||||
if (wr->waitfor_commit.empty()) {
|
||||
onack = wr->onack;
|
||||
oncommit = wr->oncommit;
|
||||
@ -628,6 +641,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
|
||||
//dout(15) << " handle_osd_write_reply ack on " << tid << endl;
|
||||
assert(wr->waitfor_ack.count(tid));
|
||||
wr->waitfor_ack.erase(tid);
|
||||
|
||||
num_unacked--;
|
||||
|
||||
if (wr->tid_version.count(tid) &&
|
||||
wr->tid_version[tid].version != m->get_version().version) {
|
||||
@ -639,6 +654,13 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
|
||||
if (wr->waitfor_ack.empty()) {
|
||||
onack = wr->onack;
|
||||
wr->onack = 0; // only do callback once
|
||||
|
||||
// buffer uncommitted?
|
||||
if (!g_conf.objecter_buffer_uncommitted &&
|
||||
wr->op == OSD_OP_WRITE) {
|
||||
// discard buffer!
|
||||
((OSDWrite*)wr)->bl.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,8 @@ class Objecter {
|
||||
|
||||
private:
|
||||
tid_t last_tid;
|
||||
int num_unacked;
|
||||
int num_uncommitted;
|
||||
|
||||
/*** track pending operations ***/
|
||||
// read
|
||||
@ -121,7 +123,8 @@ class Objecter {
|
||||
public:
|
||||
Objecter(Messenger *m, OSDMap *om) :
|
||||
messenger(m), osdmap(om),
|
||||
last_tid(0)
|
||||
last_tid(0),
|
||||
num_unacked(0), num_uncommitted(0)
|
||||
{}
|
||||
~Objecter() {
|
||||
// clean up op_*
|
||||
|
Loading…
Reference in New Issue
Block a user