osd: rip out old peer stat sharing

This never really worked, has bitrotted, and the stats it calculates are mostly
redundant anyway.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
Sage Weil 2011-06-23 14:57:07 -07:00
parent dfa6c8d256
commit d37948407a
4 changed files with 15 additions and 246 deletions
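
Before this change, every heartbeat ping (and the replication ack/commit replies) carried a freshly computed osd_peer_stat_t; the diff below drops that payload and leaves only the aggregate osd_stat refresh. As a rough, standalone sketch of the message-level effect -- Ping and PeerStat here are simplified stand-ins for illustration, not the real MOSDPing/osd_peer_stat_t types:

#include <cstdint>
#include <iostream>

struct PeerStat { double oprate = 0, qlen = 0; };  // stand-in for osd_peer_stat_t

struct Ping {                                      // stand-in for MOSDPing
  enum Op : uint8_t { HEARTBEAT = 0, REQUEST_HEARTBEAT = 1, YOU_DIED = 2 };
  uint32_t map_epoch, peer_as_of_epoch;
  uint8_t op;
  PeerStat peer_stat;  // the field survives, but senders no longer fill it in

  // added by this commit: construct a ping with no stat payload
  Ping(uint32_t e, uint32_t pe, uint8_t o)
    : map_epoch(e), peer_as_of_epoch(pe), op(o) {}

  // pre-existing form: caller had to supply a freshly refreshed stat
  Ping(uint32_t e, uint32_t pe, const PeerStat& ps, uint8_t o = HEARTBEAT)
    : map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) {}
};

int main() {
  Ping m(42, 40, Ping::HEARTBEAT);  // what heartbeat() sends after the change
  std::cout << "op=" << int(m.op) << " epoch=" << m.map_epoch << "\n";
  return 0;
}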

src/messages/MOSDPing.h

@@ -42,6 +42,8 @@ class MOSDPing : public Message {
   __u8 op;
   osd_peer_stat_t peer_stat;

+  MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, __u8 o) :
+    Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o) { }
   MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, __u8 o=HEARTBEAT) :
     Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) { }
   MOSDPing() {}

src/osd/OSD.cc

@@ -426,12 +426,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   heartbeat_messenger(hbm),
   heartbeat_thread(this),
   heartbeat_dispatcher(this),
-  decayrate(5.0),
-  stat_oprate(ceph_clock_now(g_ceph_context)),
   stat_lock("OSD::stat_lock"),
-  read_latency_calc(g_conf->osd_max_opq<1 ? 1:g_conf->osd_max_opq),
-  qlen_calc(3),
-  iat_averager(g_conf->osd_flash_crowd_iat_alpha),
   finished_lock("OSD::finished_lock"),
   op_wq(this, &op_tp),
   osdmap(NULL),
@@ -465,13 +460,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   osdmap = 0;

-  memset(&my_stat, 0, sizeof(my_stat));
-  stat_ops = 0;
-  stat_qlen = 0;
-  stat_rd_ops = stat_rd_ops_shed_in = stat_rd_ops_shed_out = 0;
-  stat_rd_ops_in_queue = 0;
-
   pending_ops = 0;
   waiting_for_no_ops = false;
 }
@@ -683,9 +671,6 @@ void OSD::open_logger()
   osd_logtype.add_inc(l_osd_rop, "rop");       // recovery ops (started)

-  osd_logtype.add_set(l_osd_qlen, "qlen");
-  osd_logtype.add_set(l_osd_rqlen, "rqlen");
-
   osd_logtype.add_set(l_osd_loadavg, "loadavg");
   osd_logtype.add_set(l_osd_pg, "numpg");   // num pgs
@@ -1334,79 +1319,6 @@ void OSD::update_osd_stat()
   dout(20) << "update_osd_stat " << osd_stat << dendl;
 }

-void OSD::_refresh_my_stat(utime_t now)
-{
-  assert(heartbeat_lock.is_locked());
-  assert(stat_lock.is_locked());
-
-  // refresh?
-  if (now - my_stat.stamp > g_conf->osd_stat_refresh_interval ||
-      pending_ops > 2*my_stat.qlen) {
-    update_osd_stat();
-
-    now.encode_timeval(&my_stat.stamp);
-    my_stat.oprate = stat_oprate.get(now, decayrate);
-
-    //read_latency_calc.set_size( 20 );  // hrm.
-
-    // qlen
-    my_stat.qlen = 0;
-    if (stat_ops)
-      my_stat.qlen = (float)stat_qlen / (float)stat_ops;  //get_average();
-
-    // rd ops shed in
-    float frac_rd_ops_shed_in = 0;
-    float frac_rd_ops_shed_out = 0;
-    if (stat_rd_ops) {
-      frac_rd_ops_shed_in = (float)stat_rd_ops_shed_in / (float)stat_rd_ops;
-      frac_rd_ops_shed_out = (float)stat_rd_ops_shed_out / (float)stat_rd_ops;
-    }
-    my_stat.frac_rd_ops_shed_in = (my_stat.frac_rd_ops_shed_in + frac_rd_ops_shed_in) / 2.0;
-    my_stat.frac_rd_ops_shed_out = (my_stat.frac_rd_ops_shed_out + frac_rd_ops_shed_out) / 2.0;
-
-    // recent_qlen
-    qlen_calc.add(my_stat.qlen);
-    my_stat.recent_qlen = qlen_calc.get_average();
-
-    // read latency
-    if (stat_rd_ops) {
-      my_stat.read_latency = read_latency_calc.get_average();
-      if (my_stat.read_latency < 0) my_stat.read_latency = 0;
-    } else {
-      my_stat.read_latency = 0;
-    }
-
-    my_stat.read_latency_mine = my_stat.read_latency * (1.0 - frac_rd_ops_shed_in);
-
-    logger->fset(l_osd_qlen, my_stat.qlen);
-    logger->fset(l_osd_rqlen, my_stat.recent_qlen);
-
-    dout(30) << "_refresh_my_stat " << my_stat << dendl;
-
-    stat_rd_ops = 0;
-    stat_rd_ops_shed_in = 0;
-    stat_rd_ops_shed_out = 0;
-    stat_ops = 0;
-    stat_qlen = 0;
-  }
-}
-
-osd_peer_stat_t OSD::get_my_stat_for(utime_t now, int peer)
-{
-  Mutex::Locker hlock(heartbeat_lock);
-  Mutex::Locker lock(stat_lock);
-  _refresh_my_stat(now);
-  my_stat_on_peer[peer] = my_stat;
-  return my_stat;
-}
-
-void OSD::take_peer_stat(int peer, const osd_peer_stat_t& stat)
-{
-  Mutex::Locker lock(stat_lock);
-  dout(15) << "take_peer_stat peer osd" << peer << " " << stat << dendl;
-  peer_stat[peer] = stat;
-}
-
 void OSD::update_heartbeat_peers()
 {
   assert(osd_lock.is_locked());
@@ -1487,7 +1399,7 @@ void OSD::update_heartbeat_peers()
       dout(10) << "update_heartbeat_peers: new _from osd" << p
                << " " << heartbeat_con[p]->get_peer_addr() << dendl;
       heartbeat_from_stamp[p] = now;
-      MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat,
+      MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch,
                                  MOSDPing::REQUEST_HEARTBEAT);
       heartbeat_messenger->send_message(m, heartbeat_con[p]);
     }
@@ -1555,7 +1467,7 @@ void OSD::update_heartbeat_peers()
               << " " << old_con[p->first]->get_peer_addr()
               << " they are down" << dendl;
       heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
-                                                     heartbeat_epoch, my_stat,
+                                                     heartbeat_epoch,
                                                      MOSDPing::YOU_DIED), con);
     }
     heartbeat_messenger->mark_down_on_empty(con);
@@ -1651,15 +1563,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
   case MOSDPing::HEARTBEAT:
     if (heartbeat_from.count(from) &&
         heartbeat_con[from] == m->get_connection()) {
-      // only take peer stat or share map now if map_lock is uncontended
-      if (locked) {
-        dout(20) << "handle_osd_ping " << m->get_source_inst()
-                 << " took stat " << m->peer_stat << dendl;
-        take_peer_stat(from, m->peer_stat);  // only with map_lock held!
-      } else {
-        dout(20) << "handle_osd_ping " << m->get_source_inst()
-                 << " dropped stat " << m->peer_stat << dendl;
-      }
+      dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl;

       note_peer_epoch(from, m->map_epoch);
       if (locked && !is_booting())
@@ -1754,16 +1659,14 @@ void OSD::heartbeat()
   dout(30) << "heartbeat checking stats" << dendl;

-  // calc my stats
-  Mutex::Locker lock(stat_lock);
-  _refresh_my_stat(now);
-  my_stat_on_peer.clear();
-
-  dout(5) << "heartbeat: " << my_stat << dendl;
+  // refresh stats?
+  {
+    Mutex::Locker lock(stat_lock);
+    update_osd_stat();
+  }
+
   dout(5) << "heartbeat: " << osd_stat << dendl;

-  //load_calc.set_size(stat_ops);
-
   bool map_locked = map_lock.try_get_read();
   dout(30) << "heartbeat map_locked=" << map_locked << dendl;
@@ -1773,12 +1676,10 @@ void OSD::heartbeat()
        i++) {
     int peer = i->first;
     if (heartbeat_con.count(peer)) {
-      my_stat_on_peer[peer] = my_stat;
       dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
       Message *m = new MOSDPing(osdmap->get_fsid(),
                                 map_locked ? osdmap->get_epoch():0,
-                                i->second,
-                                my_stat);
+                                i->second, MOSDPing::HEARTBEAT);
       m->set_priority(CEPH_MSG_PRIO_HIGH);
       dout(30) << "heartbeat sending ping to osd" << peer << dendl;
       heartbeat_messenger->send_message(m, heartbeat_con[peer]);
@@ -5028,19 +4929,6 @@ void OSD::handle_op(MOSDOp *op)
     return;
   }

-  // update qlen stats
-  stat_oprate.hit(now, decayrate);
-  stat_ops++;
-  stat_qlen += pending_ops;
-
-  if (!op->may_write()) {
-    stat_rd_ops++;
-    if (op->get_source().is_osd()) {
-      //dout(0) << "shed in " << stat_rd_ops_shed_in << " / " << stat_rd_ops << dendl;
-      stat_rd_ops_shed_in++;
-    }
-  }
-
   // we don't need encoded payload anymore
   op->clear_payload();
@@ -5136,11 +5024,6 @@ void OSD::handle_op(MOSDOp *op)
   dout(10) << "handle_op " << *op << " in " << *pg << dendl;

-  if (!op->may_write()) {
-    Mutex::Locker lock(stat_lock);
-    stat_rd_ops_in_queue++;
-  }
-
   pg->get();
   if (g_conf->osd_op_threads < 1) {
     // do it now.
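
The net effect on the heartbeat path above: the stats work shrinks to a single update_osd_stat() under stat_lock, and each peer just gets a plain HEARTBEAT ping with no per-peer stat bookkeeping. A simplified, self-contained sketch of that resulting flow -- Heartbeater and its members are illustrative stand-ins, not the real OSD class:

#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>

struct OsdStat { uint64_t kb_used = 0, kb_avail = 0; };  // stand-in for osd_stat_t

struct Heartbeater {                      // stand-in for the relevant OSD members
  std::mutex stat_lock;
  OsdStat osd_stat;
  std::map<int, uint32_t> heartbeat_to;   // peer osd id -> epoch to report

  void update_osd_stat() {
    // recompute aggregate usage; the real code queries the object store here
    osd_stat.kb_used = 123;
    osd_stat.kb_avail = 456;
  }

  void heartbeat() {
    {                                     // the "refresh stats?" block from the diff
      std::lock_guard<std::mutex> lock(stat_lock);
      update_osd_stat();
    }
    for (const auto& p : heartbeat_to)    // one bare HEARTBEAT ping per peer
      std::cout << "ping osd" << p.first << " as_of_epoch " << p.second << "\n";
  }
};

int main() {
  Heartbeater hb;
  hb.heartbeat_to = {{1, 10}, {2, 10}};
  hb.heartbeat();
  return 0;
}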

src/osd/OSD.h

@@ -85,8 +85,6 @@ enum {
   l_osd_rop,

-  l_osd_qlen,
-  l_osd_rqlen,
   l_osd_loadavg,
   l_osd_pg,
@@ -298,105 +296,8 @@ private:
   Mutex stat_lock;
   osd_stat_t osd_stat;
-
-  DecayRate decayrate;
-  DecayCounter stat_oprate;
-  int stat_ops;  // ops since last heartbeat
-  int stat_rd_ops;
-  int stat_rd_ops_shed_in;
-  int stat_rd_ops_shed_out;
-  int stat_qlen;  // cumulative queue length since last refresh
-  int stat_rd_ops_in_queue;  // in queue
-
-  osd_peer_stat_t my_stat;
-  hash_map<int, osd_peer_stat_t, rjhash<uint32_t> > peer_stat;
-  hash_map<int, osd_peer_stat_t, rjhash<uint32_t> > my_stat_on_peer;  // what the peer thinks of me

   void update_osd_stat();
-  void _refresh_my_stat(utime_t now);
-  osd_peer_stat_t get_my_stat_for(utime_t now, int peer);
-  void take_peer_stat(int peer, const osd_peer_stat_t& stat);
-
-  // load calculation
-  //current implementation is moving averges.
-  class MovingAverager {
-  private:
-    Mutex lock;
-    deque<double> m_Data;
-    unsigned m_Size;
-    double m_Total;
-  public:
-    MovingAverager(unsigned size) : lock("OSD::MovingAverager::lock"), m_Size(size), m_Total(0) { }
-    void set_size(unsigned size) {
-      m_Size = size;
-    }
-    void add(double value) {
-      Mutex::Locker locker(lock);
-      // add item
-      m_Data.push_back(value);
-      m_Total += value;
-      // trim
-      while (m_Data.size() > m_Size) {
-        m_Total -= m_Data.front();
-        m_Data.pop_front();
-      }
-    }
-    double get_average() {
-      Mutex::Locker locker(lock);
-      if (m_Data.empty()) return -1;
-      return m_Total / (double)m_Data.size();
-    }
-  } read_latency_calc, qlen_calc;
-
-  class IATAverager {
-  public:
-    struct iat_data {
-      double last_req_stamp;
-      double average_iat;
-      iat_data() : last_req_stamp(0), average_iat(0) {}
-    };
-  private:
-    mutable Mutex lock;
-    double alpha;
-    hash_map<object_t, iat_data> iat_map;
-  public:
-    IATAverager(double a) : lock("IATAverager::lock"),alpha(a) {}
-
-    void add_sample(const object_t& oid, double now) {
-      Mutex::Locker locker(lock);
-      iat_data &r = iat_map[oid];
-      double iat = now - r.last_req_stamp;
-      r.last_req_stamp = now;
-      r.average_iat = r.average_iat*(1.0-alpha) + iat*alpha;
-    }
-
-    bool have(const object_t& oid) const {
-      Mutex::Locker locker(lock);
-      return iat_map.count(oid);
-    }
-
-    double get_average_iat(const object_t& oid) const {
-      Mutex::Locker locker(lock);
-      hash_map<object_t, iat_data>::const_iterator p = iat_map.find(oid);
-      assert(p != iat_map.end());
-      return p->second.average_iat;
-    }
-
-    bool is_flash_crowd_candidate(const object_t& oid) const {
-      Mutex::Locker locker(lock);
-      return get_average_iat(oid) <= g_conf->osd_flash_crowd_iat_threshold;
-    }
-  };
-
-  IATAverager iat_averager;
-
   // -- waiters --
   list<class Message*> finished;
   Mutex finished_lock;

src/osd/ReplicatedPG.cc

@@ -677,11 +677,6 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
     osd->logger->inc(l_osd_op_r);
     osd->logger->inc(l_osd_op_r_outb, outb);
     osd->logger->favg(l_osd_op_r_lat, latency);
-
-    Mutex::Locker lock(osd->stat_lock);
-    osd->stat_rd_ops_in_queue--;
-    osd->read_latency_calc.add(latency);
   } else if (op->may_write()) {
     osd->logger->inc(l_osd_op_w);
     osd->logger->inc(l_osd_op_w_inb, inb);
@@ -2783,9 +2778,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
     }
     wr->pg_trim_to = pg_trim_to;
-    wr->peer_stat = osd->get_my_stat_for(now, peer);
-    osd->cluster_messenger->
-      send_message(wr, osd->osdmap->get_cluster_inst(peer));
+    osd->cluster_messenger->send_message(wr, osd->osdmap->get_cluster_inst(peer));

     // keep peer_info up to date
     Info &in = peer_info[peer];
@@ -3148,10 +3141,6 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
   assert(is_active());
   assert(is_replica());

-  // note peer's stat
-  int fromosd = op->get_source().num();
-  osd->take_peer_stat(fromosd, op->peer_stat);
-
   // we better not be missing this.
   assert(!missing.is_missing(soid));
@@ -3240,10 +3229,8 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
   if (!rm->committed) {
     // send ack to acker only if we haven't sent a commit already
     MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
-    ack->set_peer_stat(osd->get_my_stat_for(ceph_clock_now(g_ceph_context), rm->ackerosd));
     ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-    osd->cluster_messenger->
-      send_message(ack, osd->osdmap->get_cluster_inst(rm->ackerosd));
+    osd->cluster_messenger->send_message(ack, osd->osdmap->get_cluster_inst(rm->ackerosd));
   }
   rm->applied = true;
@@ -3282,9 +3269,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
     MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
     commit->set_last_complete_ondisk(rm->last_complete);
     commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-    commit->set_peer_stat(osd->get_my_stat_for(ceph_clock_now(g_ceph_context), rm->ackerosd));
-    osd->cluster_messenger->
-      send_message(commit, osd->osdmap->get_cluster_inst(rm->ackerosd));
+    osd->cluster_messenger->send_message(commit, osd->osdmap->get_cluster_inst(rm->ackerosd));
   }
   rm->committed = true;
@@ -3305,8 +3290,6 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
   tid_t rep_tid = r->get_tid();
   int fromosd = r->get_source().num();

-  osd->take_peer_stat(fromosd, r->get_peer_stat());
-
   if (repop_map.count(rep_tid)) {
     // oh, good.
     repop_ack(repop_map[rep_tid],