Merge branch 'wip-mon-leaks-fix' into next

This commit is contained in:
Sage Weil 2012-11-18 14:37:22 -08:00
commit 34e5f9bbfc
18 changed files with 159 additions and 71 deletions

View File

@ -381,17 +381,17 @@ int main(int argc, const char **argv)
// throttle client traffic
Throttle client_throttler(g_ceph_context, "mon_client_bytes",
g_conf->mon_client_bytes);
messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, &client_throttler);
Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
g_conf->mon_client_bytes);
messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler);
// throttle daemon traffic
// NOTE: actual usage on the leader may multiply by the number of
// monitors if they forward large update messages from daemons.
Throttle daemon_throttler(g_ceph_context, "mon_daemon_bytes",
g_conf->mon_daemon_bytes);
messenger->set_policy_throttler(entity_name_t::TYPE_OSD, &daemon_throttler);
messenger->set_policy_throttler(entity_name_t::TYPE_MDS, &daemon_throttler);
Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
g_conf->mon_daemon_bytes);
messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler);
messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler);
cout << "starting " << g_conf->name << " rank " << rank
<< " at " << ipaddr
@ -430,9 +430,14 @@ int main(int argc, const char **argv)
unregister_async_signal_handler(SIGINT, handle_mon_signal);
unregister_async_signal_handler(SIGTERM, handle_mon_signal);
shutdown_async_signal_handler();
store.umount();
delete mon;
delete messenger;
delete client_throttler;
delete daemon_throttler;
g_ceph_context->put();
// cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
char s[20];

View File

@ -286,26 +286,16 @@ uint64_t AuthMonitor::assign_global_id(MAuth *m, bool should_increase_max)
dout(10) << "next_global_id should be " << next_global_id << dendl;
}
while (next_global_id >= max_global_id) {
if (!mon->is_leader()) {
dout(10) << "not the leader, requesting more ids from leader" << dendl;
int leader = mon->get_leader();
MMonGlobalID *req = new MMonGlobalID();
req->old_max_id = max_global_id;
mon->messenger->send_message(req, mon->monmap->get_inst(leader));
paxos->wait_for_commit(new C_RetryMessage(this, m));
if (next_global_id >= max_global_id) {
if (!mon->is_leader() || !should_increase_max) {
return 0;
} else {
if (!should_increase_max)
return 0;
dout(10) << "increasing max_global_id" << dendl;
}
while (next_global_id >= max_global_id) {
increase_max_global_id();
}
}
last_allocated_id = next_global_id;
return next_global_id;
}
@ -372,15 +362,22 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable)
if (!s->global_id) {
s->global_id = assign_global_id(m, paxos_writable);
if (!s->global_id) {
s->put();
delete s->auth_handler;
s->auth_handler = NULL;
s->put();
//we don't m->put() here because assign_global_id has queued it up
if (mon->is_leader())
return false;
return true;
if (!mon->is_leader()) {
dout(10) << "not the leader, requesting more ids from leader" << dendl;
int leader = mon->get_leader();
MMonGlobalID *req = new MMonGlobalID();
req->old_max_id = max_global_id;
mon->messenger->send_message(req, mon->monmap->get_inst(leader));
paxos->wait_for_commit(new C_RetryMessage(this, m));
return true;
}
assert(!paxos_writable);
return false;
}
}

View File

@ -24,9 +24,9 @@ using namespace std;
#include "PaxosService.h"
#include "common/LogEntry.h"
#include "messages/MLog.h"
class MMonCommand;
class MLog;
class LogMonitor : public PaxosService {
private:
@ -52,6 +52,11 @@ private:
MLog *ack;
C_Log(LogMonitor *p, MLog *a) : logmon(p), ack(a) {}
// Completion callback for a queued log update.
// When the result is -ECANCELED the stored ack (if non-null) is released and
// nothing else happens; for every other result the ack is passed on to
// LogMonitor::_updated_log().
void finish(int r) {
  if (r != -ECANCELED) {
    logmon->_updated_log(ack);
    return;
  }
  // cancelled: drop our reference instead of replying
  if (ack)
    ack->put();
}
};

