mon: system-wide log

Pretty rudimentary still.
This commit is contained in:
Sage Weil 2008-12-02 14:28:30 -08:00
parent 95d6947a1d
commit 566a955766
10 changed files with 499 additions and 0 deletions

View File

@ -196,6 +196,7 @@ libmon_a_SOURCES = \
mon/MDSMonitor.cc \
mon/ClientMonitor.cc \
mon/PGMonitor.cc \
mon/LogMonitor.cc \
mon/Elector.cc \
mon/MonitorStore.cc

50
src/include/LogEntry.h Normal file
View File

@ -0,0 +1,50 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __CEPH_LOG_H
#define __CEPH_LOG_H
#include "include/types.h"
#include "include/encoding.h"
struct LogEntry {
entity_inst_t who;
utime_t stamp;
__u64 seq;
__u8 level;
string msg;
void encode(bufferlist& bl) const {
::encode(who, bl);
::encode(stamp, bl);
::encode(seq, bl);
::encode(level, bl);
::encode(msg, bl);
}
void decode(bufferlist::iterator& bl) {
::decode(who, bl);
::decode(stamp, bl);
::decode(seq, bl);
::decode(level, bl);
::decode(msg, bl);
}
};
WRITE_CLASS_ENCODER(LogEntry)
inline ostream& operator<<(ostream& out, const LogEntry& e)
{
return out << e.stamp << " " << e.who << " : " << e.seq << " : " << (int)e.level << " : " << e.msg;
}
#endif

48
src/messages/MLog.h Normal file
View File

