*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@646 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2006-02-16 17:18:09 +00:00
parent 0cf77b524f
commit 157183d18f
10 changed files with 138 additions and 57 deletions

View File

@ -1,37 +1,37 @@
OSDI
- tune ebofs
- bc size
- max dirty
- idle timeout
-
- stability
- fix mds balancer
- tcpmessenger stats
- tcp recv throttling
- vary osd_maxthreads, [ ebofs, fakestore ], write_size
medium, large writes: ebofs 10% faster
small writes: fake 20% faster
- tune ebofs
- vary osd_maxthreads, [ ebofs, fakestore ], write_size
-> medium, large writes: ebofs 10% faster
-> small writes: fake 20% faster
- obfs?
- crush vs linear
/- crush vs linear
- makedirs vs ebo/fake.
- streaming small writes (mds log)
- osd write tests
- get obfs working
- ebofs threading?
- mds scaling
- shared /lib, /include .. verify fixed open() behavior works!
- client buffer cache!
- ld_preload?
- msg_addr_t class
client
@ -55,7 +55,8 @@ filer
mds
- implement/test truncate()
- chdir
- client handles for directories!
ebofs
- combine inodes into same blocks?

View File

@ -152,7 +152,7 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
// log
string name;
name = "mds";
int w = MSG_ADDR_NUM(whoami);
int w = whoami;
if (w >= 1000) name += ('0' + ((w/1000)%10));
if (w >= 100) name += ('0' + ((w/100)%10));
if (w >= 10) name += ('0' + ((w/10)%10));

View File

