*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@42 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2004-07-26 20:42:21 +00:00
parent 45eab891bd
commit b891cc1568
22 changed files with 217 additions and 39 deletions

View File

@ -3,6 +3,9 @@ CC=g++
CFLAGS=-D __gnu_cxx=std -g -I.
LIBS=
LEAKTRACER=
LEAKTRACER=/g/g13/weil2/src/LeakTracer/LeakTracer.o
SRCS=*.cc */*.cc
OBJS=Logger.o mds/MDBalancer.o mds/MDS.o mds/CDentry.o mds/CDir.o mds/CInode.o mds/MDCache.o mds/MDStore.o clock.o FakeMessenger.o mds/LogStream.o mds/MDLog.o osd/OSD.o client/Client.o mds/MDCluster.o
TARGETS=test import
@ -11,7 +14,8 @@ all: depend ${TARGETS}
test: test.cc pmds.o
${CC} ${CFLAGS} ${LIBS} pmds.o test.cc -o test
test ! -e leak.out || rm leak.out
${CC} ${CFLAGS} ${LIBS} pmds.o ${LEAKTRACER} test.cc -o test
import: pmds.o import.cc
${CC} ${CFLAGS} ${LIBS} pmds.o import.cc -o import

View File

@ -48,6 +48,13 @@ int Client::init()
int Client::shutdown()
{
messenger->shutdown();
cache_lru.lru_set_max(0);
trim_cache();
if (root) { // not in lru
delete root;
root = 0;
}
return 0;
}
@ -101,6 +108,9 @@ void Client::assim_reply(MClientReply *r)
for (set<int>::iterator it = r->trace[i]->dist.begin(); it != r->trace[i]->dist.end(); it++)
cur->dist.push_back(*it);
cur->isdir = r->trace[i]->inode.isdir;
// free c_inode_info
delete r->trace[i];
}
@ -110,8 +120,10 @@ void Client::assim_reply(MClientReply *r)
vector<c_inode_info*>::iterator it;
for (it = r->dir_contents->begin(); it != r->dir_contents->end(); it++) {
if (cur->lookup((*it)->ref_dn))
if (cur->lookup((*it)->ref_dn)) {
delete *it;
continue; // skip if we already have it
}
ClNode *n = new ClNode();
n->ino = (*it)->inode.ino;
@ -125,7 +137,13 @@ void Client::assim_reply(MClientReply *r)
if (debug > 3)
cout << "client got dir item " << (*it)->ref_dn << endl;
// free the c_inode_info
delete *it;
}
// free dir_contents vector
delete r->dir_contents;
}
cwd = cur;

View File

@ -12,4 +12,4 @@
//#define FAKE_CLOCK
#define NUMCLIENT 10
#define CLIENT_REQUESTS 100000
#define CLIENT_REQUESTS 1000

View File

@ -56,7 +56,7 @@ void CDir::remove_child(CDentry *d) {
}
CDentry* CDir::lookup(string n) {
CDentry* CDir::lookup(string& n) {
//cout << " lookup " << n << " in " << this << endl;
map<string,CDentry*>::iterator iter = items.find(n);
if (iter == items.end()) return NULL;

View File

@ -170,7 +170,7 @@ class CDir {
// manipulation
void add_child(CDentry *d);
void remove_child(CDentry *d);
CDentry* lookup(string n);
CDentry* lookup(string& n);
// debuggin
void dump(int d = 0);

View File

@ -105,7 +105,7 @@ void CInode::add_read_waiter(Context *c) {
}
void CInode::take_read_waiting(list<Context*>& ls)
{
if (waiting_for_write.size())
if (waiting_for_read.size())
put(CINODE_PIN_RWAIT);
ls.splice(ls.end(), waiting_for_read);
}
@ -182,6 +182,22 @@ void CInode::add_parent(CDentry *p) {
parents.push_back(p);
}
void CInode::remove_parent(CDentry *p) {
nparents--;
if (nparents == 0) { // first
assert(parent == p);
parent = 0;
}
else if (nparents == 1) { // second, switch back from the vector
parent = parents.front();
if (parent == p)
parent = parents.back();
assert(parent != p);
parents.clear();
} else {
assert(0); // implement me
}
}

View File

@ -166,7 +166,7 @@ class CInode : LRUObject {
// --- reference counting
void put(int by) {
if (ref > 0 || ref_set.count(by) != 1) {
if (ref == 0 || ref_set.count(by) != 1) {
cout << "bad put " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl;
assert(ref_set.count(by) == 1);
assert(ref > 0);
@ -180,7 +180,10 @@ class CInode : LRUObject {
void get(int by) {
if (ref == 0)
lru_pin();
assert(ref_set.count(by) == 0);
if (ref_set.count(by)) {
cout << "bad get " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl;
assert(ref_set.count(by) == 0);
}
ref++;
ref_set.insert(by);
cout << "get " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl;
@ -188,6 +191,7 @@ class CInode : LRUObject {
// --- hierarchy stuff
void add_parent(CDentry *p);
void remove_parent(CDentry *p);
mdloc_t get_mdloc() {
return inode.ino; // use inode #

View File

@ -30,7 +30,7 @@ class LogEvent {
serial_buf = 0;
}
~LogEvent() {
if (serial_buf) { delete serial_buf; serial_buf = 0; }
if (serial_buf) { delete[] serial_buf; serial_buf = 0; }
}
int get_type() { return type; }

View File

@ -27,7 +27,7 @@ class LogStream {
buf = 0;
}
~LogStream() {
if (buf) { delete buf; buf = 0; }
if (buf) { delete[] buf; buf = 0; }
}
off_t seek(off_t offset) {

View File

@ -121,6 +121,8 @@ void MDBalancer::handle_heartbeat(MHeartbeat *m)
if (mds_load.size() == cluster_size)
do_rebalance();
// done
delete m;
}

View File

@ -48,7 +48,12 @@ MDCache::~MDCache()
bool MDCache::shutdown()
{
//if (root) clear_dir(root);
if (root) {
inode_map.erase(root->inode.ino);
lru->lru_remove(root);
delete root;
root = 0;
}
}
@ -67,10 +72,13 @@ bool MDCache::remove_inode(CInode *o)
if (o->nparents == 1) {
CDentry *dn = o->parent;
dn->dir->remove_child(dn);
o->remove_parent(dn);
delete dn;
}
else if (o->nparents > 1) {
assert(o->nparents <= 1);
} else {
assert(o->parent == 0); // root?
}
// remove from map
@ -95,26 +103,52 @@ bool MDCache::trim(__int32_t max) {
// notify authority?
int auth = in->authority(mds->get_cluster());
if (auth != mds->get_nodeid()) {
cout << "mds" << mds->get_nodeid() << " sending inode_expire to mds" << auth << " on " << *in << endl;
mds->messenger->send_message(new MInodeExpire(in->inode.ino),
MSG_ADDR_MDS(auth), MDS_PORT_CACHE,
MDS_PORT_CACHE);
}
// dir incomplete!
in->parent->dir->state_clear(CDIR_MASK_COMPLETE);
CInode *idir = in->parent->dir->inode;
// remove it
cout << "mds" << mds->get_nodeid() << " trim deleting " << *in << " " << in << endl;
remove_inode(in);
delete in;
}
// dir incomplete!
idir->dir->state_clear(CDIR_MASK_COMPLETE);
if (imports.count(idir) && !idir->is_root()) {
// it's an import!
cout << "mds" << mds->get_nodeid() << " trimmed parent dir is an import; rexporting" << endl;
export_dir( idir, idir->authority(mds->get_cluster()) );
}
}
return true;
}
bool MDCache::shutdown_pass()
{
cout << "mds" << mds->get_nodeid() << " shutdown_pass" << endl;
assert(mds->is_shutting_down());
// make a pass on the cache
trim(1);
cout << "mds" << mds->get_nodeid() << " cache size now " << lru->lru_get_size() << endl;
if (lru->lru_get_size() <= 1) {
cout << "mds" << mds->get_nodeid() << " done, sending shutdown_finish" << endl;
mds->messenger->send_message(new Message(MSG_MDS_SHUTDOWNFINISH),
0, MDS_PORT_MAIN, MDS_PORT_MAIN);
return true;
}
return false;
}
CInode* MDCache::get_file(string& fn) {
int off = 1;
CInode *cur = root;
@ -152,6 +186,8 @@ int MDCache::link_inode( CInode *parent, string& dname, CInode *in )
return -ENOTDIR; // not a dir
}
assert(parent->dir->lookup(dname) == 0);
// create dentry
CDentry* dn = new CDentry(dname, in);
in->add_parent(dn);
@ -654,7 +690,7 @@ int MDCache::handle_discover(MDiscover *dis)
// link in
add_inode( in );
link_inode( cur, (*dis->want)[i], in );
cout << " adding " << *in << endl;
cout << " discover assimilating " << *in << endl;
}
cur->dir->take_waiting((*dis->want)[i],
@ -1164,6 +1200,8 @@ void MDCache::export_dir_frozen(CInode *in,
MSG_ADDR_MDS(dest), MDS_PORT_CACHE,
MDS_PORT_CACHE);
// queue finisher
in->dir->add_waiter( fin ); // is this right?
}
void MDCache::export_dir_walk(MExportDir *req,
@ -1464,9 +1502,9 @@ void MDCache::import_dir_block(pchar& p, CInode *containing_import)
// add
add_inode(in);
link_inode(idir, dname, in);
cout << " adding " << *in << endl;
cout << " import_dir_block adding " << *in << endl;
} else {
cout << " already had " << *in << endl;
cout << " import_dir_block already had " << *in << endl;
in->inode = istate->inode;
}

View File

@ -71,6 +71,7 @@ class MDCache {
lru->lru_set_max(max);
}
bool trim(__int32_t max = -1); // trim cache
bool shutdown_pass();
bool shutdown(); // clear cache (ie at shutodwn)

View File

@ -11,7 +11,7 @@
#include "include/Logger.h"
#include "include/Message.h"
LogType *mdlog_logtype = 0;
LogType mdlog_logtype;
// cons/des
@ -29,9 +29,6 @@ MDLog::MDLog(MDS *m)
mds->get_cluster()->get_log_osd(mds->get_nodeid()),
mds->get_cluster()->get_log_oid(mds->get_nodeid()));
if (!mdlog_logtype)
mdlog_logtype = new LogType();
string name;
name = "log.mds";
int w = mds->get_nodeid();
@ -39,7 +36,7 @@ MDLog::MDLog(MDS *m)
if (w >= 100) name += ('0' + ((w/100)%10));
if (w >= 10) name += ('0' + ((w/10)%10));
name += ('0' + ((w/1)%10));
logger = new Logger(name, mdlog_logtype);
logger = new Logger(name, (LogType*)&mdlog_logtype);
}
@ -47,6 +44,7 @@ MDLog::~MDLog()
{
if (reader) { delete reader; reader = 0; }
if (writer) { delete writer; writer = 0; }
if (logger) { delete logger; logger = 0; }
}

View File

@ -59,6 +59,8 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
mdlog->set_max_events(100);
shutting_down = false;
stat_ops = 0;
last_heartbeat = 0;
osd_last_tid = 0;
@ -66,9 +68,10 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
MDS::~MDS() {
if (mdcache) { delete mdcache; mdcache = NULL; }
if (mdstore) { delete mdstore; mdstore = NULL; }
if (messenger) { delete messenger; messenger = NULL; }
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
if (messenger) { delete messenger; messenger = NULL; }
}
@ -78,7 +81,45 @@ int MDS::init()
messenger->init(this);
}
int MDS::shutdown()
int MDS::shutdown_start()
{
cout << "mds" << whoami << " shutdown_start" << endl;
shutting_down = true;
for (int i=0; i<mdcluster->get_num_mds(); i++) {
if (i == whoami) continue;
cout << "mds" << whoami << " sending MShutdownStart to mds" << i << endl;
messenger->send_message(new Message(MSG_MDS_SHUTDOWNSTART),
i, MDS_PORT_MAIN,
MDS_PORT_MAIN);
}
mdcache->shutdown_pass();
}
void MDS::handle_shutdown_start(Message *m)
{
cout << "mds" << whoami << " handle_shutdown_start" << endl;
shutting_down = true;
delete m;
}
void MDS::handle_shutdown_finish(Message *m)
{
cout << "mds" << whoami << " handle_shutdown_finish from " << m->get_source() << endl;
did_shut_down.insert(m->get_source());
cout << "mds" << whoami << " shut down so far: " << did_shut_down << endl;
delete m;
if (did_shut_down.size() == mdcluster->get_num_mds()) {
shutting_down = false;
}
}
int MDS::shutdown_final()
{
// shut down cache
mdcache->shutdown();
@ -117,6 +158,15 @@ void MDS::proc_message(Message *m)
handle_ping((MPing*)m);
break;
// MDS
case MSG_MDS_SHUTDOWNSTART:
handle_shutdown_start(m);
break;
case MSG_MDS_SHUTDOWNFINISH:
handle_shutdown_finish(m);
break;
// CLIENTS ===========
case MSG_CLIENT_REQUEST:
@ -182,6 +232,11 @@ void MDS::dispatch(Message *m)
balancer->send_heartbeat();
}
if (shutting_down) {
if (mdcache->shutdown_pass())
shutting_down = false;
}
}
@ -202,6 +257,12 @@ void MDS::handle_ping(MPing *m)
int MDS::handle_client_request(MClientRequest *req)
{
cout << "mds" << whoami << " req client" << req->client << '.' << req->tid << " op " << req->op << " on " << req->path << endl;
if (is_shutting_down()) {
cout << "mds" << whoami << " shutting down, discarding client request." << endl;
delete req;
return 0;
}
if (!mdcache->get_root()) {
cout << "mds" << whoami << " need open root" << endl;
@ -524,17 +585,18 @@ int MDS::osd_read_finish(Message *rawm)
if (p->buf) { // user buffer
memcpy(p->buf, m->buf, m->len); // copy
delete m->buf; // free message buf
delete[] m->buf; // free message buf
} else { // new buffer
*p->bufptr = m->buf; // steal message's buffer
}
m->buf = 0;
}
// del pendingOsdRead_t
delete p;
long result = m->len;
delete m;
delete m; // del message
if (c) {
c->finish(result);

View File

@ -5,6 +5,7 @@
#include <list>
#include <ext/hash_map>
#include <vector>
#include <set>
#include "include/types.h"
#include "include/Context.h"
@ -73,6 +74,9 @@ class MDS : public Dispatcher {
MDCluster *mdcluster;
bool shutting_down;
set<int> did_shut_down;
// import/export
list<CInode*> import_list;
list<CInode*> export_list;
@ -111,8 +115,11 @@ class MDS : public Dispatcher {
mds_load_t get_load();
bool is_shutting_down() { return shutting_down; }
int init();
int shutdown();
int shutdown_start();
int shutdown_final();
void proc_message(Message *m);
virtual void dispatch(Message *m);
@ -122,6 +129,8 @@ class MDS : public Dispatcher {
void handle_ping(class MPing *m);
void handle_shutdown_start(Message *m);
void handle_shutdown_finish(Message *m);
int handle_client_request(MClientRequest *m);

View File

@ -149,11 +149,15 @@ bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, inodeno_t ino)
}
}
// free buffer
delete[] buf;
// dir is now complete
idir->dir->state_set(CDIR_MASK_COMPLETE);
// trim cache?
mds->mdcache->trim();
// finish
list<Context*> finished;
idir->dir->take_waiting(finished);

View File

@ -75,7 +75,7 @@ class MDiscover : public Message {
return a;
}
string next_dentry() {
string& next_dentry() {
return (*want)[trace.size()];
}

View File

@ -19,7 +19,7 @@ using namespace std;
map<int, FakeMessenger*> directory;
hash_map<int, Logger*> loggers;
LogType *logtype;
LogType fakemsg_logtype;
// lame main looper
@ -73,10 +73,18 @@ FakeMessenger::FakeMessenger(long me)
if (w >= 100) name += ('0' + ((w/100)%10));
if (w >= 10) name += ('0' + ((w/10)%10));
name += ('0' + ((w/1)%10));
if (!logtype)
logtype = new LogType();
Logger *l = new Logger(name, logtype);
loggers[ whoami ] = l;
logger = new Logger(name, (LogType*)&fakemsg_logtype);
loggers[ whoami ] = logger;
}
FakeMessenger::~FakeMessenger()
{
cout << " FAKEMESSEGER DES" << endl;
shutdown();
cout << " logger is " << logger << endl;
delete logger;
}
@ -88,9 +96,11 @@ int FakeMessenger::init(Dispatcher *d)
int FakeMessenger::shutdown()
{
directory.erase(whoami);
remove_dispatcher();
}
bool FakeMessenger::send_message(Message *m, long dest, int port, int fromport)
{
m->set_source(whoami, fromport);

View File

@ -11,10 +11,13 @@
class FakeMessenger : public Messenger {
protected:
int whoami;
class Logger *logger;
public:
FakeMessenger(long me);
~FakeMessenger();
virtual int init(Dispatcher *dis);
virtual int shutdown();
virtual bool send_message(Message *m, long dest, int port=0, int fromport=0);

View File

@ -29,6 +29,9 @@
#define MSG_MDS_INODESYNCACK 161
#define MSG_MDS_INODESYNCRELEASE 162
#define MSG_MDS_SHUTDOWNSTART 900
#define MSG_MDS_SHUTDOWNFINISH 901
#define MSG_ADDR_MDS(x) (x)
#define MSG_ADDR_OSD(x) (0x800 + x)

View File

@ -170,6 +170,8 @@ void OSD::write(MOSDWrite *m)
cout << "sending reply" << endl;
messenger->send_message(reply, m->get_source(), m->get_source_port());
delete m->buf;
// free buffer
delete[] m->buf;
m->buf = 0;
}

View File

@ -61,10 +61,14 @@ int main(char **argv, int argc) {
// loop
fakemessenger_do_loop();
mds[0]->shutdown_start();
fakemessenger_do_loop();
// cleanup
cout << "cleanup" << endl;
for (int i=0; i<NUMMDS; i++) {
if (mds[i]->shutdown() == 0) {
if (mds[i]->shutdown_final() == 0) {
//cout << "clean shutdown of mds " << i << endl;
delete mds[i];
} else {