*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@355 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2005-06-28 16:28:24 +00:00
parent e794904198
commit 0e65074698
11 changed files with 63 additions and 525 deletions

View File

@ -3,17 +3,27 @@
#include "include/types.h"
#include <ext/hash_map>
#include <string>
#include <fstream>
using namespace std;
#include <ext/hash_map>
using namespace __gnu_cxx;
// for const char* comparisons
struct ltstr
{
bool operator()(const char* s1, const char* s2) const
{
return strcmp(s1, s2) < 0;
}
};
class LogType {
protected:
vector<string> keys;
vector<string> inc_keys;
vector<string> set_keys;
set<const char*, ltstr> keyset;
vector<const char*> keys;
vector<const char*> inc_keys;
vector<const char*> set_keys;
int version;
@ -23,35 +33,22 @@ class LogType {
LogType() {
version = 1;
}
void add_inc(char *s) {
string name = s;
add_inc(name);
}
void add_inc(string& key) {
void add_inc(const char* key) {
if (have_key(key)) return;
keys.push_back(key);
keyset.insert(key);
inc_keys.push_back(key);
version++;
}
void add_set(char *s) {
string name = s;
add_set(name);
}
void add_set(string& key){
void add_set(const char* key){
if (have_key(key)) return;
keys.push_back(key);
keyset.insert(key);
set_keys.push_back(key);
version++;
}
bool have_key(char *s) {
string n = s;
return have_key(n);
}
bool have_key(string& key) {
for (vector<string>::iterator it = keys.begin(); it != keys.end(); it++) {
if (*it == key) return true;
}
return false;
bool have_key(const char* key) {
return keyset.count(key) ? true:false;
}
};

View File

@ -34,13 +34,9 @@ Logger::~Logger()
out.close();
}
long Logger::inc(char *s, long v)
{
string key = s;
return inc(key,v);
}
long Logger::inc(string& key, long v)
long Logger::inc(const char *key, long v)
{
if (!g_conf.log) return 0;
lock.Lock();
if (!type->have_key(key))
type->add_inc(key);
@ -51,13 +47,9 @@ long Logger::inc(string& key, long v)
return r;
}
long Logger::set(char *s, long v)
{
string key = s;
return set(key,v);
}
long Logger::set(string& key, long v)
long Logger::set(const char *key, long v)
{
if (!g_conf.log) return 0;
lock.Lock();
if (!type->have_key(key))
type->add_set(key);
@ -69,13 +61,9 @@ long Logger::set(string& key, long v)
return r;
}
long Logger::get(char *s)
{
string key = s;
return get(key);
}
long Logger::get(string& key)
long Logger::get(const char* key)
{
if (!g_conf.log) return 0;
lock.Lock();
long r = vals[key];
lock.Unlock();
@ -84,6 +72,7 @@ long Logger::get(string& key)
void Logger::flush(bool force)
{
if (!g_conf.log) return;
lock.Lock();
timepair_t now = g_clock.gettimepair();
@ -107,7 +96,7 @@ void Logger::flush(bool force)
if (wrote_header != type->version ||
wrote_header_last > 10) {
out << "#";
for (vector<string>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
for (vector<const char*>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
out << "\t" << *it;
}
out << endl;
@ -117,13 +106,13 @@ void Logger::flush(bool force)
// write line to log
out << last_logged;
for (vector<string>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
for (vector<const char*>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
out << "\t" << get(*it);
}
out << endl;
// reset the counters
for (vector<string>::iterator it = type->inc_keys.begin(); it != type->inc_keys.end(); it++)
for (vector<const char*>::iterator it = type->inc_keys.begin(); it != type->inc_keys.end(); it++)
this->vals[*it] = 0;
}

View File

@ -12,10 +12,21 @@ using namespace std;
#include <ext/hash_map>
using namespace __gnu_cxx;
class LogType;
#include "LogType.h"
struct eqstr
{
bool operator()(const char* s1, const char* s2) const
{
return strcmp(s1, s2) == 0;
}
};
class Logger {
protected:
hash_map<string, long> vals;
hash_map<const char*, long, hash<const char*>, eqstr> vals;
Mutex lock;
LogType *type;
@ -34,12 +45,9 @@ class Logger {
Logger(string fn, LogType *type);
~Logger();
long inc(char *s, long v = 1);
long inc(string& key, long v = 1);
long set(char *s, long v);
long set(string& key, long v);
long get(char *s);
long get(string& key);
long inc(const char *s, long v = 1);
long set(const char *s, long v);
long get(const char *s);
void flush(bool force = false);
};

View File

@ -130,8 +130,8 @@ void Timer::cancel_timer()
dout(10) << "setting thread_stop flag" << endl;
lock.Lock();
thread_stop = true;
lock.Unlock();
cond.Signal();
lock.Unlock();
dout(10) << "waiting for thread to finish" << endl;
void *ptr;

View File

@ -17,15 +17,15 @@
long buffer_total_alloc = 0;
//OSDFileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20 ); // stripe files over whole objects
OSDFileLayout g_OSD_FileLayout( 1<<17, 4, 1<<20 ); // 128k stripes over sets of 4
OSDFileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20 ); // stripe files over whole objects
//OSDFileLayout g_OSD_FileLayout( 1<<17, 4, 1<<20 ); // 128k stripes over sets of 4
// ??
OSDFileLayout g_OSD_MDDirLayout( 1<<14, 1<<2, 1<<19 );
// stripe mds log over 128 byte bits (see mds_log_pad_entry below to match!)
OSDFileLayout g_OSD_MDLogLayout( 1<<7, 32, 1<<20 );
//OSDFileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20 );
md_config_t g_conf = {
@ -34,6 +34,7 @@ md_config_t g_conf = {
num_client: 1,
// profiling and debugging
log: true,
log_interval: 1,
log_name: (char*)0,
@ -146,6 +147,8 @@ void parse_config_options(int argc, char **argv,
else if (strcmp(argv[i], "--debug_buffer") == 0)
g_conf.debug_buffer = atoi(argv[++i]);
else if (strcmp(argv[i], "--log") == 0)
g_conf.log = atoi(argv[++i]);
else if (strcmp(argv[i], "--log_name") == 0)
g_conf.log_name = argv[++i];
else if (strcmp(argv[i], "--mds_cache_size") == 0)

View File

@ -12,6 +12,7 @@ struct md_config_t {
// profiling
bool log;
int log_interval;
char *log_name;

View File

@ -185,15 +185,17 @@ class LRU {
void lru_adjust() {
if (!lru_max) return;
int toplen = lru_top.get_length();
__uint32_t topwant = (__uint32_t)(lru_midpoint * (double)lru_max);
while (lru_top.get_length() > 0 &&
lru_top.get_length() > topwant) {
while (toplen > 0 &&
toplen > topwant) {
// remove from tail of top, stick at head of bot
// FIXME: this could be way more efficient by moving a whole chain of items.
LRUObject *o = lru_top.get_tail();
lru_top.remove(o);
lru_bot.insert_head(o);
toplen--;
}
}

View File

@ -15,12 +15,14 @@
#include <stdio.h>
#include <stdlib.h>
#include <map>
#include <ext/hash_map>
#include <cassert>
#include <iostream>
using namespace std;
#include <ext/hash_map>
using namespace __gnu_cxx;
#include "common/Cond.h"
#include "common/Mutex.h"

View File

@ -34,7 +34,7 @@ class FakeMessenger : public Messenger {
// -- incoming queue --
// (that nothing uses)
Message *get_message() {
if (incoming.size() > 0) {
if (!incoming.empty()) {
Message *m = incoming.front();
incoming.pop_front();
return m;

View File

@ -454,7 +454,7 @@ void* tcp_sendthread(void*)
outgoing_lock.Lock();
while (!outgoing.empty() || !tcp_done) {
while (outgoing.size()) {
while (!outgoing.empty()) {
Message *m = outgoing.front();
outgoing.pop_front();
tcp_send(m);

View File

@ -1,464 +0,0 @@
#include "include/types.h"
#include "OSD.h"
#include "OBFSStore.h"
#include "OSDCluster.h"
#include "mds/MDS.h"
#include "msg/Messenger.h"
#include "msg/Message.h"
#include "msg/HostMonitor.h"
#include "messages/MGenericMessage.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDGetClusterAck.h"
#include "common/Logger.h"
#include "common/LogType.h"
#include "common/ThreadPool.h"
#include <iostream>
#include <cassert>
#include <errno.h>
#include <sys/stat.h>
#include "include/config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << " "
char *osd_base_path = "./osddata";
// cons/des
LogType osd_logtype;
OSD::OSD(int id, Messenger *m)
{
whoami = id;
messenger = m;
messenger->set_dispatcher(this);
osdcluster = 0;
// use fake store
store = new OBFSStore(whoami, "./param.in", NULL);
// monitor
char s[80];
sprintf(s, "osd%d", whoami);
string st = s;
monitor = new HostMonitor(m, st);
monitor->set_notify_port(MDS_PORT_OSDMON);
// hack
int i = whoami;
if (++i == g_conf.num_osd) i = 0;
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
if (++i == g_conf.num_osd) i = 0;
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
if (++i == g_conf.num_osd) i = 0;
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
monitor->get_notify().insert(MSG_ADDR_MDS(0));
// log
char name[80];
sprintf(name, "osd%02d", whoami);
logger = new Logger(name, (LogType*)&osd_logtype);
// Thread pool
{
char name[80];
sprintf(name,"osd%d.threadpool", whoami);
threadpool = new ThreadPool<OSD, MOSDOp>(name, g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this);
}
}
OSD::~OSD()
{
if (osdcluster) { delete osdcluster; osdcluster = 0; }
if (monitor) { delete monitor; monitor = 0; }
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }
if (threadpool) { delete threadpool; threadpool = 0; }
}
int OSD::init()
{
osd_lock.Lock();
int r = store->init();
monitor->init();
osd_lock.Unlock();
return r;
}
int OSD::shutdown()
{
dout(1) << "shutdown" << endl;
// stop threads
delete threadpool;
threadpool = 0;
// shut everything else down
monitor->shutdown();
messenger->shutdown();
int r = store->finalize();
return r;
}
// dispatch
void OSD::dispatch(Message *m)
{
switch (m->get_type()) {
// host monitor
case MSG_PING_ACK:
case MSG_FAILURE_ACK:
monitor->proc_message(m);
break;
// osd
case MSG_SHUTDOWN:
shutdown();
delete m;
break;
case MSG_OSD_GETCLUSTERACK:
handle_getcluster_ack((MOSDGetClusterAck*)m);
break;
case MSG_PING:
// take note.
monitor->host_is_alive(m->get_source());
handle_ping((MPing*)m);
break;
case MSG_OSD_OP:
monitor->host_is_alive(m->get_source());
handle_op((MOSDOp*)m);
break;
default:
dout(1) << " got unknown message " << m->get_type() << endl;
}
}
void OSD::handle_ping(MPing *m)
{
// play dead?
if (whoami == 1) {
dout(7) << "playing dead" << endl;
} else {
dout(7) << "got ping, replying" << endl;
messenger->send_message(new MPingAck(m),
m->get_source(), m->get_source_port(), 0);
}
delete m;
}
void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
{
// SAB
osd_lock.Lock();
if (!osdcluster) osdcluster = new OSDCluster();
osdcluster->decode(m->get_osdcluster());
dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
delete m;
// process waiters
list<MOSDOp*> waiting;
waiting.splice(waiting.begin(), waiting_for_osdcluster);
for (list<MOSDOp*>::iterator it = waiting.begin();
it != waiting.end();
it++) {
handle_op(*it);
}
// SAB
osd_lock.Unlock();
}
void OSD::handle_op(MOSDOp *op)
{
// starting up?
if (!osdcluster) {
// SAB
osd_lock.Lock();
dout(7) << "no OSDCluster, starting up" << endl;
if (waiting_for_osdcluster.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
waiting_for_osdcluster.push_back(op);
// SAB
osd_lock.Unlock();
return;
}
// check cluster version
if (op->get_ocv() > osdcluster->get_version()) {
// op's is newer
dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
// query MDS
dout(7) << "querying MDS" << endl;
messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
assert(0);
// SAB
osd_lock.Lock();
waiting_for_osdcluster.push_back(op);
// SAB
osd_lock.Unlock();
return;
}
if (op->get_ocv() < osdcluster->get_version()) {
// op's is old
dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
}
// am i the right rg_role?
if (0) {
repgroup_t rg = op->get_rg();
if (op->get_rg_role() == 0) {
// PRIMARY
// verify that we are primary, or acting primary
int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
if (acting_primary != whoami) {
dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
logger->inc("fwd");
return;
}
} else {
// REPLICA
int my_role = osdcluster->get_rg_role(rg, whoami);
dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
if (my_role != op->get_rg_role()) {
assert(0);
}
}
}
queue_op(op);
// do_op(op);
}
void OSD::queue_op(MOSDOp *op) {
threadpool->put_op(op);
}
void OSD::do_op(MOSDOp *op)
{
// do the op
switch (op->get_op()) {
case OSD_OP_READ:
op_read(op);
break;
case OSD_OP_WRITE:
op_write(op);
break;
case OSD_OP_MKFS:
op_mkfs(op);
break;
case OSD_OP_DELETE:
op_delete(op);
break;
case OSD_OP_TRUNCATE:
op_truncate(op);
break;
case OSD_OP_STAT:
op_stat(op);
break;
default:
assert(0);
}
}
void OSD::op_read(MOSDOp *r)
{
// read into a buffer
bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read
long got = store->read(r->get_oid(),
r->get_length(), r->get_offset(),
bptr.c_str());
// set up reply
MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster);
if (got >= 0) {
bptr.set_length(got); // properly size the buffer
// give it to the reply in a bufferlist
bufferlist bl;
bl.push_back( bptr );
reply->set_result(0);
reply->set_data(bl);
reply->set_length(got);
} else {
reply->set_result(got); // error
reply->set_length(0);
}
dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
logger->inc("rd");
if (got >= 0) logger->inc("rdb", got);
// send it
messenger->send_message(reply, r->get_asker());
delete r;
}
// -- osd_write
void OSD::op_write(MOSDOp *m)
{
// take buffers from the message
bufferlist bl;
bl.claim( m->get_data() );
// write out buffers
off_t off = m->get_offset();
for (list<bufferptr>::iterator it = bl.buffers().begin();
it != bl.buffers().end();
it++) {
int r = store->write(m->get_oid(),
(*it).length(), off,
(*it).c_str(),
g_conf.osd_fsync);
off += (*it).length();
if (r < 0) {
dout(1) << "write error on " << m->get_oid() << " r = " << r << endl;
assert(r >= 0);
}
}
// trucnate after?
/*
if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) {
size_t at = m->get_offset() + m->get_length();
int r = store->truncate(m->get_oid(), at);
dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl;
}
*/
logger->inc("wr");
logger->inc("wrb", m->get_length());
// assume success. FIXME.
// reply
MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster);
messenger->send_message(reply, m->get_asker());
delete m;
}
void OSD::op_mkfs(MOSDOp *op)
{
dout(3) << "MKFS" << endl;
{
int r = store->mkfs();
messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
}
delete op;
}
void OSD::op_delete(MOSDOp *op)
{
int r = store->remove(op->get_oid());
dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
// "ack"
messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
logger->inc("rm");
delete op;
}
void OSD::op_truncate(MOSDOp *op)
{
int r = store->truncate(op->get_oid(), op->get_offset());
dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
// "ack"
messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
logger->inc("trunc");
delete op;
}
void OSD::op_stat(MOSDOp *op)
{
struct stat st;
memset(&st, sizeof(st), 0);
int r = store->stat(op->get_oid(), &st);
dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
reply->set_object_size(st.st_size);
messenger->send_message(reply, op->get_asker());
logger->inc("stat");
delete op;
}
void doop(OSD *u, MOSDOp *p) {
u->do_op(p);
}