Merge pull request #2058 from ceph/wip-refs

refcount debugging for RefCountedObject

Reviewed-by: Joao Eduardo Luis <joao.luis@inktank.com>
Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Sage Weil 2014-07-03 17:36:48 -07:00
commit 276dbfc4cb
9 changed files with 39 additions and 21 deletions

View File

@ -18,23 +18,38 @@
#include "common/Mutex.h"
#include "common/Cond.h"
#include "include/atomic.h"
#include "common/ceph_context.h"
struct RefCountedObject {
private:
atomic_t nref;
RefCountedObject() : nref(1) {}
virtual ~RefCountedObject() {}
CephContext *cct;
public:
RefCountedObject(CephContext *c = NULL, int n=1) : nref(n), cct(c) {}
virtual ~RefCountedObject() {
assert(nref.read() == 0);
}
RefCountedObject *get() {
//generic_dout(0) << "RefCountedObject::get " << this << " " << nref.read() << " -> " << (nref.read() + 1) << dendl;
nref.inc();
int v = nref.inc();
if (cct)
lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
<< (v - 1) << " -> " << v
<< dendl;
return this;
}
void put() {
//generic_dout(0) << "RefCountedObject::put " << this << " " << nref.read() << " -> " << (nref.read() - 1) << dendl;
if (nref.dec() == 0)
int v = nref.dec();
if (cct)
lsubdout(cct, refs, 1) << "RefCountedObject::put " << this << " "
<< (v + 1) << " -> " << v
<< dendl;
if (v == 0)
delete this;
}
void set_cct(CephContext *c) {
cct = c;
}
};
/**

View File

@ -95,6 +95,7 @@ SUBSYS(rgw, 1, 5) // log level for the Rados gateway
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
SUBSYS(refs, 0, 0)
OPTION(key, OPT_STR, "")
OPTION(keyfile, OPT_STR, "")

View File

@ -2633,7 +2633,7 @@ void Monitor::handle_forward(MForward *m)
dout(0) << "forward from entity with insufficient caps! "
<< session->caps << dendl;
} else {
Connection *c = new Connection(NULL); // msgr must be null; see PaxosService::dispatch()
Connection *c = new Connection(g_ceph_context, NULL); // msgr must be null; see PaxosService::dispatch()
MonSession *s = new MonSession(m->msg->get_source_inst(), c);
c->set_priv(s);
c->set_peer_addr(m->client.addr);

View File

@ -687,6 +687,8 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
return 0;
}
m->set_cct(cct);
// m->header.version, if non-zero, should be populated with the
// newest version of the encoding the code supports. If set, check
// it against compat_version.

View File

@ -190,8 +190,11 @@ public:
friend class boost::intrusive_ptr<Connection>;
public:
Connection(Messenger *m)
: lock("Connection::lock"),
Connection(CephContext *cct, Messenger *m)
// we are managed exlusively by ConnectionRef; make it so you can
// ConnectionRef foo = new Connection;
: RefCountedObject(cct, 0),
lock("Connection::lock"),
msgr(m),
priv(NULL),
peer_type(-1),
@ -199,9 +202,6 @@ public:
pipe(NULL),
failed(false),
rx_buffers_version(0) {
// we are managed exlusively by ConnectionRef; make it so you can
// ConnectionRef foo = new Connection;
nref.set(0);
}
~Connection() {
//generic_dout(0) << "~Connection " << this << dendl;
@ -371,7 +371,6 @@ public:
protected:
virtual ~Message() {
assert(nref.read() == 0);
if (byte_throttler)
byte_throttler->put(payload.length() + middle.length() + data.length());
if (msg_throttler)

View File

@ -73,7 +73,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
*/
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
: RefCountedObject(r->cct), reader_thread(this), writer_thread(this),
delay_thread(NULL),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
@ -95,7 +95,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
connection_state = con;
connection_state->reset_pipe(this);
} else {
connection_state = new Connection(msgr);
connection_state = new Connection(msgr->cct, msgr);
connection_state->pipe = get();
}

View File

@ -52,7 +52,7 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname, cct->_conf->ms_dispatch_throttle_bytes),
reaper_started(false), reaper_stop(false),
timeout(0),
local_connection(new Connection(this))
local_connection(new Connection(cct, this))
{
ceph_spin_init(&global_seq_lock);
init_local_connection();

View File

@ -3708,7 +3708,7 @@ void OSD::ms_handle_fast_connect(Connection *con)
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session;
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << " new session (outgoing)" << s << " con=" << s->con
@ -3726,7 +3726,7 @@ void OSD::ms_handle_fast_accept(Connection *con)
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session();
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << "new session (incoming)" << s << " con=" << con
@ -5161,7 +5161,7 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
if (isvalid) {
Session *s = static_cast<Session *>(con->get_priv());
if (!s) {
s = new Session;
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;

View File

@ -1152,7 +1152,8 @@ public:
Mutex received_map_lock;
epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
Session() :
Session(CephContext *cct) :
RefCountedObject(cct),
auid(-1), con(0),
session_dispatch_lock("Session::session_dispatch_lock"),
sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),