@ -0,0 +1,48 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __MLOG_H
#define __MLOG_H
#include "include/LogEntry.h"
class MLog : public Message {
public:
ceph_fsid fsid;
deque<LogEntry> entries;
version_t last;
MLog() : Message(MSG_PGSTATS) {}
MLog(ceph_fsid& f) :
Message(MSG_LOG), fsid(f) {}
const char *get_type_name() { return "log"; }
void print(ostream& out) {
out << "log";
}
void encode_payload() {
::encode(fsid, payload);
::encode(entries, payload);
::encode(last, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
::decode(entries, p);
::decode(last, p);
}
};
#endif

299
src/mon/LogMonitor.cc Normal file
View File

@ -0,0 +1,299 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "LogMonitor.h"
#include "Monitor.h"
#include "MonitorStore.h"
#include "messages/MMonCommand.h"
#include "messages/MLog.h"
#include "common/Timer.h"
#include "osd/osd_types.h"
#include "osd/PG.h" // yuck
#include "config.h"
#include <sstream>
#define DOUT_SUBSYS mon
#undef dout_prefix
#define dout_prefix _prefix(mon, paxos->get_version())
static ostream& _prefix(Monitor *mon, version_t v) {
return *_dout << dbeginl
<< "mon" << mon->whoami
<< (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)")))
<< ".log v" << v << " ";
}
ostream& operator<<(ostream& out, LogMonitor& pm)
{
std::stringstream ss;
/*
for (hash_map<int,int>::iterator p = pm.pg_map.num_pg_by_state.begin();
p != pm.pg_map.num_pg_by_state.end();
++p) {
if (p != pm.pg_map.num_pg_by_state.begin())
ss << ", ";
ss << p->second << " " << pg_state_string(p->first);
}
string states = ss.str();
return out << "v" << pm.pg_map.version << ": "
<< pm.pg_map.pg_stat.size() << " pgs: "
<< states << "; "
<< kb_t(pm.pg_map.total_pg_kb()) << " data, "
<< kb_t(pm.pg_map.total_used_kb()) << " used, "
<< kb_t(pm.pg_map.total_avail_kb()) << " / "
<< kb_t(pm.pg_map.total_kb()) << " free";
*/
return out << "log";
}
/*
Tick function to update the map based on performance every N seconds
*/
void LogMonitor::tick()
{
if (!paxos->is_active()) return;
update_from_paxos();
dout(10) << *this << dendl;
if (!mon->is_leader()) return;
}
void LogMonitor::create_initial()
{
dout(10) << "create_initial -- creating initial map" << dendl;
LogEntry e;
memset(&e.who, 0, sizeof(e.who));
e.stamp = g_clock.now();
e.level = 0;
e.msg = "mkfs";
e.seq = 0;
stringstream ss;
ss << e;
string s;
getline(ss, s);
pending_inc.append(s);
pending_inc.append("\n");
}
bool LogMonitor::update_from_paxos()
{
return true;
}
void LogMonitor::create_pending()
{
pending_inc.clear();
dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl;
}
void LogMonitor::encode_pending(bufferlist &bl)
{
dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl;
bl = pending_inc;
}
bool LogMonitor::preprocess_query(Message *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
return preprocess_command((MMonCommand*)m);
case MSG_LOG:
return preprocess_log((MLog*)m);
default:
assert(0);
delete m;
return true;
}
}
bool LogMonitor::prepare_update(Message *m)
{
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
return prepare_command((MMonCommand*)m);
case MSG_LOG:
return prepare_log((MLog*)m);
default:
assert(0);
delete m;
return false;
}
}
void LogMonitor::committed()
{
}
bool LogMonitor::preprocess_log(MLog *m)
{
return false;
}
bool LogMonitor::prepare_log(MLog *m)
{
dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
if (!ceph_fsid_equal(&m->fsid, &mon->monmap->fsid)) {
dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl;
delete m;
return false;
}
for (deque<LogEntry>::iterator p = m->entries.begin();
p != m->entries.end();
p++) {
dout(10) << " logging " << *p << dendl;
stringstream ss;
ss << *p;
string s;
getline(ss, s);
pending_inc.append(s);
pending_inc.append("\n");
}
paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
return true;
}
void LogMonitor::_updated_log(MLog *ack, entity_inst_t who)
{
dout(7) << "_updated_log for " << who << dendl;
mon->messenger->send_message(ack, who);
}
bool LogMonitor::preprocess_command(MMonCommand *m)
{
int r = -1;
bufferlist rdata;
stringstream ss;
/*
if (m->cmd.size() > 1) {
if (m->cmd[1] == "stat") {
ss << *this;
r = 0;
}
else if (m->cmd[1] == "getmap") {
pg_map.encode(rdata);
ss << "got pgmap version " << pg_map.version;
r = 0;
}
else if (m->cmd[1] == "send_pg_creates") {
send_pg_creates();
ss << "sent pg creates ";
r = 0;
}
else if (m->cmd[1] == "dump") {
ss << "version " << pg_map.version << std::endl;
ss << "last_osdmap_epoch " << pg_map.last_osdmap_epoch << std::endl;
ss << "last_pg_scan " << pg_map.last_pg_scan << std::endl;
ss << "pg_stat\tobjects\tkb\tbytes\tv\treported\tstate\tosds" << std::endl;
for (set<pg_t>::iterator p = pg_map.pg_set.begin();
p != pg_map.pg_set.end();
p++) {
pg_stat_t &st = pg_map.pg_stat[*p];
ss << *p
<< "\t" << st.num_objects
<< "\t" << st.num_kb
<< "\t" << st.num_bytes
<< "\t" << pg_state_string(st.state)
<< "\t" << st.version
<< "\t" << st.reported
<< "\t" << st.acting
<< std::endl;
}
ss << "osdstat\tobject\tkbused\tkbavail\tkb\thb in\thb out" << std::endl;
for (hash_map<int,osd_stat_t>::iterator p = pg_map.osd_stat.begin();
p != pg_map.osd_stat.end();
p++)
ss << p->first
<< "\t" << p->second.num_objects
<< "\t" << p->second.kb_used
<< "\t" << p->second.kb_avail
<< "\t" << p->second.kb
<< "\t" << p->second.hb_in
<< "\t" << p->second.hb_out
<< std::endl;
while (!ss.eof()) {
string s;
getline(ss, s);
rdata.append(s.c_str(), s.length());
rdata.append("\n", 1);
}
ss << "ok";
r = 0;
}
else if (m->cmd[1] == "scrub" && m->cmd.size() == 3) {
pg_t pgid;
r = -EINVAL;
if (pgid.parse(m->cmd[2].c_str())) {
if (mon->pgmon->pg_map.pg_stat.count(pgid)) {
if (mon->pgmon->pg_map.pg_stat[pgid].acting.size()) {
int osd = mon->pgmon->pg_map.pg_stat[pgid].acting[0];
if (mon->osdmon->osdmap.is_up(osd)) {
vector<pg_t> pgs(1);
pgs[0] = pgid;
mon->messenger->send_message(new MOSDScrub(mon->monmap->fsid, pgs),
mon->osdmon->osdmap.get_inst(osd));
ss << "instructing pg " << pgid << " on osd" << osd << " to scrub";
r = 0;
} else
ss << "pg " << pgid << " primary osd" << osd << " not up";
} else
ss << "pg " << pgid << " has no primary osd";
} else
ss << "pg " << pgid << " dne";
} else
ss << "invalid pgid '" << m->cmd[2] << "'";
}
}
*/
if (r != -1) {
string rs;
getline(ss, rs);
mon->reply_command(m, r, rs, rdata);
return true;
} else
return false;
}
bool LogMonitor::prepare_command(MMonCommand *m)
{
stringstream ss;
string rs;
int err = -EINVAL;
// nothing here yet
ss << "unrecognized command";
getline(ss, rs);
mon->reply_command(m, err, rs);
return false;
}