View File

@ -30,8 +30,8 @@ using namespace std;
#include "PaxosService.h"
#include "Session.h"
#include "messages/MMDSBeacon.h"
class MMDSBeacon;
class MMDSGetMap;
class MMonCommand;
class MMDSLoadTargets;
@ -54,6 +54,11 @@ class MDSMonitor : public PaxosService {
C_Updated(MDSMonitor *a, MMDSBeacon *c) :
mm(a), m(c) {}
void finish(int r) {
if (r == -ECANCELED) {
if (m)
m->put();
return;
}
if (r >= 0)
mm->_updated(m); // success
else

View File

@ -189,19 +189,7 @@ Monitor::~Monitor()
delete *p;
for (vector<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
delete *p;
//clean out MonSessionMap's subscriptions
for (map<string, xlist<Subscription*>* >::iterator i
= session_map.subs.begin();
i != session_map.subs.end();
++i) {
while (!i->second->empty())
session_map.remove_sub(i->second->front());
delete i->second;
}
//clean out MonSessionMap's sessions
while (!session_map.sessions.empty()) {
session_map.remove_session(session_map.sessions.front());
}
assert(session_map.sessions.empty());
delete mon_caps;
}
@ -623,11 +611,16 @@ void Monitor::shutdown()
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
(*p)->shutdown();
finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
timer.shutdown();
// unlock before msgr shutdown...
lock.Unlock();
remove_all_sessions();
messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
}
@ -1833,9 +1826,17 @@ void Monitor::remove_session(MonSession *s)
routed_requests.erase(*p);
}
}
s->con->set_priv(NULL);
session_map.remove_session(s);
}
// Drain the session map: repeatedly hand the front entry of
// session_map.sessions to remove_session() until the list reports empty.
void Monitor::remove_all_sessions()
{
  while (!session_map.sessions.empty())
    remove_session(session_map.sessions.front());
}
void Monitor::send_command(const entity_inst_t& inst,
const vector<string>& com, version_t version)
@ -2026,15 +2027,14 @@ bool Monitor::_ms_dispatch(Message *m)
// paxos
case MSG_MON_PAXOS:
{
MMonPaxos *pm = (MMonPaxos*)m;
if (!src_is_mon &&
!s->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
//can't send these!
m->put();
pm->put();
break;
}
MMonPaxos *pm = (MMonPaxos*)m;
// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
@ -2064,6 +2064,8 @@ bool Monitor::_ms_dispatch(Message *m)
!s->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
<< s->caps << dendl;
m->put();
break;
}
if (!is_probing() && !is_slurping()) {
elector.dispatch(m);
@ -2195,9 +2197,6 @@ bool Monitor::ms_handle_reset(Connection *con)
if (!s->closed)
remove_session(s);
s->put();
// remove from connection, too.
con->set_priv(NULL);
return true;
}

View File

@ -46,6 +46,7 @@
#include "messages/MMonCommand.h"
#include <memory>
#include <errno.h>
#define CEPH_MON_PROTOCOL 9 /* cluster internal */
@ -373,6 +374,7 @@ public:
void no_reply(PaxosServiceMessage *req);
void resend_routed_requests();
void remove_session(MonSession *s);
void remove_all_sessions();
void send_command(const entity_inst_t& inst,
const vector<string>& com, version_t version);
@ -392,7 +394,9 @@ public:
void finish(int r) {
if (r >= 0)
mon->reply_command(m, rc, rs, rdata, version);
else
else if (r == -ECANCELED) {
m->put();
} else
mon->_ms_dispatch(m);
}
};
@ -404,7 +408,10 @@ public:
public:
C_RetryMessage(Monitor *m, Message *ms) : mon(m), msg(ms) {}
void finish(int r) {
mon->_ms_dispatch(msg);
if (r == -ECANCELED) {
msg->put();
} else
mon->_ms_dispatch(msg);
}
};