@ -32,7 +32,7 @@ class MNSRegister : public Message {
public:
MNSRegister() {}
MNSRegister(long a, int r, int ti) :
MNSRegister(msg_addr_t a, int r, int ti) :
Message(MSG_NS_REGISTER) {
addr = a;
rank = r;
@ -41,7 +41,7 @@ class MNSRegister : public Message {
char *get_type_name() { return "NSReg"; }
int get_entity() { return addr; }
msg_addr_t get_entity() { return addr; }
int get_rank() { return rank; }
long get_tid() { return tid; }

View File

@ -50,7 +50,7 @@ using namespace __gnu_cxx;
// global queue.
map<int, FakeMessenger*> directory;
map<msg_addr_t, FakeMessenger*> directory;
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
@ -169,7 +169,7 @@ int fakemessenger_do_loop_2()
lock.Lock();
// messages
map<int, FakeMessenger*>::iterator it = directory.begin();
map<msg_addr_t, FakeMessenger*>::iterator it = directory.begin();
while (it != directory.end()) {
dout(18) << "messenger " << it->second << " at " << MSG_ADDR_NICE(it->first) << " has " << it->second->num_incoming() << " queued" << endl;
@ -236,28 +236,28 @@ int fakemessenger_do_loop_2()
}
FakeMessenger::FakeMessenger(long me) : Messenger(me)
FakeMessenger::FakeMessenger(msg_addr_t me) : Messenger(me)
{
whoami = me;
myaddr = me;
lock.Lock();
directory[ whoami ] = this;
directory[ myaddr ] = this;
lock.Unlock();
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
cout << "fakemessenger " << myaddr << " messenger is " << this << endl;
g_timer.set_messenger(this);
/*
string name;
name = "m.";
name += MSG_ADDR_TYPE(whoami);
int w = MSG_ADDR_NUM(whoami);
name += MSG_ADDR_TYPE(myaddr);
int w = MSG_ADDR_NUM(myaddr);
if (w >= 1000) name += ('0' + ((w/1000)%10));
if (w >= 100) name += ('0' + ((w/100)%10));
if (w >= 10) name += ('0' + ((w/10)%10));
name += ('0' + ((w/1)%10));
loggers[ whoami ] = new Logger(name, (LogType*)&fakemsg_logtype);
loggers[ myaddr ] = new Logger(name, (LogType*)&fakemsg_logtype);
*/
}
@ -271,11 +271,11 @@ int FakeMessenger::shutdown()
{
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
lock.Lock();
assert(directory.count(whoami) == 1);
assert(directory.count(myaddr) == 1);
shutdown_set.insert(this);
/*
directory.erase(whoami);
directory.erase(myaddr);
if (directory.empty()) {
dout(1) << "fakemessenger: last shutdown" << endl;
::shutdown = true;
@ -284,9 +284,9 @@ int FakeMessenger::shutdown()
*/
/*
if (loggers[whoami]) {
delete loggers[whoami];
loggers.erase(whoami);
if (loggers[myaddr]) {
delete loggers[myaddr];
loggers.erase(myaddr);
}
*/
@ -307,7 +307,7 @@ void FakeMessenger::trigger_timer(Timer *t)
int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{
m->set_source(whoami, fromport);
m->set_source(myaddr, fromport);
m->set_dest(dest, port);
m->set_lamport_stamp( get_lamport() );
@ -317,12 +317,12 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp
try {
#ifdef LOG_MESSAGES
// stats
loggers[whoami]->inc("+send",1);
loggers[myaddr]->inc("+send",1);
loggers[dest]->inc("-recv",1);
char s[20];
sprintf(s,"+%s", m->get_type_name());
loggers[whoami]->inc(s);
loggers[myaddr]->inc(s);
sprintf(s,"-%s", m->get_type_name());
loggers[dest]->inc(s);
#endif

View File

@ -33,14 +33,14 @@ class Timer;
class FakeMessenger : public Messenger {
protected:
int whoami;
msg_addr_t myaddr;
class Logger *logger;
list<Message*> incoming; // incoming queue
public:
FakeMessenger(long me);
FakeMessenger(msg_addr_t me);
~FakeMessenger();
virtual int shutdown();

View File

@ -147,13 +147,21 @@
// use fixed offsets and static entity -> logical addr mapping!
#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services
#define MSG_ADDR_DIRECTORY_BASE 0
#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services
#define MSG_ADDR_MDS_BASE 0x20000000
#define MSG_ADDR_OSD_BASE 0x30000000
#define MSG_ADDR_CLIENT_BASE 0x40000000
#define MSG_ADDR_TYPE_MASK 0xf0000000
#define MSG_ADDR_NUM_MASK 0x0fffffff
#define MSG_ADDR_NEW 0x0fffffff
#define MSG_ADDR_UNDEF_BASE 0xffffffff
/* old int way, which lacked type safety...
typedef int msg_addr_t;
#define MSG_ADDR_RANK(x) (MSG_ADDR_RANK_BASE + (x))
#define MSG_ADDR_MDS(x) (MSG_ADDR_MDS_BASE + (x))
@ -174,6 +182,77 @@
((x) == MSG_ADDR_DIRECTORY ? "namer":"unknown")))))
#define MSG_ADDR_NUM(x) ((x) & MSG_ADDR_NUM_MASK)
#define MSG_ADDR_NICE(x) MSG_ADDR_TYPE(x) << MSG_ADDR_NUM(x)
*/
// new typed msg_addr_t way!
class msg_addr_t {
public:
int _addr;
msg_addr_t() : _addr(MSG_ADDR_UNDEF_BASE) {}
msg_addr_t(int t, int n) : _addr(t | n) {}
int num() const { return _addr & MSG_ADDR_NUM_MASK; }
int type() const { return _addr & MSG_ADDR_TYPE_MASK; }
const char *type_str() const {
switch (type()) {
case MSG_ADDR_RANK_BASE: return "rank";
case MSG_ADDR_MDS_BASE: return "mds";
case MSG_ADDR_OSD_BASE: return "osd";
case MSG_ADDR_CLIENT_BASE: return "client";
case MSG_ADDR_DIRECTORY_BASE: return "namer";
}
return "unknown";
}
bool is_new() const { return num() == MSG_ADDR_NEW; }
bool is_client() const { return type() == MSG_ADDR_CLIENT_BASE; }
bool is_mds() const { return type() == MSG_ADDR_MDS_BASE; }
bool is_osd() const { return type() == MSG_ADDR_OSD_BASE; }
bool is_namer() const { return type() == MSG_ADDR_DIRECTORY_BASE; }
};
inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return l._addr == r._addr; }
inline bool operator!= (const msg_addr_t& l, const msg_addr_t& r) { return l._addr != r._addr; }
inline bool operator< (const msg_addr_t& l, const msg_addr_t& r) { return l._addr < r._addr; }
//typedef struct msg_addr msg_addr_t;
inline ostream& operator<<(ostream& out, const msg_addr_t& addr) {
if (addr.is_namer()) return out << "namer";
return out << addr.type_str() << addr.num();
}
namespace __gnu_cxx {
template<> struct hash< msg_addr_t >
{
size_t operator()( const msg_addr_t m ) const
{
static hash<int> H;
return H(m._addr);
}
};
}
#define MSG_ADDR_RANK(x) msg_addr_t(MSG_ADDR_RANK_BASE,x)
#define MSG_ADDR_MDS(x) msg_addr_t(MSG_ADDR_MDS_BASE,x)
#define MSG_ADDR_OSD(x) msg_addr_t(MSG_ADDR_OSD_BASE,x)
#define MSG_ADDR_CLIENT(x) msg_addr_t(MSG_ADDR_CLIENT_BASE,x)
#define MSG_ADDR_UNDEF msg_addr_t()
#define MSG_ADDR_DIRECTORY msg_addr_t(MSG_ADDR_DIRECTORY_BASE,0)
#define MSG_ADDR_RANK_NEW MSG_ADDR_RANK(MSG_ADDR_NEW)
#define MSG_ADDR_MDS_NEW MSG_ADDR_MDS(MSG_ADDR_NEW)
#define MSG_ADDR_OSD_NEW MSG_ADDR_OSD(MSG_ADDR_NEW)
#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW)
#define MSG_ADDR_ISCLIENT(x) x.is_client()
#define MSG_ADDR_TYPE(x) x.type_str()
#define MSG_ADDR_NUM(x) x.num()
#define MSG_ADDR_NICE(x) x.type_str() << x.num()
#include <stdlib.h>
@ -188,7 +267,8 @@ using namespace __gnu_cxx;
// abstract Message class
typedef int msg_addr_t;
typedef struct {
int type;
@ -217,13 +297,13 @@ public:
public:
Message() : tcp_sd(0) {
env.source_port = env.dest_port = -1;
env.source = env.dest = -1;
env.source = env.dest = MSG_ADDR_UNDEF;
env.nchunks = 0;
env.lamport_stamp = 0;
};
Message(int t) : tcp_sd(0) {
env.source_port = env.dest_port = -1;
env.source = env.dest = -1;
env.source = env.dest = MSG_ADDR_UNDEF;
env.nchunks = 0;
env.type = t;
env.lamport_stamp = 0;
@ -264,11 +344,11 @@ public:
virtual char *get_type_name() = 0;
// source/dest
msg_addr_t get_dest() { return env.dest; }
msg_addr_t& get_dest() { return env.dest; }
void set_dest(msg_addr_t a, int p) { env.dest = a; env.dest_port = p; }
int get_dest_port() { return env.dest_port; }
msg_addr_t get_source() { return env.source; }
msg_addr_t& get_source() { return env.source; }
void set_source(msg_addr_t a, int p) { env.source = a; env.source_port = p; }
int get_source_port() { return env.source_port; }

View File

@ -61,25 +61,25 @@ void TCPDirectory::handle_register(MNSRegister *m)
// pick id
int rank = m->get_rank();
int entity = m->get_entity();
msg_addr_t entity = m->get_entity();
if ((entity & MSG_ADDR_NUM_MASK) == MSG_ADDR_NEW) {
if (entity.is_new()) {
// make up a new address!
switch (entity) {
switch (entity.type()) {
case MSG_ADDR_RANK_NEW: // stupid client should be able to figure this out
case MSG_ADDR_RANK_BASE: // stupid client should be able to figure this out
entity = MSG_ADDR_RANK(rank);
break;
case MSG_ADDR_MDS_NEW:
case MSG_ADDR_MDS_BASE:
entity = MSG_ADDR_MDS(nmds++);
break;
case MSG_ADDR_OSD_NEW:
case MSG_ADDR_OSD_BASE:
entity = MSG_ADDR_OSD(nosd++);
break;
case MSG_ADDR_CLIENT_NEW:
case MSG_ADDR_CLIENT_BASE:
entity = MSG_ADDR_CLIENT(nclient++);
break;
@ -135,7 +135,7 @@ void TCPDirectory::handle_started(Message *m)
void TCPDirectory::handle_unregister(Message *m)
{
int who = m->get_source();
msg_addr_t who = m->get_source();
dout(2) << "unregister from entity " << MSG_ADDR_NICE(who) << endl;
assert(dir.count(who));

View File

@ -47,7 +47,7 @@ class TCPDirectory : public Dispatcher {
hash_map<int, tcpaddr_t> rank_addr; // rank -> ADDR (e.g. host:port)
__uint64_t version;
map<__uint64_t, int> update_log;
map<__uint64_t, msg_addr_t> update_log;
int nrank;
int nclient, nmds, nosd;

View File

@ -82,7 +82,7 @@ off_t stat_outq = 0, stat_outqb = 0;
// local directory
hash_map<int, TCPMessenger*> directory; // local
hash_map<msg_addr_t, TCPMessenger*> directory; // local
Mutex directory_lock;
// connecting
@ -751,14 +751,13 @@ void* tcp_outthread(void*)
void *tcp_inthread(void *r)
{
int sd = (int)r;
int who = -1;
dout(DBL) << "tcp_inthread reading on sd " << sd << " who is " << who << endl;
dout(DBL) << "tcp_inthread reading on sd " << sd << endl;
while (!tcp_done) {
Message *m = tcp_recv(sd);
if (!m) break;
who = m->get_source();
msg_addr_t who = m->get_source();
//dout(1) << "inthread got " << m << " from sd " << sd << " who is " << who << endl;
@ -882,7 +881,7 @@ void* tcp_dispatchthread(void*)
}
// ok
int dest = m->get_dest();
msg_addr_t dest = m->get_dest();
directory_lock.Lock();
if (directory.count(dest)) {
Messenger *who = directory[ dest ];
@ -1017,7 +1016,7 @@ msg_addr_t register_entity(msg_addr_t addr)
cond.Wait(lookup_lock);
// get result, clean up
int entity = waiting_for_register_result[id];
msg_addr_t entity = waiting_for_register_result[id];
waiting_for_register_result.erase(id);
waiting_for_register_cond.erase(id);

View File

@ -611,7 +611,7 @@ void OSD::update_map(bufferlist& state, bool mkfs)
dout(7) << "created " << *pg << endl;
pg_list.push_back(pgid);
}
}
}
}
} else {
@ -634,6 +634,7 @@ void OSD::update_map(bufferlist& state, bool mkfs)
it++) {
PGPeer *p = it->second;
//dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
MSG_ADDR_OSD(p->get_peer()));
}