68
src/mon/LogMonitor.h Normal file
View File

@ -0,0 +1,68 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __LOGMONITOR_H
#define __LOGMONITOR_H
#include <map>
#include <set>
using namespace std;
#include "include/types.h"
#include "msg/Messenger.h"
#include "PaxosService.h"
#include "include/LogEntry.h"
class MMonCommand;
class MLog;
class LogMonitor : public PaxosService {
private:
bufferlist pending_inc;
void create_initial();
bool update_from_paxos();
void create_pending(); // prepare a new pending
void encode_pending(bufferlist &bl); // propose pending update to peers
void committed();
bool preprocess_query(Message *m); // true if processed.
bool prepare_update(Message *m);
bool preprocess_log(MLog *m);
bool prepare_log(MLog *m);
void _updated_log(MLog *m, entity_inst_t who);
struct C_Log : public Context {
LogMonitor *logmon;
MLog *ack;
entity_inst_t who;
C_Log(LogMonitor *p, MLog *a, entity_inst_t w) : logmon(p), ack(a), who(w) {}
void finish(int r) {
logmon->_updated_log(ack, who);
}
};
bool preprocess_command(MMonCommand *m);
bool prepare_command(MMonCommand *m);
public:
LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
void tick(); // check state, take actions
};
#endif

View File