View File

@ -31,11 +31,11 @@ using namespace std;
#include "Session.h"
class Monitor;
class MOSDBoot;
class MMonCommand;
class MPoolSnap;
class MOSDMap;
class MOSDFailure;
#include "messages/MOSDBoot.h"
#include "messages/MMonCommand.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDFailure.h"
#include "messages/MPoolOp.h"
/// information about a particular peer's failure reports for one osd
struct failure_reporter_t {
@ -208,6 +208,12 @@ private:
C_Booted(OSDMonitor *cm, MOSDBoot *m_, bool l=true) :
cmon(cm), m(m_), logit(l) {}
void finish(int r) {
if (r == -ECANCELED) {
if (m)
m->put();
return;
}
if (r >= 0)
cmon->_booted(m, logit);
else
@ -221,6 +227,11 @@ private:
epoch_t e;
C_ReplyMap(OSDMonitor *o, PaxosServiceMessage *mm, epoch_t ee) : osdmon(o), m(mm), e(ee) {}
// Completion callback for a deferred map reply.
// -ECANCELED: release the held message (if non-null) and do nothing else.
// Any other result: forward the message and epoch to OSDMonitor::_reply_map().
void finish(int r) {
  if (r != -ECANCELED) {
    osdmon->_reply_map(m, e);
    return;
  }
  // cancelled: drop our reference instead of replying
  if (m)
    m->put();
}
};
@ -233,6 +244,11 @@ private:
C_PoolOp(OSDMonitor * osd, MPoolOp *m_, int rc, int e, bufferlist *rd=NULL) :
osdmon(osd), m(m_), replyCode(rc), epoch(e), reply_data(rd) {}
// Completion callback for a pool operation.
// -ECANCELED: release the held request (if non-null) and do nothing else.
// Any other result: send the stored reply code, epoch and optional reply data
// through OSDMonitor::_pool_op_reply().
void finish(int r) {
  if (r != -ECANCELED) {
    osdmon->_pool_op_reply(m, replyCode, epoch, reply_data);
    return;
  }
  // cancelled: drop our reference instead of replying
  if (m)
    m->put();
}
};

View File

@ -545,6 +545,8 @@ struct RetryCheckOSDMap : public Context {
epoch_t epoch;
RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {}
// Re-run PGMonitor::check_osd_map() for the stored epoch, unless the
// context was completed with -ECANCELED, in which case it is a no-op.
void finish(int r) {
  if (r != -ECANCELED)
    pgmon->check_osd_map(epoch);
}
};

View File

@ -31,8 +31,8 @@ using namespace std;
#include "msg/Messenger.h"
#include "common/config.h"
class MPGStats;
class MPGStatsAck;
#include "messages/MPGStats.h"
#include "messages/MPGStatsAck.h"
class MStatfs;
class MMonCommand;
class MGetPoolStats;
@ -71,6 +71,11 @@ private:
entity_inst_t who;
C_Stats(PGMonitor *p, MPGStats *r, MPGStatsAck *a) : pgmon(p), req(r), ack(a) {}
// Completion callback for a stats update.
// -ECANCELED: release both the request and the prepared ack, then stop.
// Any other result: pass both messages to PGMonitor::_updated_stats().
void finish(int r) {
  if (r != -ECANCELED) {
    pgmon->_updated_stats(req, ack);
    return;
  }
  // cancelled: neither message will be consumed, so drop both references
  req->put();
  ack->put();
}
};

View File

