diff --git a/src/Makefile.am b/src/Makefile.am index 31bd0bcd62c..c613154fbd1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -196,6 +196,7 @@ libmon_a_SOURCES = \ mon/MDSMonitor.cc \ mon/ClientMonitor.cc \ mon/PGMonitor.cc \ + mon/LogMonitor.cc \ mon/Elector.cc \ mon/MonitorStore.cc diff --git a/src/include/LogEntry.h b/src/include/LogEntry.h new file mode 100644 index 00000000000..8c5bb8ade7e --- /dev/null +++ b/src/include/LogEntry.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __CEPH_LOG_H +#define __CEPH_LOG_H + +#include "include/types.h" +#include "include/encoding.h" + +struct LogEntry { /* one log record; encode() and decode() below list the fields in the same order (who, stamp, seq, level, msg) and must be kept in step */ + entity_inst_t who; + utime_t stamp; + __u64 seq; + __u8 level; + string msg; + + void encode(bufferlist& bl) const { /* appends every field to bl */ + ::encode(who, bl); + ::encode(stamp, bl); + ::encode(seq, bl); + ::encode(level, bl); + ::encode(msg, bl); + } + void decode(bufferlist::iterator& bl) { /* reads every field back from bl in the order encode() wrote them */ + ::decode(who, bl); + ::decode(stamp, bl); + ::decode(seq, bl); + ::decode(level, bl); + ::decode(msg, bl); + } +}; +WRITE_CLASS_ENCODER(LogEntry) + +inline ostream& operator<<(ostream& out, const LogEntry& e) /* writes stamp, who, seq, level and msg on one line with space/colon separators; level is cast to int, presumably so the __u8 prints as a number rather than a character */ +{ + return out << e.stamp << " " << e.who << " : " << e.seq << " : " << (int)e.level << " : " << e.msg; +} + +#endif diff --git a/src/messages/MLog.h b/src/messages/MLog.h new file mode 100644 index 00000000000..e56342a2501 --- /dev/null +++ b/src/messages/MLog.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; 
you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MLOG_H +#define __MLOG_H + +#include "include/LogEntry.h" + +class MLog : public Message { /* message of type MSG_LOG holding a deque of LogEntry records, an fsid and a version number; payload order is fsid, entries, last */ +public: + ceph_fsid fsid; + deque<LogEntry> entries; + version_t last; /* always encoded, so every constructor gives it a defined value */ + + MLog() : Message(MSG_LOG), last(0) {} /* default ctor (used by decode_message) must carry MSG_LOG as well, not MSG_PGSTATS */ + MLog(ceph_fsid& f) : + Message(MSG_LOG), fsid(f), last(0) {} + + const char *get_type_name() { return "log"; } + void print(ostream& out) { + out << "log"; + } + + void encode_payload() { /* must stay in the same field order as decode_payload() */ + ::encode(fsid, payload); + ::encode(entries, payload); + ::encode(last, payload); + } + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(fsid, p); + ::decode(entries, p); + ::decode(last, p); + } +}; + +#endif diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc new file mode 100644 index 00000000000..a07cb222676 --- /dev/null +++ b/src/mon/LogMonitor.cc @@ -0,0 +1,299 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "LogMonitor.h" +#include "Monitor.h" +#include "MonitorStore.h" + +#include "messages/MMonCommand.h" +#include "messages/MLog.h" + +#include "common/Timer.h" + +#include "osd/osd_types.h" +#include "osd/PG.h" // yuck + +#include "config.h" +#include <sstream> + +#define DOUT_SUBSYS mon +#undef dout_prefix +#define dout_prefix _prefix(mon, paxos->get_version()) +static ostream& _prefix(Monitor *mon, version_t v) { + return *_dout << dbeginl + << "mon" << mon->whoami + << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? 
(const char*)"(peon)":(const char*)"(?\?)"))) + << ".log v" << v << " "; +} + +ostream& operator<<(ostream& out, LogMonitor& pm) /* summary printer; the detailed body is commented out and refers to pg_map members, so only the fixed text log is written */ +{ + std::stringstream ss; /* unused while the block below stays commented out */ + /* + for (hash_map::iterator p = pm.pg_map.num_pg_by_state.begin(); + p != pm.pg_map.num_pg_by_state.end(); + ++p) { + if (p != pm.pg_map.num_pg_by_state.begin()) + ss << ", "; + ss << p->second << " " << pg_state_string(p->first); + } + string states = ss.str(); + return out << "v" << pm.pg_map.version << ": " + << pm.pg_map.pg_stat.size() << " pgs: " + << states << "; " + << kb_t(pm.pg_map.total_pg_kb()) << " data, " + << kb_t(pm.pg_map.total_used_kb()) << " used, " + << kb_t(pm.pg_map.total_avail_kb()) << " / " + << kb_t(pm.pg_map.total_kb()) << " free"; + */ + return out << "log"; +} + +/* + Periodic tick: when paxos is active, run update_from_paxos() and print this monitor at debug level 10 +*/ + +void LogMonitor::tick() +{ + if (!paxos->is_active()) return; + + update_from_paxos(); + dout(10) << *this << dendl; + + if (!mon->is_leader()) return; /* nothing leader-only follows yet */ + +} + +void LogMonitor::create_initial() /* appends one text line for a synthetic mkfs entry (zeroed who, current time, level 0, seq 0) to pending_inc */ +{ + dout(10) << "create_initial -- creating initial map" << dendl; + LogEntry e; + memset(&e.who, 0, sizeof(e.who)); + e.stamp = g_clock.now(); + e.level = 0; + e.msg = "mkfs"; + e.seq = 0; + stringstream ss; + ss << e; + string s; + getline(ss, s); /* takes the formatted entry up to the first newline */ + pending_inc.append(s); + pending_inc.append("\n"); +} + +bool LogMonitor::update_from_paxos() /* placeholder: does nothing and reports true */ +{ + return true; +} + +void LogMonitor::create_pending() /* clears pending_inc and logs the upcoming version number (current + 1) */ +{ + pending_inc.clear(); + dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl; +} + +void LogMonitor::encode_pending(bufferlist &bl) /* the proposal is just the accumulated text: assigns pending_inc to bl */ +{ + dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl; + bl = pending_inc; +} + +bool LogMonitor::preprocess_query(Message *m) /* hands m to preprocess_command or preprocess_log according to its type */ +{ + dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; + switch (m->get_type()) { + case MSG_MON_COMMAND: + return preprocess_command((MMonCommand*)m); + + case MSG_LOG: + return preprocess_log((MLog*)m); + + default: + 
assert(0); + delete m; + return true; + } +} + +bool LogMonitor::prepare_update(Message *m) /* hands m to prepare_command or prepare_log according to its type; any other type trips assert(0) */ +{ + dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; + switch (m->get_type()) { + case MSG_MON_COMMAND: + return prepare_command((MMonCommand*)m); + case MSG_LOG: + return prepare_log((MLog*)m); + default: + assert(0); + delete m; + return false; + } +} + +void LogMonitor::committed() /* no post-commit work yet */ +{ + +} + +bool LogMonitor::preprocess_log(MLog *m) /* placeholder: always returns false */ +{ + return false; +} + +bool LogMonitor::prepare_log(MLog *m) /* deletes m and returns false when its fsid differs from the monmap fsid; otherwise appends one text line per entry to pending_inc, queues a C_Log via paxos->wait_for_commit() and returns true */ +{ + dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl; + + if (!ceph_fsid_equal(&m->fsid, &mon->monmap->fsid)) { + dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl; + delete m; + return false; + } + + for (deque::iterator p = m->entries.begin(); + p != m->entries.end(); + p++) { + dout(10) << " logging " << *p << dendl; + stringstream ss; + ss << *p; + string s; + getline(ss, s); /* takes the formatted entry up to the first newline */ + pending_inc.append(s); + pending_inc.append("\n"); + } + + paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst())); + return true; +} + +void LogMonitor::_updated_log(MLog *ack, entity_inst_t who) /* sends ack to who through the messenger; prepare_log hands in the original request as ack */ +{ + dout(7) << "_updated_log for " << who << dendl; + mon->messenger->send_message(ack, who); +} + + + +bool LogMonitor::preprocess_command(MMonCommand *m) +{ + int r = -1; /* never changed while the block below is commented out */ + bufferlist rdata; + stringstream ss; + + /* + if (m->cmd.size() > 1) { + if (m->cmd[1] == "stat") { + ss << *this; + r = 0; + } + else if (m->cmd[1] == "getmap") { + pg_map.encode(rdata); + ss << "got pgmap version " << pg_map.version; + r = 0; + } + else if (m->cmd[1] == "send_pg_creates") { + send_pg_creates(); + ss << "sent pg creates "; + r = 0; + } + else if (m->cmd[1] == "dump") { + ss << "version " << pg_map.version << std::endl; + ss << "last_osdmap_epoch " << pg_map.last_osdmap_epoch << std::endl; + ss << "last_pg_scan " << pg_map.last_pg_scan << std::endl; + ss << 
"pg_stat\tobjects\tkb\tbytes\tv\treported\tstate\tosds" << std::endl; + for (set::iterator p = pg_map.pg_set.begin(); + p != pg_map.pg_set.end(); + p++) { + pg_stat_t &st = pg_map.pg_stat[*p]; + ss << *p + << "\t" << st.num_objects + << "\t" << st.num_kb + << "\t" << st.num_bytes + << "\t" << pg_state_string(st.state) + << "\t" << st.version + << "\t" << st.reported + << "\t" << st.acting + << std::endl; + } + ss << "osdstat\tobject\tkbused\tkbavail\tkb\thb in\thb out" << std::endl; + for (hash_map::iterator p = pg_map.osd_stat.begin(); + p != pg_map.osd_stat.end(); + p++) + ss << p->first + << "\t" << p->second.num_objects + << "\t" << p->second.kb_used + << "\t" << p->second.kb_avail + << "\t" << p->second.kb + << "\t" << p->second.hb_in + << "\t" << p->second.hb_out + << std::endl; + while (!ss.eof()) { + string s; + getline(ss, s); + rdata.append(s.c_str(), s.length()); + rdata.append("\n", 1); + } + ss << "ok"; + r = 0; + } + else if (m->cmd[1] == "scrub" && m->cmd.size() == 3) { + pg_t pgid; + r = -EINVAL; + if (pgid.parse(m->cmd[2].c_str())) { + if (mon->pgmon->pg_map.pg_stat.count(pgid)) { + if (mon->pgmon->pg_map.pg_stat[pgid].acting.size()) { + int osd = mon->pgmon->pg_map.pg_stat[pgid].acting[0]; + if (mon->osdmon->osdmap.is_up(osd)) { + vector pgs(1); + pgs[0] = pgid; + mon->messenger->send_message(new MOSDScrub(mon->monmap->fsid, pgs), + mon->osdmon->osdmap.get_inst(osd)); + ss << "instructing pg " << pgid << " on osd" << osd << " to scrub"; + r = 0; + } else + ss << "pg " << pgid << " primary osd" << osd << " not up"; + } else + ss << "pg " << pgid << " has no primary osd"; + } else + ss << "pg " << pgid << " dne"; + } else + ss << "invalid pgid '" << m->cmd[2] << "'"; + } + } + */ + + if (r != -1) { /* not reachable at present because r is still -1 */ + string rs; + getline(ss, rs); + mon->reply_command(m, r, rs, rdata); + return true; + } else + return false; +} + + +bool LogMonitor::prepare_command(MMonCommand *m) /* no commands implemented: replies with -EINVAL and the text unrecognized command, then returns false */ +{ + stringstream ss; + string rs; + int err = -EINVAL; + + // nothing here yet + ss << 
"unrecognized command"; + + getline(ss, rs); + mon->reply_command(m, err, rs); + return false; +} diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h new file mode 100644 index 00000000000..7d9eb7c91d7 --- /dev/null +++ b/src/mon/LogMonitor.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __LOGMONITOR_H +#define __LOGMONITOR_H + +#include +#include +using namespace std; + +#include "include/types.h" +#include "msg/Messenger.h" +#include "PaxosService.h" + +#include "include/LogEntry.h" + +class MMonCommand; +class MLog; + +class LogMonitor : public PaxosService { /* PaxosService whose pending state is a bufferlist of text log lines */ +private: + bufferlist pending_inc; /* lines appended by create_initial() and prepare_log(); handed out by encode_pending() */ + + void create_initial(); + bool update_from_paxos(); + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); // propose pending update to peers + + void committed(); + + bool preprocess_query(Message *m); // true if processed. 
+ bool prepare_update(Message *m); + + bool preprocess_log(MLog *m); + bool prepare_log(MLog *m); + void _updated_log(MLog *m, entity_inst_t who); + + struct C_Log : public Context { /* completion that forwards to logmon->_updated_log(ack, who) from finish() */ + LogMonitor *logmon; + MLog *ack; + entity_inst_t who; + C_Log(LogMonitor *p, MLog *a, entity_inst_t w) : logmon(p), ack(a), who(w) {} + void finish(int r) { + logmon->_updated_log(ack, who); + } + }; + + bool preprocess_command(MMonCommand *m); + bool prepare_command(MMonCommand *m); + + public: + LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } + + void tick(); // check state, take actions +}; + +#endif diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 3147091d04a..79d397266af 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -38,6 +38,7 @@ #include "MDSMonitor.h" #include "ClientMonitor.h" #include "PGMonitor.h" +#include "LogMonitor.h" #include "config.h" @@ -75,6 +76,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) : paxos_osdmap(this, w, PAXOS_OSDMAP), paxos_clientmap(this, w, PAXOS_CLIENTMAP), paxos_pgmap(this, w, PAXOS_PGMAP), + paxos_log(this, w, PAXOS_LOG), osdmon(0), mdsmon(0), clientmon(0) { @@ -82,6 +84,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) : mdsmon = new MDSMonitor(this, &paxos_mdsmap); clientmon = new ClientMonitor(this, &paxos_clientmap); pgmon = new PGMonitor(this, &paxos_pgmap); + logmon = new LogMonitor(this, &paxos_log); } Monitor::~Monitor() @@ -90,6 +93,7 @@ Monitor::~Monitor() delete mdsmon; delete clientmon; delete pgmon; + delete logmon; if (messenger) messenger->destroy(); } @@ -137,6 +141,7 @@ void Monitor::shutdown() mdsmon->shutdown(); clientmon->shutdown(); pgmon->shutdown(); + logmon->shutdown(); // cancel all events cancel_tick(); @@ -160,11 +165,13 @@ void Monitor::call_election() paxos_osdmap.election_starting(); paxos_clientmap.election_starting(); paxos_pgmap.election_starting(); + paxos_log.election_starting(); mdsmon->election_starting(); 
osdmon->election_starting(); clientmon->election_starting(); pgmon->election_starting(); + logmon->election_starting(); // call a new election elector.call_election(); @@ -183,12 +190,14 @@ void Monitor::win_election(epoch_t epoch, set& active) paxos_osdmap.leader_init(); paxos_clientmap.leader_init(); paxos_pgmap.leader_init(); + paxos_log.leader_init(); // init pgmon->election_finished(); // hack: before osdmon, for osd->pg kick works ok osdmon->election_finished(); mdsmon->election_finished(); clientmon->election_finished(); + logmon->election_finished(); } void Monitor::lose_election(epoch_t epoch, set &q, int l) @@ -205,12 +214,14 @@ void Monitor::lose_election(epoch_t epoch, set &q, int l) paxos_osdmap.peon_init(); paxos_clientmap.peon_init(); paxos_pgmap.peon_init(); + paxos_log.peon_init(); // init osdmon->election_finished(); mdsmon->election_finished(); clientmon->election_finished(); pgmon->election_finished(); + logmon->election_finished(); } void Monitor::handle_command(MMonCommand *m) @@ -373,6 +384,10 @@ void Monitor::dispatch(Message *m) pgmon->dispatch(m); break; + // log + case MSG_LOG: + logmon->dispatch(m); + break; // paxos case MSG_MON_PAXOS: @@ -401,6 +416,9 @@ void Monitor::dispatch(Message *m) case PAXOS_PGMAP: paxos_pgmap.dispatch(m); break; + case PAXOS_LOG: + paxos_log.dispatch(m); + break; default: assert(0); } @@ -502,6 +520,7 @@ void Monitor::tick() mdsmon->tick(); clientmon->tick(); pgmon->tick(); + logmon->tick(); // next tick! 
reset_tick(); @@ -541,6 +560,7 @@ int Monitor::mkfs() services.push_back(mdsmon); services.push_back(clientmon); services.push_back(pgmon); + services.push_back(logmon); for (list::iterator p = services.begin(); p != services.end(); p++) { diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 9acceccaf4f..78048dda27c 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -38,6 +38,7 @@ class OSDMonitor; class MDSMonitor; class ClientMonitor; class PGMonitor; +class LogMonitor; class MMonGetMap; @@ -104,6 +105,7 @@ public: Paxos paxos_osdmap; Paxos paxos_clientmap; Paxos paxos_pgmap; + Paxos paxos_log; friend class Paxos; @@ -112,11 +114,13 @@ public: MDSMonitor *mdsmon; ClientMonitor *clientmon; PGMonitor *pgmon; + LogMonitor *logmon; friend class OSDMonitor; friend class MDSMonitor; friend class ClientMonitor; friend class PGMonitor; + friend class LogMonitor; // messages diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 8d1ac928223..bcba5729ef1 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -20,6 +20,7 @@ #define PAXOS_OSDMAP 2 #define PAXOS_CLIENTMAP 3 #define PAXOS_PGMAP 4 +#define PAXOS_LOG 5 inline const char *get_paxos_name(int p) { switch (p) { @@ -28,6 +29,7 @@ inline const char *get_paxos_name(int p) { case PAXOS_OSDMAP: return "osdmap"; case PAXOS_CLIENTMAP: return "clientmap"; case PAXOS_PGMAP: return "pgmap"; + case PAXOS_LOG: return "log"; default: assert(0); return 0; } } diff --git a/src/msg/Message.cc b/src/msg/Message.cc index f286ca89246..3e322a1b40b 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -21,6 +21,7 @@ using namespace std; #include "messages/MMonPaxos.h" #include "messages/MMonElection.h" +#include "messages/MLog.h" #include "messages/MPing.h" @@ -164,6 +165,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, m = new MMonElection; break; + case MSG_LOG: + m = new MLog; + break; + case CEPH_MSG_PING: m = new MPing(); break; diff --git a/src/msg/Message.h 
b/src/msg/Message.h index fe4e7dd8648..bf6eca6d497 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -26,6 +26,8 @@ #define MSG_MON_COMMAND 50 #define MSG_MON_COMMAND_ACK 51 +#define MSG_LOG 52 + // osd internal #define MSG_OSD_PING 70 #define MSG_OSD_BOOT 71