@ -38,6 +38,7 @@
#include "MDSMonitor.h"
#include "ClientMonitor.h"
#include "PGMonitor.h"
#include "LogMonitor.h"
#include "config.h"
@ -75,6 +76,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) :
paxos_osdmap(this, w, PAXOS_OSDMAP),
paxos_clientmap(this, w, PAXOS_CLIENTMAP),
paxos_pgmap(this, w, PAXOS_PGMAP),
paxos_log(this, w, PAXOS_LOG),
osdmon(0), mdsmon(0), clientmon(0)
{
@ -82,6 +84,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) :
mdsmon = new MDSMonitor(this, &paxos_mdsmap);
clientmon = new ClientMonitor(this, &paxos_clientmap);
pgmon = new PGMonitor(this, &paxos_pgmap);
logmon = new LogMonitor(this, &paxos_log);
}
Monitor::~Monitor()
@ -90,6 +93,7 @@ Monitor::~Monitor()
delete mdsmon;
delete clientmon;
delete pgmon;
delete logmon;
if (messenger)
messenger->destroy();
}
@ -137,6 +141,7 @@ void Monitor::shutdown()
mdsmon->shutdown();
clientmon->shutdown();
pgmon->shutdown();
logmon->shutdown();
// cancel all events
cancel_tick();
@ -160,11 +165,13 @@ void Monitor::call_election()
paxos_osdmap.election_starting();
paxos_clientmap.election_starting();
paxos_pgmap.election_starting();
paxos_log.election_starting();
mdsmon->election_starting();
osdmon->election_starting();
clientmon->election_starting();
pgmon->election_starting();
logmon->election_starting();
// call a new election
elector.call_election();
@ -183,12 +190,14 @@ void Monitor::win_election(epoch_t epoch, set<int>& active)
paxos_osdmap.leader_init();
paxos_clientmap.leader_init();
paxos_pgmap.leader_init();
paxos_log.leader_init();
// init
pgmon->election_finished(); // hack: before osdmon, for osd->pg kick works ok
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
logmon->election_finished();
}
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l)
@ -205,12 +214,14 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l)
paxos_osdmap.peon_init();
paxos_clientmap.peon_init();
paxos_pgmap.peon_init();
paxos_log.peon_init();
// init
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
pgmon->election_finished();
logmon->election_finished();
}
void Monitor::handle_command(MMonCommand *m)
@ -373,6 +384,10 @@ void Monitor::dispatch(Message *m)
pgmon->dispatch(m);
break;
// log
case MSG_LOG:
logmon->dispatch(m);
break;
// paxos
case MSG_MON_PAXOS:
@ -401,6 +416,9 @@ void Monitor::dispatch(Message *m)
case PAXOS_PGMAP:
paxos_pgmap.dispatch(m);
break;
case PAXOS_LOG:
paxos_log.dispatch(m);
break;
default:
assert(0);
}
@ -502,6 +520,7 @@ void Monitor::tick()
mdsmon->tick();
clientmon->tick();
pgmon->tick();
logmon->tick();
// next tick!
reset_tick();
@ -541,6 +560,7 @@ int Monitor::mkfs()
services.push_back(mdsmon);
services.push_back(clientmon);
services.push_back(pgmon);
services.push_back(logmon);
for (list<PaxosService*>::iterator p = services.begin();
p != services.end();
p++) {

View File

@ -38,6 +38,7 @@ class OSDMonitor;
class MDSMonitor;
class ClientMonitor;
class PGMonitor;
class LogMonitor;
class MMonGetMap;
@ -104,6 +105,7 @@ public:
Paxos paxos_osdmap;
Paxos paxos_clientmap;
Paxos paxos_pgmap;
Paxos paxos_log;
friend class Paxos;
@ -112,11 +114,13 @@ public:
MDSMonitor *mdsmon;
ClientMonitor *clientmon;
PGMonitor *pgmon;
LogMonitor *logmon;
friend class OSDMonitor;
friend class MDSMonitor;
friend class ClientMonitor;
friend class PGMonitor;
friend class LogMonitor;
// messages

View File

@ -20,6 +20,7 @@
#define PAXOS_OSDMAP 2
#define PAXOS_CLIENTMAP 3
#define PAXOS_PGMAP 4
#define PAXOS_LOG 5
inline const char *get_paxos_name(int p) {
switch (p) {
@ -28,6 +29,7 @@ inline const char *get_paxos_name(int p) {
case PAXOS_OSDMAP: return "osdmap";
case PAXOS_CLIENTMAP: return "clientmap";
case PAXOS_PGMAP: return "pgmap";
case PAXOS_LOG: return "log";
default: assert(0); return 0;
}
}

View File

@ -21,6 +21,7 @@ using namespace std;
#include "messages/MMonPaxos.h"
#include "messages/MMonElection.h"
#include "messages/MLog.h"
#include "messages/MPing.h"
@ -164,6 +165,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
m = new MMonElection;
break;
case MSG_LOG:
m = new MLog;
break;
case CEPH_MSG_PING:
m = new MPing();
break;

View File

@ -26,6 +26,8 @@
#define MSG_MON_COMMAND 50
#define MSG_MON_COMMAND_ACK 51
#define MSG_LOG 52
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71