@ -842,6 +842,14 @@ void Paxos::cancel_events()
}
}
// Complete every queued waiter with -ECANCELED. The four wait lists are
// flushed in the order: writeable, commit, readable, active.
void Paxos::shutdown() {
  const int cancelled = -ECANCELED;

  dout(10) << __func__ << " cancel all contexts" << dendl;

  finish_contexts(g_ceph_context, waiting_for_writeable, cancelled);
  finish_contexts(g_ceph_context, waiting_for_commit, cancelled);
  finish_contexts(g_ceph_context, waiting_for_readable, cancelled);
  finish_contexts(g_ceph_context, waiting_for_active, cancelled);
}
void Paxos::leader_init()
{
cancel_events();
@ -869,8 +877,8 @@ void Paxos::peon_init()
dout(10) << "peon_init -- i am a peon" << dendl;
// no chance to write now!
finish_contexts(g_ceph_context, waiting_for_writeable, -1);
finish_contexts(g_ceph_context, waiting_for_commit, -1);
finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
}
void Paxos::restart()
@ -879,8 +887,8 @@ void Paxos::restart()
cancel_events();
new_value.clear();
finish_contexts(g_ceph_context, waiting_for_commit, -1);
finish_contexts(g_ceph_context, waiting_for_active, -1);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
}

View File

@ -47,6 +47,7 @@ e 12v
#include "include/Context.h"
#include "common/Timer.h"
#include <errno.h>
class Monitor;
class MMonPaxos;
@ -450,6 +451,8 @@ private:
public:
C_CollectTimeout(Paxos *p) : paxos(p) {}
// Invoke Paxos::collect_timeout() unless this context was completed with
// -ECANCELED, in which case nothing is done.
void finish(int r) {
  if (r != -ECANCELED)
    paxos->collect_timeout();
}
};
@ -462,6 +465,8 @@ private:
public:
C_AcceptTimeout(Paxos *p) : paxos(p) {}
// Invoke Paxos::accept_timeout() unless this context was completed with
// -ECANCELED, in which case nothing is done.
void finish(int r) {
  if (r != -ECANCELED)
    paxos->accept_timeout();
}
};
@ -474,6 +479,8 @@ private:
public:
C_LeaseAckTimeout(Paxos *p) : paxos(p) {}
// Invoke Paxos::lease_ack_timeout() unless this context was completed with
// -ECANCELED, in which case nothing is done.
void finish(int r) {
  if (r != -ECANCELED)
    paxos->lease_ack_timeout();
}
};
@ -486,6 +493,8 @@ private:
public:
C_LeaseTimeout(Paxos *p) : paxos(p) {}
// Invoke Paxos::lease_timeout() unless this context was completed with
// -ECANCELED, in which case nothing is done.
void finish(int r) {
  if (r != -ECANCELED)
    paxos->lease_timeout();
}
};
@ -498,6 +507,8 @@ private:
public:
C_LeaseRenew(Paxos *p) : paxos(p) {}
// Invoke Paxos::lease_renew_timeout() unless this context was completed with
// -ECANCELED, in which case nothing is done.
void finish(int r) {
  if (r != -ECANCELED)
    paxos->lease_renew_timeout();
}
};
@ -811,6 +822,10 @@ private:
* Cancel all of Paxos' timeout/renew events.
*/
void cancel_events();
/**
* Shutdown this Paxos machine
*/
void shutdown();
/**
* Generate a new Proposal Number based on @p gt

View File

@ -199,6 +199,7 @@ void PaxosService::_active()
void PaxosService::shutdown()
{
paxos->cancel_events();
paxos->shutdown();
if (proposal_timer) {
mon->timer.cancel_event(proposal_timer);

View File

@ -17,6 +17,7 @@
#include "messages/PaxosServiceMessage.h"
#include "include/Context.h"
#include <errno.h>
class Monitor;
class Paxos;
@ -63,6 +64,10 @@ protected:
public:
C_RetryMessage(PaxosService *s, PaxosServiceMessage *m_) : svc(s), m(m_) {}
// Retry callback for a service message.
// -ECANCELED: release the held message and stop.
// Any other result: feed the message back into PaxosService::dispatch().
void finish(int r) {
  if (r != -ECANCELED) {
    svc->dispatch(m);
    return;
  }
  // cancelled: the message will not be re-dispatched, so drop our reference
  m->put();
}
};
@ -93,7 +98,9 @@ protected:
PaxosService *ps;
public:
C_Propose(PaxosService *p) : ps(p) { }
void finish(int r) {
void finish(int r) {
if (r == -ECANCELED)
return;
ps->proposal_timer = 0;
ps->propose_pending();
}

View File

@ -80,6 +80,15 @@ struct MonSessionMap {
map<string, xlist<Subscription*>* > subs;
multimap<int, MonSession*> by_osd;
MonSessionMap() {}
// Free the per-key subscription lists owned through `subs`.
// Each list is asserted to be empty before it is deleted, and its map entry
// is erased afterwards; the loop ends once `subs` itself is empty.
~MonSessionMap() {
  while (!subs.empty()) {
    map<string, xlist<Subscription*>* >::iterator first = subs.begin();
    xlist<Subscription*> *pending = first->second;
    assert(pending->empty());
    delete pending;
    subs.erase(first);
  }
}
void remove_session(MonSession *s) {
assert(!s->closed);
for (map<string,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) {

View File

@ -127,8 +127,9 @@ void DispatchQueue::discard_queue(uint64_t id) {
i != removed.end();
++i) {
assert(!(i->is_code())); // We don't discard id 0, ever!
msgr->dispatch_throttle_release(
i->get_message()->get_dispatch_throttle_size());
Message *m = i->get_message();
msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
}
}

View File

@ -62,7 +62,8 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
state(st),
session_security(NULL),
connection_state(NULL),
reader_running(false), reader_joining(false), writer_running(false),
reader_running(false), reader_needs_join(false),
writer_running(false),
in_q(&(r->dispatch_queue)),
keepalive(false),
close_on_empty(false),
@ -118,6 +119,10 @@ void Pipe::start_reader()
{
assert(pipe_lock.is_locked());
assert(!reader_running);
if (reader_needs_join) {
reader_thread.join();
reader_needs_join = false;
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
@ -134,14 +139,11 @@ void Pipe::join_reader()
{
if (!reader_running)
return;
assert(!reader_joining);
reader_joining = true;
cond.Signal();
pipe_lock.Unlock();
reader_thread.join();
pipe_lock.Lock();
assert(reader_joining);
reader_joining = false;
reader_needs_join = false;
}
@ -322,6 +324,7 @@ int Pipe::accept()
!authorizer_valid) {
ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
delete session_security;
session_security = NULL;
goto reply;
}
@ -539,6 +542,7 @@ int Pipe::accept()
connection_state->set_features((int)reply.features & (int)connect.features);
ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
delete session_security;
session_security = get_auth_session_handler(msgr->cct, connect.authorizer_protocol, session_key,
connection_state->get_features());
@ -917,6 +921,7 @@ int Pipe::connect()
// If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
// connection. PLR
delete session_security;
if (authorizer != NULL) {
session_security = get_auth_session_handler(msgr->cct, authorizer->protocol, authorizer->session_key,
connection_state->get_features());
@ -1261,6 +1266,7 @@ void Pipe::reader()
// reap?
reader_running = false;
reader_needs_join = true;
unlock_maybe_reap();
ldout(msgr->cct,10) << "reader done" << dendl;
}

View File

@ -113,7 +113,7 @@ class DispatchQueue;
utime_t backoff; // backoff time
bool reader_running, reader_joining;
bool reader_running, reader_needs_join;
bool writer_running;
map<int, list<Message*> > out_q; // priority queue for outbound msgs

View File

@ -66,7 +66,7 @@ SimpleMessenger::~SimpleMessenger()
assert(!did_bind); // either we didn't bind or we shut down the Accepter
assert(rank_pipe.empty()); // we don't have any running Pipes.
assert(reaper_stop && !reaper_started); // the reaper thread is stopped
delete local_connection;
local_connection->put();
}
void SimpleMessenger::ready()