merged r1409:1471 from trunk/ceph into branches/sage/pgs

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1472 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sageweil 2007-07-04 04:19:32 +00:00
parent 2c9f27e288
commit 4e4c5bcd53
17 changed files with 2598 additions and 1418 deletions

View File

@ -17,6 +17,7 @@
#include "Monitor.h"
#include "MDSMonitor.h"
#include "OSDMonitor.h"
#include "MonitorStore.h"
#include "messages/MClientMount.h"
#include "messages/MClientUnmount.h"
@ -30,105 +31,207 @@
void ClientMonitor::dispatch(Message *m)
bool ClientMonitor::update_from_paxos()
{
switch (m->get_type()) {
case MSG_CLIENT_MOUNT:
handle_client_mount((MClientMount*)m);
break;
case MSG_CLIENT_UNMOUNT:
handle_client_unmount((MClientUnmount*)m);
break;
default:
assert(0);
}
}
void ClientMonitor::handle_client_mount(MClientMount *m)
{
dout(7) << "client_mount from " << m->get_source_inst() << endl;
assert(m->get_source().is_client());
int from = m->get_source().num();
assert(paxos->is_active());
// choose a client id
if (from < 0 ||
(client_map.count(from) &&
client_map[from] != m->get_source_addr())) {
from = num_clients++;
dout(10) << "client_mount assigned client" << from << endl;
}
client_map[from] = m->get_source_addr();
// reply with latest mds map
entity_inst_t to = m->get_source_inst();
to.name = MSG_ADDR_CLIENT(from);
mon->mdsmon->send_latest(to);
mon->osdmon->send_latest(to);
delete m;
}
version_t paxosv = paxos->get_version();
dout(10) << "update_from_paxos paxosv " << paxosv
<< ", my v " << client_map.version << endl;
void ClientMonitor::handle_client_unmount(MClientUnmount *m)
{
dout(7) << "client_unmount from " << m->get_source()
<< " at " << m->get_source_inst() << endl;
assert(m->get_source().is_client());
int from = m->get_source().num();
if (paxosv == client_map.version) return true;
assert(paxosv >= client_map.version);
if (client_map.count(from)) {
client_map.erase(from);
if (client_map.version == 0 && paxosv > 1 &&
mon->store->exists_bl_ss("clientmap","latest")) {
// starting up: load latest
dout(7) << "update_from_paxos startup: loading latest full clientmap" << endl;
bufferlist bl;
mon->store->get_bl_ss(bl, "clientmap", "latest");
int off = 0;
client_map._decode(bl, off);
}
if (client_map.empty() &&
g_conf.mds_shutdown_on_last_unmount) {
dout(1) << "last client unmounted" << endl;
mon->do_stop();
// walk through incrementals
while (paxosv > client_map.version) {
bufferlist bl;
bool success = paxos->read(client_map.version+1, bl);
if (success) {
dout(7) << "update_from_paxos applying incremental " << client_map.version+1 << endl;
Incremental inc;
int off = 0;
inc._decode(bl, off);
client_map.apply_incremental(inc);
} else {
dout(7) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl;
return false;
}
}
// reply with (same) unmount message to ack
mon->messenger->send_message(m, m->get_source_inst());
// save latest
bufferlist bl;
client_map._encode(bl);
mon->store->put_bl_ss(bl, "clientmap", "latest");
return true;
}
void ClientMonitor::create_pending()
{
assert(mon->is_leader());
pending_inc = Incremental();
pending_inc.version = client_map.version + 1;
pending_inc.next_client = client_map.next_client;
dout(10) << "create_pending v " << pending_inc.version
<< ", next is " << pending_inc.next_client
<< endl;
}
// Hook for creating the initial client map. The visible body only logs;
// no map state is modified here.
void ClientMonitor::create_initial()
{
dout(1) << "create_initial -- creating initial map" << endl;
}
/*
void ClientMonitor::handle_mds_shutdown(Message *m)
void ClientMonitor::encode_pending(bufferlist &bl)
{
assert(m->get_source().is_mds());
int from = m->get_source().num();
mdsmap.mds_inst.erase(from);
mdsmap.all_mds.erase(from);
dout(7) << "mds_shutdown from " << m->get_source()
<< ", still have " << mdsmap.all_mds
<< endl;
assert(mon->is_leader());
dout(10) << "encode_pending v " << pending_inc.version
<< ", next is " << pending_inc.next_client
<< endl;
// tell someone?
// fixme
delete m;
assert(paxos->get_version() + 1 == pending_inc.version);
pending_inc._encode(bl);
}
*/
/*
void ClientMonitor::bcast_latest_mds()
// -------
bool ClientMonitor::preprocess_query(Message *m)
{
dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl;
// tell mds
for (set<int>::iterator p = mdsmap.get_mds().begin();
p != mdsmap.get_mds().end();
p++) {
if (mdsmap.is_down(*p)) continue;
send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p));
dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl;
switch (m->get_type()) {
case MSG_CLIENT_MOUNT:
{
// already mounted?
entity_addr_t addr = m->get_source_addr();
if (client_map.addr_client.count(addr)) {
int client = client_map.addr_client[addr];
dout(7) << " client" << client << " already mounted" << endl;
_mounted(client, (MClientMount*)m);
return true;
}
}
return false;
case MSG_CLIENT_UNMOUNT:
{
// already unmounted?
int client = m->get_source().num();
if (client_map.client_addr.count(client) == 0) {
dout(7) << " client" << client << " not mounted" << endl;
_unmounted((MClientUnmount*)m);
return true;
}
}
return false;
default:
assert(0);
delete m;
return true;
}
}
*/
// Queue a client mount or unmount into the pending incremental.
//
// MOUNT:   picks the client id (the one the sender asked for, or a freshly
//          allocated one), records it in pending_inc, and registers a
//          C_Mounted callback with paxos->wait_for_commit().
// UNMOUNT: records the unmount in pending_inc and registers a C_Unmounted
//          callback with paxos->wait_for_commit().
//
// Returns true for both handled message types.  Any other type asserts,
// frees m and returns false.
bool ClientMonitor::prepare_update(Message *m)
{
  dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl;

  switch (m->get_type()) {
  case MSG_CLIENT_MOUNT:
    {
      MClientMount *mount = (MClientMount*)m;
      entity_addr_t addr = mount->addr;
      int client = -1;
      if (mount->get_source().is_client())
        client = mount->get_source().num();

      // Is the requested id already claimed by a different address?
      // Check the committed map AND mounts already queued in this pending
      // incremental; otherwise a second requester in the same proposal
      // interval would silently overwrite the first one's entry.
      bool taken = false;
      if (client >= 0) {
        if (client_map.client_addr.count(client) &&
            client_map.client_addr[client] != addr)
          taken = true;
        if (pending_inc.mount.count(client) &&
            pending_inc.mount[client] != addr)
          taken = true;
      }

      // choose a client id
      if (client < 0 || taken) {
        client = pending_inc.next_client;
        dout(10) << "mount: assigned client" << client << " to " << addr << endl;
      } else {
        dout(10) << "mount: client" << client << " requested by " << addr << endl;
      }

      pending_inc.add_mount(client, addr);
      paxos->wait_for_commit(new C_Mounted(this, client, mount));
    }
    return true;

  case MSG_CLIENT_UNMOUNT:
    {
      MClientUnmount *unmount = (MClientUnmount*)m;
      assert(unmount->inst.name.is_client());
      int client = unmount->inst.name.num();

      assert(client_map.client_addr.count(client));

      pending_inc.add_unmount(client);
      paxos->wait_for_commit(new C_Unmounted(this, unmount));
    }
    return true;

  default:
    assert(0);
    delete m;
    return false;
  }
}
// MOUNT
void ClientMonitor::_mounted(int client, MClientMount *m)
{
entity_inst_t to;
to.addr = m->addr;
to.name = MSG_ADDR_CLIENT(client);
dout(10) << "_mounted client" << client << " at " << to << endl;
// reply with latest mds, osd maps
mon->mdsmon->send_latest(to);
mon->osdmon->send_latest(0, to);
delete m;
}
// Acknowledge an unmount by sending the same message back to its sender,
// then optionally stop the monitor once no clients remain.
void ClientMonitor::_unmounted(MClientUnmount *m)
{
  dout(10) << "_unmounted " << m->inst << endl;

  // reply with (same) unmount message
  mon->messenger->send_message(m, m->inst);

  // auto-shutdown?
  // (hack for fakesyn/newsyn, mostly)
  // guards are evaluated in the same order as the original && chain
  if (!mon->is_leader())
    return;
  if (!(client_map.version > 1))
    return;
  if (!client_map.client_addr.empty())
    return;
  if (!g_conf.mon_stop_on_last_unmount)
    return;

  dout(1) << "last client unmounted" << endl;
  mon->do_stop();
}

View File

@ -24,31 +24,153 @@ using namespace std;
#include "mds/MDSMap.h"
#include "PaxosService.h"
class Monitor;
class Paxos;
class MClientMount;
class MClientUnmount;
class ClientMonitor : public Dispatcher {
Monitor *mon;
Messenger *messenger;
Mutex &lock;
class ClientMonitor : public PaxosService {
public:
private:
int num_clients;
map<int,entity_addr_t> client_map;
// One step of change to the client map: the target version, the updated
// client-id allocator, and the sets of clients mounted / unmounted in this
// step.
struct Incremental {
  version_t version;                  // map version this incremental produces
  uint32_t next_client;               // next unused client id after applying
  map<int32_t, entity_addr_t> mount;  // client id -> address, newly mounted
  set<int32_t> unmount;               // client ids to remove

  Incremental() : version(0), next_client() {}

  // NOTE(review): no definition is visible for this member; it looks like a
  // leftover declaration -- confirm before relying on it.
  void bcast_latest_mds();

  bool is_empty() { return mount.empty() && unmount.empty(); }

  // Queue a mount.  Bumps next_client past the id if needed.
  void add_mount(uint32_t client, entity_addr_t addr) {
    next_client = MAX(next_client, client+1);
    // A mount supersedes an unmount queued earlier in this same incremental.
    // Map::apply_incremental() processes mounts before unmounts, so a
    // leftover unmount entry would remove the client again.
    unmount.erase(client);
    mount[client] = addr;
  }

  // Queue an unmount.  If the client was only mounted within this same
  // incremental, the two cancel out and nothing is recorded.
  void add_unmount(uint32_t client) {
    assert(client < next_client);
    if (mount.count(client))
      mount.erase(client);
    else
      unmount.insert(client);
  }

  // Serialize fields in a fixed order; _decode reads them back in the same order.
  void _encode(bufferlist &bl) {
    ::_encode(version, bl);
    ::_encode(next_client, bl);
    ::_encode(mount, bl);
    ::_encode(unmount, bl);
  }
  void _decode(bufferlist &bl, int& off) {
    ::_decode(version, bl, off);
    ::_decode(next_client, bl, off);
    ::_decode(mount, bl, off);
    ::_decode(unmount, bl, off);
  }
};
//void accept_pending(); // accept pending, new map.
//void send_incremental(epoch_t since, msg_addr_t dest);
struct Map {
version_t version;
uint32_t next_client;
map<uint32_t,entity_addr_t> client_addr;
hash_map<entity_addr_t,uint32_t> addr_client;
void handle_client_mount(class MClientMount *m);
void handle_client_unmount(class MClientUnmount *m);
Map() : version(0), next_client(0) {}
void reverse() {
addr_client.clear();
for (map<uint32_t,entity_addr_t>::iterator p = client_addr.begin();
p != client_addr.end();
++p) {
addr_client[p->second] = p->first;
}
}
void apply_incremental(Incremental &inc) {
assert(inc.version == version+1);
version = inc.version;
next_client = inc.next_client;
for (map<int32_t, entity_addr_t>::iterator p = inc.mount.begin();
p != inc.mount.end();
++p) {
client_addr[p->first] = p->second;
addr_client[p->second] = p->first;
}
for (set<int32_t>::iterator p = inc.unmount.begin();
p != inc.unmount.end();
++p) {
assert(client_addr.count(*p));
addr_client.erase(client_addr[*p]);
client_addr.erase(*p);
}
}
void _encode(bufferlist &bl) {
::_encode(version, bl);
::_encode(next_client, bl);
::_encode(client_addr, bl);
}
void _decode(bufferlist &bl, int& off) {
::_decode(version, bl, off);
::_decode(next_client, bl, off);
::_decode(client_addr, bl, off);
reverse();
}
};
// Completion callback for a queued mount: on r >= 0 calls _mounted(),
// otherwise hands the original message back to dispatch().
class C_Mounted : public Context {
  ClientMonitor *cmon;
  int client;
  MClientMount *m;
public:
  C_Mounted(ClientMonitor *cm, int c, MClientMount *m_) :
    cmon(cm), client(c), m(m_) {}
  void finish(int r) {
    if (r < 0) {
      // negative result: re-dispatch the request
      cmon->dispatch((Message*)m);
      return;
    }
    cmon->_mounted(client, m);
  }
};
// Completion callback for a queued unmount: on r >= 0 calls _unmounted(),
// otherwise hands the original message back to dispatch().
class C_Unmounted : public Context {
  ClientMonitor *cmon;
  MClientUnmount *m;
public:
  C_Unmounted(ClientMonitor *cm, MClientUnmount *m_) :
    cmon(cm), m(m_) {}
  void finish(int r) {
    if (r < 0) {
      // negative result: re-dispatch the request
      cmon->dispatch((Message*)m);
      return;
    }
    cmon->_unmounted(m);
  }
};
private:
Map client_map;
// leader
Incremental pending_inc;
void create_initial();
bool update_from_paxos();
void create_pending(); // prepare a new pending
void encode_pending(bufferlist &bl); // propose pending update to peers
void _mounted(int c, MClientMount *m);
void _unmounted(MClientUnmount *m);
bool preprocess_query(Message *m); // true if processed.
bool prepare_update(Message *m);
public:
ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l),
num_clients(0) { }
void dispatch(Message *m);
void tick(); // check state, take actions
public:
ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
//void tick(); // check state, take actions
};
#endif

View File

@ -16,33 +16,58 @@
#include "Monitor.h"
#include "common/Timer.h"
#include "messages/MMonElectionPropose.h"
#include "messages/MMonElectionAck.h"
#include "messages/MMonElectionVictory.h"
#include "MonitorStore.h"
#include "messages/MMonElection.h"
#include "config.h"
#undef dout
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") "
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") "
// Load the last stored election epoch from the monitor store under
// "mon_epoch"; a zero value is replaced by 1.
void Elector::init()
{
  epoch = mon->store->get_int("mon_epoch");
  if (epoch == 0) {
    epoch = 1;
  }
  dout(1) << "init, last seen epoch " << epoch << endl;
}
// Cancel any outstanding election-expire timer event.
void Elector::shutdown()
{
  if (expire_event) {
    mon->timer.cancel_event(expire_event);
    // forget the cancelled event, as cancel_timer() does, so that a later
    // cancel cannot act on a stale pointer
    expire_event = 0;
  }
}
// Move to a strictly larger epoch e: persist it under "mon_epoch" and reset
// the per-election state.
void Elector::bump_epoch(epoch_t e)
{
  dout(10) << "bump_epoch " << epoch << " to " << e << endl;
  assert(epoch < e);

  // adopt and persist the new epoch
  epoch = e;
  mon->store->put_int(epoch, "mon_epoch");

  // clear up some state
  leader_acked = -1;
  acked_me.clear();
  electing_me = false;
}
void Elector::start()
{
dout(5) << "start -- can i be leader?" << endl;
leader_acked = -1;
// start by trying to elect me
if (epoch % 2 == 0)
bump_epoch(epoch+1); // odd == election cycle
start_stamp = g_clock.now();
acked_me.clear();
acked_me.insert(whoami);
electing_me = true;
acked_me.insert(whoami);
// bcast to everyone else
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
mon->messenger->send_message(new MMonElectionPropose,
mon->messenger->send_message(new MMonElection(MMonElection::OP_PROPOSE, epoch),
mon->monmap->get_inst(i));
}
@ -54,6 +79,7 @@ void Elector::defer(int who)
dout(5) << "defer to " << who << endl;
if (electing_me) {
// drop out
acked_me.clear();
electing_me = false;
}
@ -61,7 +87,7 @@ void Elector::defer(int who)
// ack them
leader_acked = who;
ack_stamp = g_clock.now();
mon->messenger->send_message(new MMonElectionAck,
mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch),
mon->monmap->get_inst(who));
// set a timer
@ -69,29 +95,22 @@ void Elector::defer(int who)
}
// Context whose finish() calls Elector::expire() on the stored elector;
// the result code r is ignored.
class C_Mon_ElectionExpire : public Context {
Elector *elector;
public:
C_Mon_ElectionExpire(Elector *e) : elector(e) { }
void finish(int r) {
elector->expire();
}
};
void Elector::reset_timer(double plus)
{
// set the timer
cancel_timer();
expire_event = new C_Mon_ElectionExpire(this);
g_timer.add_event_after(g_conf.mon_lease + plus,
expire_event);
expire_event = new C_ElectionExpire(this);
mon->timer.add_event_after(g_conf.mon_lease + plus,
expire_event);
}
void Elector::cancel_timer()
{
if (expire_event)
g_timer.cancel_event(expire_event);
if (expire_event) {
mon->timer.cancel_event(expire_event);
expire_event = 0;
}
}
void Elector::expire()
@ -114,29 +133,48 @@ void Elector::victory()
{
leader_acked = -1;
electing_me = false;
set<int> quorum = acked_me;
cancel_timer();
assert(epoch % 2 == 1); // election
bump_epoch(epoch+1); // is over!
// tell everyone
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
mon->messenger->send_message(new MMonElectionVictory,
mon->monmap->get_inst(i));
for (set<int>::iterator p = quorum.begin();
p != quorum.end();
++p) {
if (*p == whoami) continue;
mon->messenger->send_message(new MMonElection(MMonElection::OP_VICTORY, epoch),
mon->monmap->get_inst(*p));
}
// tell monitor
mon->win_election(acked_me);
mon->win_election(epoch, quorum);
}
void Elector::handle_propose(MMonElectionPropose *m)
void Elector::handle_propose(MMonElection *m)
{
dout(5) << "handle_propose from " << m->get_source() << endl;
int from = m->get_source().num();
if (from > whoami) {
if (leader_acked >= 0 && // we already acked someone
leader_acked < from) { // who would win over them
assert(m->epoch % 2 == 1); // election
if (m->epoch > epoch) {
bump_epoch(m->epoch);
}
else if (m->epoch < epoch && // got an "old" propose,
epoch % 2 == 0 && // in a non-election cycle
mon->quorum.count(from) == 0) { // from someone outside the quorum
// a mon just started up, call a new election so they can rejoin!
dout(5) << " got propose from old epoch, " << m->get_source() << " must have just started" << endl;
start();
}
if (whoami < from) {
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
assert(leader_acked < from); // and they still win, of course
dout(5) << "no, we already acked " << leader_acked << endl;
} else {
// wait, i should win!
@ -158,11 +196,21 @@ void Elector::handle_propose(MMonElectionPropose *m)
delete m;
}
void Elector::handle_ack(MMonElectionAck *m)
void Elector::handle_ack(MMonElection *m)
{
dout(5) << "handle_ack from " << m->get_source() << endl;
int from = m->get_source().num();
assert(m->epoch % 2 == 1); // election
if (m->epoch > epoch) {
dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << endl;
bump_epoch(m->epoch);
start();
delete m;
return;
}
assert(m->epoch == epoch);
if (electing_me) {
// thanks
acked_me.insert(from);
@ -175,26 +223,28 @@ void Elector::handle_ack(MMonElectionAck *m)
}
} else {
// ignore, i'm deferring already.
assert(leader_acked >= 0);
}
delete m;
}
void Elector::handle_victory(MMonElectionVictory *m)
void Elector::handle_victory(MMonElection *m)
{
dout(5) << "handle_victory from " << m->get_source() << endl;
int from = m->get_source().num();
assert(from < whoami);
assert(m->epoch % 2 == 0);
assert(m->epoch == epoch + 1); // i should have seen this election if i'm getting the victory.
bump_epoch(m->epoch);
if (from < whoami) {
// ok, fine, they win
mon->lose_election(from);
// cancel my timer
cancel_timer();
} else {
// no, that makes no sense, i should win. start over!
start();
}
// they win
mon->lose_election(epoch, from);
// cancel my timer
cancel_timer();
}
@ -203,19 +253,37 @@ void Elector::handle_victory(MMonElectionVictory *m)
void Elector::dispatch(Message *m)
{
switch (m->get_type()) {
case MSG_MON_ELECTION_ACK:
handle_ack((MMonElectionAck*)m);
case MSG_MON_ELECTION:
{
MMonElection *em = (MMonElection*)m;
switch (em->op) {
case MMonElection::OP_PROPOSE:
handle_propose(em);
return;
}
if (em->epoch < epoch) {
dout(5) << "old epoch, dropping" << endl;
delete em;
break;
}
switch (em->op) {
case MMonElection::OP_ACK:
handle_ack(em);
return;
case MMonElection::OP_VICTORY:
handle_victory(em);
return;
default:
assert(0);
}
}
break;
case MSG_MON_ELECTION_PROPOSE:
handle_propose((MMonElectionPropose*)m);
break;
case MSG_MON_ELECTION_VICTORY:
handle_victory((MMonElectionVictory*)m);
break;
default:
default:
assert(0);
}
}

View File

@ -39,6 +39,8 @@ class Elector {
void reset_timer(double plus=0.0);
void cancel_timer();
epoch_t epoch; // latest epoch we've seen. odd == election, even == stable,
// electing me
bool electing_me;
utime_t start_stamp;
@ -48,25 +50,42 @@ class Elector {
int leader_acked; // who i've acked
utime_t ack_stamp; // and when
public:
void bump_epoch(epoch_t e=0); // i just saw a larger epoch
// Context whose finish() calls Elector::expire() on the stored elector;
// the result code r is ignored.
class C_ElectionExpire : public Context {
Elector *elector;
public:
C_ElectionExpire(Elector *e) : elector(e) { }
void finish(int r) {
elector->expire();
}
};
void start(); // start an electing me
void defer(int who);
void expire(); // timer goes off
void victory();
void handle_propose(class MMonElectionPropose *m);
void handle_ack(class MMonElectionAck *m);
void handle_victory(class MMonElectionVictory *m);
void handle_propose(class MMonElection *m);
void handle_ack(class MMonElection *m);
void handle_victory(class MMonElection *m);
public:
Elector(Monitor *m, int w) : mon(m), whoami(w) {
// initialize all those values!
// ...
}
// Construct with no pending expire event, epoch 0, not electing ourselves,
// and no leader acked (-1).
Elector(Monitor *m, int w) : mon(m), whoami(w),
expire_event(0),
epoch(0),
electing_me(false),
leader_acked(-1) { }
void init();
void shutdown();
void dispatch(Message *m);
// Public entry point that simply calls start().
void call_election() {
start();
}
};

View File

@ -16,12 +16,17 @@
#include "MDSMonitor.h"
#include "Monitor.h"
#include "MonitorStore.h"
#include "OSDMonitor.h"
#include "messages/MMDSMap.h"
#include "messages/MMDSGetMap.h"
#include "messages/MMDSBeacon.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MGenericMessage.h"
#include "common/Timer.h"
@ -34,105 +39,334 @@
/********* MDS map **************/
// my methods
void MDSMonitor::dispatch(Message *m)
void MDSMonitor::print_map(MDSMap &m)
{
switch (m->get_type()) {
case MSG_MDS_BEACON:
handle_mds_beacon((MMDSBeacon*)m);
break;
case MSG_MDS_GETMAP:
handle_mds_getmap((MMDSGetMap*)m);
break;
default:
assert(0);
}
}
void MDSMonitor::election_finished()
{
if (mon->is_leader()) {
// FIXME be smarter later.
if (g_conf.mkfs) {
create_initial();
save_map();
} else {
load_map();
}
}
}
void MDSMonitor::create_initial()
{
mdsmap.epoch = 0; // until everyone boots
mdsmap.created = g_clock.now();
mdsmap.encode(encoded_map);
print_map();
}
void MDSMonitor::load_map()
{
int r = mon->store->get_bl_ss(encoded_map, "mdsmap", "current");
assert(r > 0);
mdsmap.decode(encoded_map);
dout(7) << "load_map epoch " << mdsmap.get_epoch() << endl;
}
void MDSMonitor::save_map()
{
dout(7) << "save_map epoch " << mdsmap.get_epoch() << endl;
int r = mon->store->put_bl_ss(encoded_map, "mdsmap", "current");
assert(r>=0);
}
void MDSMonitor::print_map()
{
dout(7) << "print_map epoch " << mdsmap.get_epoch() << " num_mds " << g_conf.num_mds << endl;
dout(7) << "print_map epoch " << m.get_epoch() << " num_mds " << g_conf.num_mds << endl;
entity_inst_t blank;
set<int> all;
mdsmap.get_mds_set(all);
m.get_mds_set(all);
for (set<int>::iterator p = all.begin();
p != all.end();
++p) {
dout(7) << " mds" << *p << "." << mdsmap.mds_inc[*p]
<< " : " << MDSMap::get_state_name(mdsmap.get_state(*p))
<< " : " << (mdsmap.have_inst(*p) ? mdsmap.get_inst(*p) : blank)
dout(7) << " mds" << *p << "." << m.mds_inc[*p]
<< " : " << MDSMap::get_state_name(m.get_state(*p))
<< " : " << (m.have_inst(*p) ? m.get_inst(*p) : blank)
<< endl;
}
}
void MDSMonitor::issue_map()
{
mdsmap.inc_epoch();
encoded_map.clear();
mdsmap.encode(encoded_map);
dout(7) << "issue_map epoch " << mdsmap.get_epoch() << endl;
// service methods
// Initialize the pending mds map: stamp its creation time with the current
// clock and print it.
void MDSMonitor::create_initial()
{
dout(10) << "create_initial" << endl;
pending_mdsmap.created = g_clock.now();
print_map(pending_mdsmap);
}
bool MDSMonitor::update_from_paxos()
{
assert(paxos->is_active());
version_t paxosv = paxos->get_version();
dout(10) << "update_from_paxos paxosv " << paxosv
<< ", my e " << mdsmap.epoch << endl;
if (paxosv == mdsmap.epoch) return true;
assert(paxosv >= mdsmap.epoch);
// read and decode
mdsmap_bl.clear();
bool success = paxos->read(paxosv, mdsmap_bl);
assert(success);
dout(10) << "update_from_paxos got " << paxosv << endl;
mdsmap.decode(mdsmap_bl);
// new map
print_map(mdsmap);
// bcast map to mds, waiters
if (mon->is_leader())
bcast_latest_mds();
send_to_waiting();
// hackish: did all mds's shut down?
if (mon->is_leader() &&
g_conf.mon_stop_with_last_mds &&
mdsmap.get_epoch() > 1 &&
mdsmap.get_num_up_or_failed_mds() == 0)
mon->messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
mon->monmap->get_inst(mon->whoami));
return true;
}
// Start a new pending map as a copy of the current map with the epoch
// advanced by one.
void MDSMonitor::create_pending()
{
  pending_mdsmap = mdsmap;
  ++pending_mdsmap.epoch;

  dout(10) << "create_pending e" << pending_mdsmap.epoch << endl;
}
void MDSMonitor::encode_pending(bufferlist &bl)
{
dout(10) << "encode_pending e" << pending_mdsmap.epoch << endl;
save_map();
print_map();
// bcast map
bcast_latest_mds();
send_current();
print_map(pending_mdsmap);
// apply to paxos
assert(paxos->get_version() + 1 == pending_mdsmap.epoch);
pending_mdsmap.encode(bl);
}
void MDSMonitor::handle_command(MMonCommand *m, int& r, string& rs)
// First-pass handling of an incoming message, before any map update.
//
// Returns true when the message has been fully handled here (and has been
// freed or forwarded), false when it needs to go on to prepare_update().
//   MSG_MDS_BEACON  -> delegated to preprocess_beacon()
//   MSG_MDS_GETMAP  -> reply with the full map
//   MSG_MON_COMMAND -> always needs prepare_update()
bool MDSMonitor::preprocess_query(Message *m)
{
  dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl;

  switch (m->get_type()) {

  case MSG_MDS_BEACON:
    return preprocess_beacon((MMDSBeacon*)m);

  case MSG_MDS_GETMAP:
    send_full(m->get_source_inst());
    // the request is fully handled here; free it like the other
    // "return true" paths do, otherwise it leaks
    delete m;
    return true;

  case MSG_MON_COMMAND:
    return false;

  default:
    assert(0);
    delete m;
    return true;
  }
}
// Try to answer an MDS beacon without changing the map.
//
// Returns true when the beacon was fully handled here: forwarded to the
// leader, dropped because its seq is older than the recorded one, or
// answered with no state change. Returns false when a map update is needed
// (unknown booting address, or a state that differs from the current map);
// in that case m is not freed here.
bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
{
dout(12) << "preprocess_beacon " << *m
<< " from " << m->get_mds_inst()
<< endl;
// fw to leader?
// The message itself is passed to send_message() and is not freed here.
// NOTE(review): presumably the messenger takes ownership of m -- confirm.
if (!mon->is_leader()) {
dout(10) << "fw to leader" << endl;
mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
return true;
}
// let's see.
int from = m->get_mds_inst().name.num();
int state = m->get_state();
version_t seq = m->get_seq();
// can i handle this query without a map update?
// boot?
if (state == MDSMap::STATE_BOOT) {
// already booted?
// look the sender's address up in the current map
int already = mdsmap.get_addr_rank(m->get_mds_inst().addr);
if (already < 0)
return false; // need to update map
// already booted. just reply to beacon, as per usual.
from = already;
}
// reply to beacon
// NOTE(review): if mds_state_seq is a std::map, operator[] inserts a default
// entry for an unknown `from` -- confirm this is acceptable on the current map.
if (mdsmap.mds_state_seq[from] > seq) {
dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << endl;
delete m;
return true;
}
// reply to beacon?
// every state except STATE_OUT gets its beacon time recorded and an echo reply
if (state != MDSMap::STATE_OUT) {
last_beacon[from] = g_clock.now(); // note time
mon->messenger->send_message(new MMDSBeacon(m->get_mds_inst(), state, seq),
m->get_mds_inst());
}
// is there a state change here?
if (mdsmap.mds_state.count(from) == 0 ||
mdsmap.mds_state[from] != state)
return false; // yep, need to update map.
// we're done.
delete m;
return true;
}
// Route a map-changing message to its handler and return the handler's
// result.  An unexpected message type asserts, is freed, and yields true.
bool MDSMonitor::prepare_update(Message *m)
{
  dout(10) << "prepare_update " << *m << endl;

  const int type = m->get_type();

  if (type == MSG_MDS_BEACON) {
    return handle_beacon((MMDSBeacon*)m);
  }
  if (type == MSG_MON_COMMAND) {
    return handle_command((MMonCommand*)m);
  }

  // anything else is a bug
  assert(0);
  delete m;
  return true;
}
// Always returns true.
bool MDSMonitor::should_propose_now()
{
return true;
}
// Apply an MDS beacon that requires a map change to pending_mdsmap.
//
// For a STATE_BOOT beacon, first decides which mds id the sender gets and
// which state it enters (replay / starting / creating / standby), then
// records its instance in the pending map. For all beacons, records the
// new state and seq in the pending map and queues a C_Updated callback via
// paxos->wait_for_commit().
//
// Returns true when the pending map was updated. Returns false (after
// freeing m) when the booting address is already present in the pending map.
bool MDSMonitor::handle_beacon(MMDSBeacon *m)
{
// -- this is an update --
dout(12) << "handle_beacon " << *m
<< " from " << m->get_mds_inst()
<< endl;
int from = m->get_mds_inst().name.num();
int state = m->get_state();
version_t seq = m->get_seq();
// boot?
if (state == MDSMap::STATE_BOOT) {
// assign a name.
if (from >= 0) {
// wants to be (or already is) a specific MDS.
// these checks consult the current (committed) map, not the pending one
if (mdsmap.is_failed(from)) {
dout(10) << "mds_beacon boot: mds" << from << " was failed, replaying" << endl;
state = MDSMap::STATE_REPLAY;
} else if (mdsmap.is_out(from)) {
dout(10) << "mds_beacon boot: mds" << from << " was out, starting" << endl;
state = MDSMap::STATE_STARTING;
} else if (!mdsmap.have_inst(from) || mdsmap.get_inst(from) != m->get_mds_inst()) {
dout(10) << "mds_beacon boot: mds" << from << " is someone else" << endl;
from = -1;
}
}
if (from < 0) {
// is this address already in the pending map? then nothing to add.
from = pending_mdsmap.get_addr_rank(m->get_mds_inst().addr);
if (from >= 0) {
state = pending_mdsmap.mds_state[from];
dout(10) << "mds_beacon boot: already pending mds" << from
<< " " << MDSMap::get_state_name(state) << endl;
delete m;
return false;
}
}
if (from < 0) {
// pick a failed mds?
set<int> failed;
pending_mdsmap.get_failed_mds_set(failed);
if (!failed.empty()) {
from = *failed.begin();
dout(10) << "mds_beacon boot: assigned failed mds" << from << endl;
state = MDSMap::STATE_REPLAY;
}
}
if (from < 0) {
// ok, just pick any unused mds id.
// the loop has no upper bound; it ends at the first id that is dne or out
for (from=0; ; ++from) {
if (pending_mdsmap.is_dne(from)) {
dout(10) << "mds_beacon boot: assigned new mds" << from << endl;
state = MDSMap::STATE_CREATING;
break;
} else if (pending_mdsmap.is_out(from)) {
dout(10) << "mds_beacon boot: assigned out mds" << from << endl;
state = MDSMap::STATE_STARTING;
break;
}
}
}
// NOTE(review): if `from >= 0` on entry and none of the three branches above
// matched, state is still STATE_BOOT here and this assert fires -- confirm
// preprocess_beacon() filters that case out.
assert(state != MDSMap::STATE_BOOT);
// put it in the map.
pending_mdsmap.mds_inst[from].addr = m->get_mds_inst().addr;
pending_mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from);
pending_mdsmap.mds_inc[from]++;
// someone (new) has joined the cluster.
pending_mdsmap.same_inst_since = pending_mdsmap.epoch;
// if degraded, starting -> standby
if (pending_mdsmap.is_degraded() &&
state == MDSMap::STATE_STARTING) {
dout(10) << "mds_beacon boot: cluster degraded, mds" << from << " will be standby" << endl;
state = MDSMap::STATE_STANDBY;
}
}
// if creating -> active, go to standby instead
if (state == MDSMap::STATE_ACTIVE &&
mdsmap.is_creating(from)) {
pending_mdsmap.mds_created.insert(from);
dout(10) << "mds_beacon created mds" << from << endl;
if (mdsmap.is_degraded()) {
dout(10) << "mds_beacon cluster degraded, marking mds" << from << " as standby" << endl;
state = MDSMap::STATE_STANDBY;
}
}
// update the map
// NOTE(review): mdsmap.mds_state[from] below reads the current map with
// operator[]; if mds_state is a std::map this may insert a default entry --
// confirm.
dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from])
<< " -> " << MDSMap::get_state_name(state)
<< endl;
// did someone leave the cluster?
if (state == MDSMap::STATE_OUT &&
mdsmap.mds_state[from] != MDSMap::STATE_OUT)
pending_mdsmap.same_inst_since = pending_mdsmap.epoch;
// change the state
pending_mdsmap.mds_state[from] = state;
// the beacon seq is kept only while the mds is up in the pending map
if (pending_mdsmap.is_up(from))
pending_mdsmap.mds_state_seq[from] = seq;
else
pending_mdsmap.mds_state_seq.erase(from);
dout(7) << "pending map now:" << endl;
print_map(pending_mdsmap);
// m is handed to the C_Updated callback; it is not freed here
paxos->wait_for_commit(new C_Updated(this, from, m));
return true;
}
// Post-update hook for a beacon: for a boot beacon, send the latest osd map
// to the mds instance recorded for `from`; in all cases log and free m.
void MDSMonitor::_updated(int from, MMDSBeacon *m)
{
  const bool booted = (m->get_state() == MDSMap::STATE_BOOT);

  if (!booted) {
    dout(10) << "_updated mds" << from << " " << *m << endl;
  } else {
    dout(10) << "_updated (booted) mds" << from << " " << *m << endl;
    mon->osdmon->send_latest(0, mdsmap.get_inst(from));
  }

  delete m;
}
bool MDSMonitor::handle_command(MMonCommand *m)
{
int r = -1;
string rs = "unrecognized command";
stringstream ss;
if (m->cmd.size() > 1) {
if (m->cmd[1] == "stop" && m->cmd.size() > 2) {
int who = atoi(m->cmd[2].c_str());
@ -141,172 +375,30 @@ void MDSMonitor::handle_command(MMonCommand *m, int& r, string& rs)
ss << "telling mds" << who << " to stop";
getline(ss,rs);
// hack
mdsmap.mds_state[who] = MDSMap::STATE_STOPPING;
issue_map();
pending_mdsmap.mds_state[who] = MDSMap::STATE_STOPPING;
} else {
ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")";
getline(ss,rs);
}
}
/*
else if (m->cmd[1] == "setnum" && m->cmd.size() > 2) {
g_conf.num_mds = atoi(m->cmd[2].c_str());
ss << "g_conf.num_mds = " << g_conf.num_mds << endl;
getline(ss,rs);
print_map();
}
}
}
void MDSMonitor::handle_mds_beacon(MMDSBeacon *m)
{
dout(12) << "mds_beacon " << *m
<< " from " << m->get_source()
<< " " << m->get_source_inst()
<< endl;
int from = m->get_source().num();
int state = m->get_state();
version_t seq = m->get_seq();
// initial boot?
bool booted = false;
// choose an MDS id
if (from >= 0) {
// wants to be (or already is) a specific MDS.
if (mdsmap.is_down(from)) {
dout(10) << "mds_beacon assigning requested mds" << from << endl;
booted = true;
} else if (mdsmap.get_inst(from) != m->get_source_inst()) {
dout(10) << "mds_beacon not assigning requested mds" << from
<< ", that mds is up and someone else" << endl;
from = -1;
}
}
if (from < 0) {
// pick a failed mds?
set<int> failed;
mdsmap.get_failed_mds_set(failed);
if (!failed.empty()) {
from = *failed.begin();
dout(10) << "mds_beacon assigned failed mds" << from << endl;
booted = true;
}
}
if (from < 0) {
// ok, just pick any unused mds id.
for (from=0; ; ++from) {
if (mdsmap.is_dne(from) ||
mdsmap.is_out(from)) {
dout(10) << "mds_beacon assigned out|dne mds" << from << endl;
booted = true;
break;
}
}
}
// old beacon?
if (mdsmap.mds_state_seq[from] > seq) {
dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << endl;
delete m;
return;
}
// reply to beacon?
if (state != MDSMap::STATE_OUT) {
last_beacon[from] = g_clock.now(); // note time
messenger->send_message(new MMDSBeacon(state, seq),
m->get_source_inst());
}
// make sure it's in the map
if (booted) {
mdsmap.mds_inst[from].addr = m->get_source_addr();
mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from);
mdsmap.mds_inc[from]++;
// someone (new) joined the cluster
mdsmap.same_inst_since = mdsmap.epoch+1;
// starting -> creating|starting|replay
if (mdsmap.is_degraded() &&
!mdsmap.is_failed(from)) {
dout(10) << "mds_beacon currently degraded, mds" << from << " will be standby" << endl;
state = MDSMap::STATE_STANDBY;
}
/*
else if (from >= g_conf.num_mds) {
dout(10) << "mds_beacon already have " << g_conf.num_mds << " mds's, standby (increase with 'mds setnum xxx')" << endl;
state = MDSMap::STATE_STANDBY;
}
*/
else if (state == MDSMap::STATE_STARTING) {
if (mdsmap.is_failed(from)) {
dout(10) << "mds_beacon will recover mds" << from << endl;
state = MDSMap::STATE_REPLAY;
}
else if (mdsmap.is_out(from)) {
dout(10) << "mds_beacon will start mds" << from << endl;
state = MDSMap::STATE_STARTING;
}
else {
dout(10) << "mds_beacon will create mds" << from << endl;
state = MDSMap::STATE_CREATING;
}
}
}
// if creating -> active, go to standby instead
if (state == MDSMap::STATE_ACTIVE && mdsmap.is_creating(from)) {
mdsmap.mds_created.insert(from);
dout(10) << "mds_beacon created mds" << from << endl;
if (mdsmap.is_degraded()) {
dout(10) << "mds_beacon current degraded, marking mds" << from << " as standby" << endl;
state = MDSMap::STATE_STANDBY;
}
}
// did we update the map?
if (mdsmap.mds_state.count(from) == 0 ||
mdsmap.mds_state[from] != state) {
// update mds state
dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from])
<< " -> " << MDSMap::get_state_name(state)
<< endl;
// did someone leave the cluster?
if (state == MDSMap::STATE_OUT && mdsmap.mds_state[from] != MDSMap::STATE_OUT)
mdsmap.same_inst_since = mdsmap.epoch+1;
// change the state
mdsmap.mds_state[from] = state;
if (mdsmap.is_up(from))
mdsmap.mds_state_seq[from] = seq;
else
mdsmap.mds_state_seq.erase(from);
issue_map();
}
// reply
mon->messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
delete m;
return r >= 0;
}
// Answer a request for the MDS map.
//
// When a map with a non-zero epoch exists it is sent to the requester at
// once; otherwise the requester's address is parked on awaiting_map.
//
// NOTE(review): this function does not delete `m` -- confirm whether the
// caller is expected to free the message.
void MDSMonitor::handle_mds_getmap(MMDSGetMap *m)
{
  dout(7) << "mds_getmap from " << m->get_source() << " " << m->get_source_inst() << endl;

  const bool have_map = (mdsmap.get_epoch() > 0);
  if (!have_map) {
    // nothing to hand out yet: remember who asked
    awaiting_map.push_back( m->get_source_inst() );
    return;
  }

  send_full(m->get_source_inst());
}
void MDSMonitor::bcast_latest_mds()
{
@ -324,26 +416,25 @@ void MDSMonitor::bcast_latest_mds()
void MDSMonitor::send_full(entity_inst_t dest)
{
dout(11) << "send_full to " << dest << endl;
messenger->send_message(new MMDSMap(&mdsmap), dest);
mon->messenger->send_message(new MMDSMap(&mdsmap), dest);
}
void MDSMonitor::send_current()
void MDSMonitor::send_to_waiting()
{
dout(10) << "mds_send_current " << mdsmap.get_epoch() << endl;
for (list<entity_inst_t>::iterator i = awaiting_map.begin();
i != awaiting_map.end();
dout(10) << "send_to_waiting " << mdsmap.get_epoch() << endl;
for (list<entity_inst_t>::iterator i = waiting_for_map.begin();
i != waiting_for_map.end();
i++)
send_full(*i);
awaiting_map.clear();
waiting_for_map.clear();
}
void MDSMonitor::send_latest(entity_inst_t dest)
{
// FIXME: check if we're locked, etc.
if (mdsmap.get_epoch() > 0)
if (paxos->is_readable())
send_full(dest);
else
awaiting_map.push_back(dest);
waiting_for_map.push_back(dest);
}
@ -351,6 +442,11 @@ void MDSMonitor::tick()
{
// make sure mds's are still alive
utime_t now = g_clock.now();
// ...if i am an active leader
if (!mon->is_leader()) return;
if (!paxos->is_active()) return;
if (now > g_conf.mds_beacon_grace) {
utime_t cutoff = now;
cutoff -= g_conf.mds_beacon_grace;
@ -403,8 +499,8 @@ void MDSMonitor::tick()
<< endl;
// update map
mdsmap.mds_state[*p] = newstate;
mdsmap.mds_state_seq.erase(*p);
pending_mdsmap.mds_state[*p] = newstate;
pending_mdsmap.mds_state_seq.erase(*p);
changed = true;
}
} else {
@ -413,20 +509,29 @@ void MDSMonitor::tick()
}
}
if (changed) {
issue_map();
}
if (changed)
propose_pending();
}
}
void MDSMonitor::do_stop()
{
// hrm...
if (!mon->is_leader() ||
!paxos->is_active()) {
dout(-10) << "do_stop can't stop right now, mdsmap not writeable" << endl;
return;
}
dout(10) << "do_stop stopping active mds nodes" << endl;
print_map(mdsmap);
for (map<int,int>::iterator p = mdsmap.mds_state.begin();
p != mdsmap.mds_state.end();
++p)
if (mdsmap.is_active(p->first))
mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING;
pending_mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING;
issue_map();
propose_pending();
}

View File

@ -24,67 +24,71 @@ using namespace std;
#include "mds/MDSMap.h"
class Monitor;
#include "PaxosService.h"
class MDSMonitor : public Dispatcher {
Monitor *mon;
Messenger *messenger;
Mutex &lock;
class MMDSBeacon;
// mds maps
class MDSMonitor : public PaxosService {
public:
MDSMap mdsmap;
// mds maps
MDSMap mdsmap; // current
bufferlist mdsmap_bl; // encoded
private:
bufferlist encoded_map;
MDSMap pending_mdsmap; // current + pending updates
//map<epoch_t, bufferlist> inc_maps;
//MDSMap::Incremental pending_inc;
// my helpers
void print_map(MDSMap &m);
// Completion callback attached to a beacon-driven map update.
// finish(r): r >= 0 -> MDSMonitor::_updated(mds, m) is called;
//            r <  0 -> the beacon is handed back to MDSMonitor::dispatch()
//                      so it gets processed again.
class C_Updated : public Context {
  MDSMonitor *mm;   // monitor to call back into
  int mds;          // mds id passed through to _updated()
  MMDSBeacon *m;    // the beacon message this update was started for
public:
  C_Updated(MDSMonitor *monitor, int mds_id, MMDSBeacon *beacon) :
    mm(monitor), mds(mds_id), m(beacon) {}

  void finish(int r) {
    if (r < 0) {
      // failure: try again
      mm->dispatch((Message*)m);
      return;
    }
    // success
    mm->_updated(mds, m);
  }
};
// service methods
void create_initial();
bool update_from_paxos();
void create_pending();
void encode_pending(bufferlist &bl);
list<entity_inst_t> awaiting_map;
// Called by C_Updated::finish() when the update succeeded (r >= 0), with the
// mds id and the beacon message the update was started for.
// The two parameters must have distinct names: naming both `m` redeclares a
// name within a single function-prototype scope, which is ill-formed C++.
void _updated(int mds, MMDSBeacon *m);
bool preprocess_query(Message *m); // true if processed.
bool prepare_update(Message *m);
bool should_propose_now();
bool preprocess_beacon(class MMDSBeacon *m);
bool handle_beacon(class MMDSBeacon *m);
bool handle_command(class MMonCommand *m);
// beacons
map<int, utime_t> last_beacon;
bool is_alive(int mds);
public:
MDSMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
// sending the map
private:
list<entity_inst_t> waiting_for_map;
// maps
void create_initial();
void send_current(); // send current map to waiters.
void send_full(entity_inst_t dest);
void bcast_latest_mds();
void send_full(entity_inst_t dest);
void send_to_waiting();
void issue_map();
void save_map();
void load_map();
void print_map();
//void accept_pending(); // accept pending, new map.
//void send_incremental(epoch_t since, msg_addr_t dest);
void handle_mds_state(class MMDSState *m);
void handle_mds_beacon(class MMDSBeacon *m);
//void handle_mds_failure(class MMDSFailure *m);
void handle_mds_getmap(class MMDSGetMap *m);
public:
MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) {
}
void dispatch(Message *m);
void tick(); // check state, take actions
void election_starting();
void election_finished();
public:
void send_latest(entity_inst_t dest);
void handle_command(class MMonCommand *m, int& r, string& rs);
void tick(); // check state, take actions
void do_stop();
};

View File

@ -24,7 +24,7 @@
class MonMap {
public:
epoch_t epoch; // what epoch of the osd cluster descriptor is this
epoch_t epoch; // what epoch/version of the monmap
int num_mon;
vector<entity_inst_t> mon_inst;
@ -41,7 +41,7 @@ class MonMap {
// choice should be stable, unless we explicitly ask for a new one.
int pick_mon(bool newmon=false) {
if (newmon || (last_mon < 0)) {
last_mon = 0; //last_mon = rand() % num_mon;
last_mon = rand() % num_mon;
}
return last_mon;
}
@ -68,6 +68,7 @@ class MonMap {
_decode(mon_inst, blist, off);
}
// read from/write to a file
int write(char *fn) {
// encode
bufferlist bl;

View File

@ -1,218 +1,251 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer
// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer
#include "Monitor.h"
#include "Monitor.h"
#include "osd/OSDMap.h"
#include "osd/OSDMap.h"
#include "MonitorStore.h"
#include "MonitorStore.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
#include "messages/MMonPaxos.h"
#include "common/Timer.h"
#include "common/Clock.h"
#include "common/Timer.h"
#include "common/Clock.h"
#include "OSDMonitor.h"
#include "MDSMonitor.h"
#include "ClientMonitor.h"
#include "OSDMonitor.h"
#include "MDSMonitor.h"
#include "ClientMonitor.h"
#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
void Monitor::init()
{
lock.Lock();
dout(1) << "init" << endl;
// store
char s[80];
sprintf(s, "mondata/mon%d", whoami);
store = new MonitorStore(s);
void Monitor::init()
{
lock.Lock();
if (g_conf.mkfs)
store->mkfs();
dout(1) << "init" << endl;
store->mount();
// store
char s[80];
sprintf(s, "mondata/mon%d", whoami);
store = new MonitorStore(s);
// create
osdmon = new OSDMonitor(this, messenger, lock);
mdsmon = new MDSMonitor(this, messenger, lock);
clientmon = new ClientMonitor(this, messenger, lock);
if (g_conf.mkfs)
store->mkfs();
// i'm ready!
messenger->set_dispatcher(this);
// start ticker
reset_tick();
store->mount();
// call election?
if (monmap->num_mon > 1) {
assert(monmap->num_mon != 2);
call_election();
} else {
// we're standalone.
set<int> q;
q.insert(whoami);
win_election(q);
}
// create
osdmon = new OSDMonitor(this, &paxos_osdmap);
mdsmon = new MDSMonitor(this, &paxos_mdsmap);
clientmon = new ClientMonitor(this, &paxos_clientmap);
lock.Unlock();
}
// init paxos
paxos_test.init();
paxos_osdmap.init();
paxos_mdsmap.init();
paxos_clientmap.init();
void Monitor::shutdown()
{
dout(1) << "shutdown" << endl;
// i'm ready!
messenger->set_dispatcher(this);
// cancel all events
cancel_tick();
timer.cancel_all();
timer.join();
// stop osds.
for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
it != osdmon->osdmap.get_osds().end();
it++) {
if (osdmon->osdmap.is_down(*it)) continue;
dout(10) << "sending shutdown to osd" << *it << endl;
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
osdmon->osdmap.get_inst(*it));
}
osdmon->mark_all_down();
// monitors too.
for (int i=0; i<monmap->num_mon; i++)
if (i != whoami)
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
monmap->get_inst(i));
// start ticker
reset_tick();
// unmount my local storage
if (store)
delete store;
// clean up
if (monmap) delete monmap;
if (osdmon) delete osdmon;
if (mdsmon) delete mdsmon;
if (clientmon) delete clientmon;
// call election?
if (monmap->num_mon > 1) {
assert(monmap->num_mon != 2);
call_election();
} else {
// we're standalone.
set<int> q;
q.insert(whoami);
win_election(1, q);
}
// die.
messenger->shutdown();
delete messenger;
}
lock.Unlock();
}
void Monitor::shutdown()
{
dout(1) << "shutdown" << endl;
elector.shutdown();
if (is_leader()) {
// stop osds.
for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
it != osdmon->osdmap.get_osds().end();
it++) {
if (osdmon->osdmap.is_down(*it)) continue;
dout(10) << "sending shutdown to osd" << *it << endl;
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
osdmon->osdmap.get_inst(*it));
}
osdmon->mark_all_down();
// monitors too.
for (int i=0; i<monmap->num_mon; i++)
if (i != whoami)
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
monmap->get_inst(i));
}
// cancel all events
cancel_tick();
timer.cancel_all();
timer.join();
// unmount my local storage
if (store)
delete store;
// clean up
if (osdmon) delete osdmon;
if (mdsmon) delete mdsmon;
if (clientmon) delete clientmon;
// die.
messenger->shutdown();
delete messenger;
}
void Monitor::call_election()
{
if (monmap->num_mon == 1) return;
void Monitor::call_election()
{
if (monmap->num_mon == 1) return;
dout(10) << "call_election" << endl;
state = STATE_STARTING;
dout(10) << "call_election" << endl;
state = STATE_STARTING;
elector.start();
// tell paxos
paxos_test.election_starting();
paxos_mdsmap.election_starting();
paxos_osdmap.election_starting();
paxos_clientmap.election_starting();
osdmon->election_starting();
//mdsmon->election_starting();
}
// call a new election
elector.call_election();
}
void Monitor::win_election(set<int>& active)
{
state = STATE_LEADER;
leader = whoami;
quorum = active;
dout(10) << "win_election, quorum is " << quorum << endl;
void Monitor::win_election(epoch_t epoch, set<int>& active)
{
state = STATE_LEADER;
leader = whoami;
mon_epoch = epoch;
quorum = active;
dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl;
// init
osdmon->election_finished();
mdsmon->election_finished();
// init paxos
paxos_test.leader_init();
paxos_mdsmap.leader_init();
paxos_osdmap.leader_init();
paxos_clientmap.leader_init();
// init paxos
test_paxos.leader_start();
}
// init
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
}
// Record that an election ended with another monitor as leader.
//   l - id of the winning monitor; stored as the current leader.
// Switches this monitor to STATE_PEON before logging, because the dout
// prefix reports the role via is_peon()/is_leader().
void Monitor::lose_election(int l)
{
state = STATE_PEON;
leader = l;
dout(10) << "lose_election, leader is mon" << leader << endl;
}
// Record that an election ended with another monitor as leader and bring
// the paxos machines and services into their post-election state.
//   epoch - election epoch; stored in mon_epoch.
//   l     - id of the winning monitor; stored as the current leader.
void Monitor::lose_election(epoch_t epoch, int l)
{
// update role/epoch/leader first: the dout prefix reads the role via is_peon()
state = STATE_PEON;
mon_epoch = epoch;
leader = l;
dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl;
// init paxos: put every paxos machine into its peon role
paxos_test.peon_init();
paxos_mdsmap.peon_init();
paxos_osdmap.peon_init();
paxos_clientmap.peon_init();
// init: tell each service the election is over (after the paxos machines above)
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
}
void Monitor::handle_command(MMonCommand *m)
{
dout(0) << "handle_command " << *m << endl;
int r = -1;
string rs = "unrecognized command";
void Monitor::handle_command(MMonCommand *m)
{
dout(0) << "handle_command " << *m << endl;
if (!m->cmd.empty()) {
if (m->cmd[0] == "stop") {
r = 0;
rs = "stopping";
do_stop();
}
else if (m->cmd[0] == "mds") {
mdsmon->handle_command(m, r, rs);
}
else if (m->cmd[0] == "osd") {
int r = -1;
string rs = "unrecognized command";
}
}
if (!m->cmd.empty()) {
if (m->cmd[0] == "stop") {
r = 0;
rs = "stopping";
do_stop();
}
else if (m->cmd[0] == "mds") {
mdsmon->dispatch(m);
return;
}
else if (m->cmd[0] == "osd") {
// reply
messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
delete m;
}
}
}
// reply
messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
delete m;
}
// Handle a "stop" request: log it and delegate to MDSMonitor::do_stop().
// Despite the log text, this function does not itself call shutdown().
void Monitor::do_stop()
{
dout(0) << "do_stop -- shutting down" << endl;
mdsmon->do_stop();
}
// Handle a "stop" request: log it and delegate to MDSMonitor::do_stop().
// Despite the log text, this function does not itself call shutdown().
void Monitor::do_stop()
{
dout(0) << "do_stop -- shutting down" << endl;
mdsmon->do_stop();
}
void Monitor::dispatch(Message *m)
{
lock.Lock();
{
switch (m->get_type()) {
void Monitor::dispatch(Message *m)
{
lock.Lock();
{
switch (m->get_type()) {
// misc
case MSG_PING_ACK:
handle_ping_ack((MPingAck*)m);
break;
// misc
case MSG_PING_ACK:
handle_ping_ack((MPingAck*)m);
break;
case MSG_SHUTDOWN:
assert(m->get_source().is_osd());
osdmon->dispatch(m);
break;
case MSG_SHUTDOWN:
if (m->get_source().is_osd())
osdmon->dispatch(m);
else
handle_shutdown(m);
break;
case MSG_MON_COMMAND:
handle_command((MMonCommand*)m);
@ -233,12 +266,6 @@ void Monitor::dispatch(Message *m)
case MSG_MDS_BEACON:
case MSG_MDS_GETMAP:
mdsmon->dispatch(m);
// hackish: did all mds's shut down?
if (g_conf.mon_stop_with_last_mds &&
mdsmon->mdsmap.get_num_up_or_failed_mds() == 0)
shutdown();
break;
// clients
@ -250,23 +277,39 @@ void Monitor::dispatch(Message *m)
// paxos
case MSG_MON_PAXOS:
// send it to the right paxos instance
switch (((MMonPaxos*)m)->machine_id) {
case PAXOS_TEST:
test_paxos.dispatch(m);
break;
case PAXOS_OSDMAP:
//...
default:
assert(0);
{
MMonPaxos *pm = (MMonPaxos*)m;
// sanitize
if (pm->epoch > mon_epoch)
call_election();
if (pm->epoch != mon_epoch) {
delete pm;
break;
}
// send it to the right paxos instance
switch (pm->machine_id) {
case PAXOS_TEST:
paxos_test.dispatch(m);
break;
case PAXOS_OSDMAP:
paxos_osdmap.dispatch(m);
break;
case PAXOS_MDSMAP:
paxos_mdsmap.dispatch(m);
break;
case PAXOS_CLIENTMAP:
paxos_clientmap.dispatch(m);
break;
default:
assert(0);
}
}
break;
// elector messages
case MSG_MON_ELECTION_PROPOSE:
case MSG_MON_ELECTION_ACK:
case MSG_MON_ELECTION_VICTORY:
case MSG_MON_ELECTION:
elector.dispatch(m);
break;
@ -282,9 +325,13 @@ void Monitor::dispatch(Message *m)
void Monitor::handle_shutdown(Message *m)
{
dout(1) << "shutdown from " << m->get_source() << endl;
shutdown();
assert(m->get_source().is_mon());
if (m->get_source().num() == get_leader()) {
dout(1) << "shutdown from leader " << m->get_source() << endl;
shutdown();
} else {
dout(1) << "ignoring shutdown from non-leader " << m->get_source() << endl;
}
delete m;
}

View File

@ -31,13 +31,9 @@ class OSDMonitor;
class MDSMonitor;
class ClientMonitor;
#define PAXOS_TEST 0
#define PAXOS_OSDMAP 1
#define PAXOS_MDSMAP 2
#define PAXOS_CLIENTMAP 3
class Monitor : public Dispatcher {
protected:
public:
// me
int whoami;
Messenger *messenger;
@ -52,63 +48,65 @@ protected:
void reset_tick();
friend class C_Mon_Tick;
// my local store
//ObjectStore *store;
// -- local storage --
public:
MonitorStore *store;
const static int INO_ELECTOR = 1;
const static int INO_MON_MAP = 2;
const static int INO_OSD_MAP = 10;
const static int INO_OSD_INC_MAP = 11;
const static int INO_MDS_MAP = 20;
// elector
Elector elector;
friend class Elector;
epoch_t mon_epoch; // monitor epoch (election instance)
set<int> quorum; // current active set of monitors (if !starting)
//void call_election();
// paxos
Paxos test_paxos;
friend class Paxos;
// monitor state
// -- monitor state --
private:
const static int STATE_STARTING = 0; // electing
const static int STATE_LEADER = 1;
const static int STATE_PEON = 2;
int state;
int leader; // current leader (to best of knowledge)
utime_t last_called_election; // [starting] last time i called an election
public:
bool is_starting() { return state == STATE_STARTING; }
bool is_leader() { return state == STATE_LEADER; }
bool is_peon() { return state == STATE_PEON; }
// my public services
// -- elector --
private:
Elector elector;
friend class Elector;
epoch_t mon_epoch; // monitor epoch (election instance)
int leader; // current leader (to best of knowledge)
set<int> quorum; // current active set of monitors (if !starting)
utime_t last_called_election; // [starting] last time i called an election
public:
epoch_t get_epoch() { return mon_epoch; }
int get_leader() { return leader; }
const set<int>& get_quorum() { return quorum; }
void call_election(); // initiate election
void win_election(epoch_t epoch, set<int>& q); // end election (called by Elector)
void lose_election(epoch_t epoch, int l); // end election (called by Elector)
// -- paxos --
Paxos paxos_test;
Paxos paxos_mdsmap;
Paxos paxos_osdmap;
Paxos paxos_clientmap;
friend class Paxos;
// -- services --
OSDMonitor *osdmon;
MDSMonitor *mdsmon;
ClientMonitor *clientmon;
// messages
void handle_shutdown(Message *m);
void handle_ping_ack(class MPingAck *m);
void handle_command(class MMonCommand *m);
friend class OSDMonitor;
friend class MDSMonitor;
friend class ClientMonitor;
// initiate election
void call_election();
// end election (called by Elector)
void win_election(set<int>& q);
void lose_election(int l);
// messages
void handle_shutdown(Message *m);
void handle_ping_ack(class MPingAck *m);
void handle_command(class MMonCommand *m);
@ -119,18 +117,22 @@ protected:
monmap(mm),
timer(lock), tick_timer(0),
store(0),
elector(this, w),
mon_epoch(0),
test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine
state(STATE_STARTING),
elector(this, w),
mon_epoch(0),
leader(0),
paxos_test(this, w, PAXOS_TEST),
paxos_mdsmap(this, w, PAXOS_MDSMAP),
paxos_osdmap(this, w, PAXOS_OSDMAP),
paxos_clientmap(this, w, PAXOS_CLIENTMAP),
osdmon(0), mdsmon(0), clientmon(0)
{
}
void init();
void shutdown();
void dispatch(Message *m);

View File

@ -134,6 +134,7 @@ bool MonitorStore::exists_bl_ss(const char *a, const char *b)
struct stat st;
int r = ::stat(fn, &st);
//dout(15) << "exists_bl stat " << fn << " r=" << r << " errno " << errno << " " << strerror(errno) << endl;
return r == 0;
}

File diff suppressed because it is too large Load Diff

View File

@ -25,85 +25,96 @@ using namespace std;
#include "osd/OSDMap.h"
#include "PaxosService.h"
class Monitor;
class MOSDBoot;
class OSDMonitor : public Dispatcher {
Monitor *mon;
Messenger *messenger;
Mutex &lock;
// osd maps
class OSDMonitor : public PaxosService {
public:
OSDMap osdmap;
private:
map<entity_name_t, pair<entity_inst_t, epoch_t> > awaiting_map;
void create_initial();
bool get_map_bl(epoch_t epoch, bufferlist &bl);
bool get_inc_map_bl(epoch_t epoch, bufferlist &bl);
void save_map();
void save_inc_map(OSDMap::Incremental &inc);
// [leader]
OSDMap::Incremental pending_inc;
map<int,utime_t> down_pending_out; // osd down -> out
set<int> pending_ack;
// svc
void create_initial();
bool update_from_paxos();
void create_pending(); // prepare a new pending
void encode_pending(bufferlist &bl);
// we are distributed
const static int STATE_INIT = 0; // startup
const static int STATE_SYNC = 1; // sync map copy (readonly)
const static int STATE_LOCK = 2; // [peon] map locked
const static int STATE_UPDATING = 3; // [leader] map locked, waiting for peon ack
void handle_query(Message *m);
bool preprocess_query(Message *m); // true if processed.
bool prepare_update(Message *m);
bool should_propose_now();
int state;
utime_t lease_expire; // when lease expires
// ...
bool get_map_bl(epoch_t epoch, bufferlist &bl);
bool get_inc_map_bl(epoch_t epoch, bufferlist &bl);
//void init();
// maps
void accept_pending(); // accept pending, new map.
void send_waiting(); // send current map to waiters.
void send_to_waiting(); // send current map to waiters.
void send_full(entity_inst_t dest);
void send_incremental(epoch_t since, entity_inst_t dest);
void bcast_latest_mds();
void bcast_latest_osd();
void update_map();
void handle_osd_boot(class MOSDBoot *m);
void handle_osd_in(class MOSDIn *m);
void handle_osd_out(class MOSDOut *m);
void handle_osd_failure(class MOSDFailure *m);
void bcast_full_osd();
void handle_osd_getmap(class MOSDGetMap *m);
void handle_info(class MMonOSDMapInfo*);
void handle_lease(class MMonOSDMapLease*);
void handle_lease_ack(class MMonOSDMapLeaseAck*);
void handle_update_prepare(class MMonOSDMapUpdatePrepare*);
void handle_update_ack(class MMonOSDMapUpdateAck*);
void handle_update_commit(class MMonOSDMapUpdateCommit*);
bool preprocess_failure(class MOSDFailure *m);
bool prepare_failure(class MOSDFailure *m);
void _reported_failure(MOSDFailure *m);
bool preprocess_boot(class MOSDBoot *m);
bool prepare_boot(class MOSDBoot *m);
void _booted(MOSDBoot *m);
// Completion callback for an osd boot message.
// finish(r): r >= 0 -> OSDMonitor::_booted(m) is called;
//            r <  0 -> the message is handed back to OSDMonitor::dispatch()
//                      so it gets processed again.
class C_Booted : public Context {
  OSDMonitor *cmon;   // monitor to call back into
  MOSDBoot *m;        // the boot message being handled
public:
  C_Booted(OSDMonitor *monitor, MOSDBoot *boot) :
    cmon(monitor), m(boot) {}

  void finish(int r) {
    if (r < 0) {
      cmon->dispatch((Message*)m);
      return;
    }
    cmon->_booted(m);
  }
};
// Completion callback for an osd failure report.
// finish(r): r >= 0 -> OSDMonitor::_reported_failure(m) is called;
//            r <  0 -> the message is handed back to OSDMonitor::dispatch()
//                      so it gets processed again.
class C_Reported : public Context {
  OSDMonitor *cmon;   // monitor to call back into
  MOSDFailure *m;     // the failure report being handled
public:
  C_Reported(OSDMonitor *monitor, MOSDFailure *report) :
    cmon(monitor), m(report) {}

  void finish(int r) {
    if (r < 0) {
      cmon->dispatch((Message*)m);
      return;
    }
    cmon->_reported_failure(m);
  }
};
bool preprocess_in(class MOSDIn *m);
bool prepare_in(class MOSDIn *m);
bool preprocess_out(class MOSDOut *m);
bool prepare_out(class MOSDOut *m);
public:
OSDMonitor(Monitor *mn, Messenger *m, Mutex& l) :
mon(mn), messenger(m), lock(l),
state(STATE_SYNC) {
//init();
}
OSDMonitor(Monitor *mn, Paxos *p) :
PaxosService(mn, p) { }
void dispatch(Message *m);
void tick(); // check state, take actions
void election_starting(); // abort whatever.
void election_finished(); // reinitialize whatever.
void issue_leases();
void mark_all_down();
void send_latest(entity_inst_t i);
void send_latest(epoch_t since, entity_inst_t i);
void fake_osd_failure(int osd, bool down);
void fake_osdmap_update();

View File

@ -20,44 +20,79 @@
#include "config.h"
#undef dout
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
// Restore this paxos machine's persistent counters from the monitor store.
// Reads the integers stored under this machine's name: "last_pn",
// "accepted_pn" and "last_committed".
void Paxos::init()
{
// load paxos variables from stable storage
last_pn = mon->store->get_int(machine_name, "last_pn");
accepted_pn = mon->store->get_int(machine_name, "accepted_pn");
last_committed = mon->store->get_int(machine_name, "last_committed");
// log only after loading: the dout prefix for this file prints last_committed
dout(10) << "init" << endl;
}
// ---------------------------------
// PHASE 1
// proposer
// leader
void Paxos::collect(version_t oldpn)
{
// we're recovering, it seems!
state = STATE_RECOVERING;
assert(mon->is_leader());
// reset the number of lasts received
uncommitted_v = 0;
uncommitted_pn = 0;
uncommitted_value.clear();
// look for uncommitted value
if (mon->store->exists_bl_sn(machine_name, last_committed+1)) {
uncommitted_v = last_committed+1;
uncommitted_pn = accepted_pn;
mon->store->get_bl_sn(uncommitted_value, machine_name, last_committed+1);
dout(10) << "learned uncommitted " << (last_committed+1)
<< " (" << uncommitted_value.length() << " bytes) from myself"
<< endl;
}
// pick new pn
accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
accepted_pn_from = last_committed;
num_last = 1;
old_accepted_pn = 0;
old_accepted_value.clear();
dout(10) << "collect with pn " << accepted_pn << endl;
// send collect
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == whoami) continue;
MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
collect->pn = accepted_pn;
mon->messenger->send_message(collect, mon->monmap->get_inst(i));
mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
}
}
// peon
void Paxos::handle_collect(MMonPaxos *collect)
{
dout(10) << "handle_collect " << *collect << endl;
assert(mon->is_peon()); // mon epoch filter should catch strays
// we're recovering, it seems!
state = STATE_RECOVERING;
// reply
MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
last->last_committed = last_committed;
// do we have an accepted but uncommitted value?
@ -66,9 +101,10 @@ void Paxos::handle_collect(MMonPaxos *collect)
if (mon->store->exists_bl_sn(machine_name, last_committed+1)) {
mon->store->get_bl_sn(bl, machine_name, last_committed+1);
assert(bl.length() > 0);
dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl;
dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1
<< " (" << bl.length() << " bytes)" << endl;
last->values[last_committed+1] = bl;
last->old_accepted_pn = accepted_pn;
last->uncommitted_pn = accepted_pn;
}
// can we accept this pn?
@ -77,6 +113,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
accepted_pn = collect->pn;
accepted_pn_from = collect->pn_from;
dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl;
mon->store->put_int(accepted_pn, machine_name, "accepted_pn");
} else {
// don't accept!
dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
@ -87,13 +124,13 @@ void Paxos::handle_collect(MMonPaxos *collect)
last->pn_from = accepted_pn_from;
// and share whatever data we have
for (version_t v = collect->last_committed;
for (version_t v = collect->last_committed+1;
v <= last_committed;
v++) {
if (mon->store->exists_bl_sn(machine_name, v)) {
mon->store->get_bl_sn(last->values[v], machine_name, v);
dout(10) << " sharing " << v << " "
<< last->values[v].length() << " bytes" << endl;
dout(10) << " sharing " << v << " ("
<< last->values[v].length() << " bytes)" << endl;
}
}
@ -103,28 +140,36 @@ void Paxos::handle_collect(MMonPaxos *collect)
}
// leader
void Paxos::handle_last(MMonPaxos *last)
{
dout(10) << "handle_last " << *last << endl;
if (!mon->is_leader()) {
dout(10) << "not leader, dropping" << endl;
delete last;
return;
}
// share committed values?
if (last->last_committed < last_committed) {
// share committed values
dout(10) << "sending commit to " << last->get_source() << endl;
MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
for (version_t v = last->last_committed;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
for (version_t v = last->last_committed+1;
v <= last_committed;
v++) {
mon->store->get_bl_sn(commit->values[v], machine_name, v);
dout(10) << "sharing " << v << " "
<< commit->values[v].length() << " bytes" << endl;
dout(10) << " sharing " << v << " ("
<< commit->values[v].length() << " bytes)" << endl;
}
commit->last_committed = last_committed;
mon->messenger->send_message(commit, last->get_source_inst());
}
// did we receive committed value?
// did we receive a committed value?
if (last->last_committed > last_committed) {
for (version_t v = last_committed;
for (version_t v = last_committed+1;
v <= last->last_committed;
v++) {
mon->store->put_bl_sn(last->values[v], machine_name, v);
@ -132,37 +177,53 @@ void Paxos::handle_last(MMonPaxos *last)
<< last->values[v].length() << " bytes" << endl;
}
last_committed = last->last_committed;
mon->store->put_int(last_committed, machine_name, "last_commtted");
mon->store->put_int(last_committed, machine_name, "last_committed");
dout(10) << "last_committed now " << last_committed << endl;
}
// do they accept your pn?
if (last->old_accepted_pn > accepted_pn) {
dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl;
collect(last->old_accepted_pn);
if (last->pn > accepted_pn) {
// no, try again.
dout(10) << " they had a higher pn than us, picking a new one." << endl;
collect(last->pn);
} else {
// they accepted our pn. great.
// yes, they accepted our pn. great.
num_last++;
dout(10) << "great, they accepted our pn, we now have " << num_last << endl;
dout(10) << " they accepted our pn, we now have "
<< num_last << " peons" << endl;
// did this person send back an accepted but uncommitted value?
if (last->old_accepted_pn &&
last->old_accepted_pn > old_accepted_pn) {
version_t v = last->last_committed+1;
dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn;
old_accepted_pn = last->old_accepted_pn;
old_accepted_value = last->values[v];
if (last->uncommitted_pn &&
last->uncommitted_pn > uncommitted_pn) {
uncommitted_v = last->last_committed+1;
uncommitted_pn = last->uncommitted_pn;
uncommitted_value = last->values[uncommitted_v];
dout(10) << "we learned an uncommitted value for " << uncommitted_v
<< " pn " << uncommitted_pn
<< " " << uncommitted_value.length() << " bytes"
<< endl;
}
// do we have a majority?
if (num_last == mon->monmap->num_mon/2+1) {
// do this once.
// is that everyone?
if (num_last == mon->get_quorum().size()) {
// almost...
state = STATE_ACTIVE;
// did we learn an old value?
if (old_accepted_value.length()) {
dout(10) << "begin on old learned value" << endl;
begin(old_accepted_value);
}
if (uncommitted_v == last_committed+1 &&
uncommitted_value.length()) {
dout(10) << "that's everyone. begin on old learned value" << endl;
begin(uncommitted_value);
} else {
// active!
dout(10) << "that's everyone. active!" << endl;
extend_lease();
// wake people up
finish_contexts(waiting_for_active);
finish_contexts(waiting_for_readable);
finish_contexts(waiting_for_writeable);
}
}
}
@ -170,54 +231,86 @@ void Paxos::handle_last(MMonPaxos *last)
}
// leader
void Paxos::begin(bufferlist& v)
{
dout(10) << "begin for " << last_committed+1 << " "
<< new_value.length() << " bytes"
<< v.length() << " bytes"
<< endl;
// we must already have a majority for this to work.
assert(num_last > mon->monmap->num_mon/2);
assert(mon->is_leader());
assert(is_active());
state = STATE_UPDATING;
// we must already have a majority for this to work.
assert(mon->get_quorum().size() == 1 ||
num_last > (unsigned)mon->monmap->num_mon/2);
// and no value, yet.
assert(new_value.length() == 0);
// accept it ourselves
num_accepted = 1;
accepted.clear();
accepted.insert(whoami);
new_value = v;
mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
// ask others to accept it to!
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
if (mon->get_quorum().size() == 1) {
// we're alone, take it easy
commit();
state = STATE_ACTIVE;
finish_contexts(waiting_for_active);
finish_contexts(waiting_for_commit);
finish_contexts(waiting_for_readable);
finish_contexts(waiting_for_writeable);
return;
}
dout(10) << " sending begin to mon" << i << endl;
MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
// ask others to accept it too!
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == whoami) continue;
dout(10) << " sending begin to mon" << *p << endl;
MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id);
begin->values[last_committed+1] = new_value;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
mon->messenger->send_message(begin, mon->monmap->get_inst(i));
mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
}
// set timeout event
accept_timeout_event = new C_AcceptTimeout(this);
mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event);
}
// peon
void Paxos::handle_begin(MMonPaxos *begin)
{
dout(10) << "handle_begin " << *begin << endl;
// can we accept this?
if (begin->pn != accepted_pn) {
if (begin->pn < accepted_pn) {
dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
delete begin;
return;
}
assert(begin->pn == accepted_pn);
assert(begin->last_committed == last_committed);
// set state.
state = STATE_UPDATING;
lease_expire = utime_t(); // cancel lease
// yes.
version_t v = last_committed+1;
dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
mon->store->put_bl_sn(begin->values[v], machine_name, v);
// reply
MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id);
accept->pn = accepted_pn;
accept->last_committed = last_committed;
mon->messenger->send_message(accept, begin->get_source_inst());
@ -225,33 +318,67 @@ void Paxos::handle_begin(MMonPaxos *begin)
delete begin;
}
// leader
void Paxos::handle_accept(MMonPaxos *accept)
{
dout(10) << "handle_accept " << *accept << endl;
int from = accept->get_source().num();
if (accept->pn != accepted_pn) {
// we accepted a higher pn, from some other leader
dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
delete accept;
return;
}
if (accept->last_committed != last_committed) {
dout(10) << " this is from an old round that's already committed, ignoring" << endl;
if (last_committed > 0 &&
accept->last_committed < last_committed-1) {
dout(10) << " this is from an old round, ignoring" << endl;
delete accept;
return;
}
assert(accept->last_committed == last_committed || // not committed
accept->last_committed == last_committed-1); // committed
num_accepted++;
dout(10) << "now " << num_accepted << " have accepted" << endl;
assert(state == STATE_UPDATING);
assert(accepted.count(from) == 0);
accepted.insert(from);
dout(10) << " now " << accepted << " have accepted" << endl;
// new majority?
if (num_accepted == mon->monmap->num_mon/2+1) {
if (accepted.size() == (unsigned)mon->monmap->num_mon/2+1) {
// yay, commit!
dout(10) << "we got a majority, committing too" << endl;
// note: this may happen before the lease is reextended (below)
dout(10) << " got majority, committing" << endl;
commit();
}
}
// done?
if (accepted == mon->get_quorum()) {
dout(10) << " got quorum, done with update" << endl;
// cancel timeout event
mon->timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
// yay!
state = STATE_ACTIVE;
extend_lease();
// wake people up
finish_contexts(waiting_for_active);
finish_contexts(waiting_for_commit);
finish_contexts(waiting_for_readable);
finish_contexts(waiting_for_writeable);
}
}
// leader: the accept timer fired while an update was still in flight.
// Abandon the round and force a fresh election.
void Paxos::accept_timeout()
{
  dout(5) << "accept timeout, calling fresh election" << endl;

  // forget our handle to the accept timer first, so that cancel_events()
  // below does not try to cancel it (presumably it is the event firing now).
  accept_timeout_event = 0;

  assert(mon->is_leader());
  assert(is_updating());

  // drop every other pending timer, then ask the monitor for a new election.
  cancel_events();
  mon->call_election();
}
void Paxos::commit()
@ -263,20 +390,21 @@ void Paxos::commit()
mon->store->put_int(last_committed, machine_name, "last_committed");
// tell everyone
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == whoami) continue;
dout(10) << " sending commit to mon" << i << endl;
MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
dout(10) << " sending commit to mon" << *p << endl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
mon->messenger->send_message(commit, mon->monmap->get_inst(i));
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}
// get ready for a new round.
new_value.clear();
}
@ -284,14 +412,162 @@ void Paxos::handle_commit(MMonPaxos *commit)
{
dout(10) << "handle_commit on " << commit->last_committed << endl;
if (!mon->is_peon()) {
dout(10) << "not a peon, dropping" << endl;
assert(0);
delete commit;
return;
}
// commit locally.
last_committed = commit->last_committed;
mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed);
for (map<version_t,bufferlist>::iterator p = commit->values.begin();
p != commit->values.end();
++p) {
assert(p->first == last_committed+1);
last_committed = p->first;
dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << endl;
mon->store->put_bl_sn(p->second, machine_name, last_committed);
}
mon->store->put_int(last_committed, machine_name, "last_committed");
delete commit;
}
}
void Paxos::extend_lease()
{
assert(mon->is_leader());
assert(is_active());
lease_expire = g_clock.now();
lease_expire += g_conf.mon_lease;
acked_lease.clear();
acked_lease.insert(whoami);
dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_expire << ")" << endl;
// bcast
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == whoami) continue;
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
lease->last_committed = last_committed;
lease->lease_expire = lease_expire;
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
}
// set timeout event.
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
lease_ack_timeout_event = new C_LeaseAckTimeout(this);
mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event);
}
// set renew event
lease_renew_event = new C_LeaseRenew(this);
utime_t at = lease_expire;
at -= g_conf.mon_lease;
at += g_conf.mon_lease_renew_interval;
mon->timer.add_event_at(at, lease_renew_event);
}
// peon
// peon: process an OP_LEASE message.  Extends our local lease, acks it,
// re-arms the lease timeout and wakes anyone waiting on us.
// Consumes (deletes) the message.
void Paxos::handle_lease(MMonPaxos *lease)
{
  // sanity: only a peon whose last_committed matches the sender's may take it
  const bool acceptable =
    mon->is_peon() &&
    last_committed == lease->last_committed;
  if (!acceptable) {
    dout(10) << "handle_lease i'm not a peon, or they're not the leader, or the last_committed doesn't match, dropping" << endl;
    delete lease;
    return;
  }

  // extend lease (never move the expiry backwards)
  if (lease_expire < lease->lease_expire) {
    lease_expire = lease->lease_expire;
  }

  state = STATE_ACTIVE;

  dout(10) << "handle_lease on " << lease->last_committed
           << " now " << lease_expire << endl;

  // ack back to whoever sent the lease
  MMonPaxos *reply = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id);
  reply->last_committed = last_committed;
  reply->lease_expire = lease_expire;
  mon->messenger->send_message(reply, lease->get_source_inst());

  // (re)set timeout event: replace any pending one with a fresh timer.
  if (lease_timeout_event) {
    mon->timer.cancel_event(lease_timeout_event);
  }
  lease_timeout_event = new C_LeaseTimeout(this);
  mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event);

  // kick waiters
  finish_contexts(waiting_for_active);
  if (is_readable()) {
    finish_contexts(waiting_for_readable);
  }

  delete lease;
}
// Process an OP_LEASE_ACK.  Records which monitor acked the current lease
// and cancels the ack timeout once the whole quorum has answered.
// Consumes (deletes) the message.
void Paxos::handle_lease_ack(MMonPaxos *ack)
{
  int from = ack->get_source().num();

  if (!lease_ack_timeout_event) {
    // no ack timer pending: nothing is waiting on this ack.
    dout(10) << "handle_lease_ack from " << ack->get_source() << " -- stray (probably since revoked)" << endl;
  }
  else if (acked_lease.count(from) != 0) {
    // we already have an ack from this monitor.
    dout(10) << "handle_lease_ack from " << ack->get_source()
             << " dup (lagging!), ignoring" << endl;
  }
  else {
    acked_lease.insert(from);

    if (acked_lease == mon->get_quorum()) {
      // yay! everyone answered; the ack timer is no longer needed.
      dout(10) << "handle_lease_ack from " << ack->get_source()
               << " -- got everyone" << endl;
      mon->timer.cancel_event(lease_ack_timeout_event);
      lease_ack_timeout_event = 0;
    } else {
      dout(10) << "handle_lease_ack from " << ack->get_source()
               << " -- still need "
               << mon->get_quorum().size() - acked_lease.size()
               << " more" << endl;
    }
  }

  delete ack;
}
// leader: the ack timer expired before the lease was acked by all peons.
// Drop all timers and call a new election.
void Paxos::lease_ack_timeout()
{
  dout(5) << "lease_ack_timeout -- calling new election" << endl;

  assert(mon->is_leader());
  assert(is_active());

  // clear our handle before cancel_events() so it skips this timer.
  lease_ack_timeout_event = 0;

  cancel_events();
  mon->call_election();
}
// peon: the lease timer expired without the lease being extended.
// Drop all timers and call a new election.
void Paxos::lease_timeout()
{
  dout(5) << "lease_timeout -- calling new election" << endl;

  assert(mon->is_peon());

  // clear our handle before cancel_events() so it skips this timer.
  lease_timeout_event = 0;

  cancel_events();
  mon->call_election();
}
// leader: the renewal timer fired; forget the spent event and issue a
// fresh lease (extend_lease() arms the next renewal itself).
void Paxos::lease_renew_timeout()
{
  lease_renew_event = 0;

  extend_lease();
}
/*
@ -299,37 +575,93 @@ void Paxos::handle_commit(MMonPaxos *commit)
*/
version_t Paxos::get_new_proposal_number(version_t gt)
{
// read last
version_t last = mon->store->get_int("last_paxos_proposal");
if (last < gt)
last = gt;
if (last_pn < gt)
last_pn = gt;
// update
last /= 100;
last++;
// make it unique among all monitors.
version_t pn = last*100 + (version_t)whoami;
// update. make it unique among all monitors.
last_pn /= 100;
last_pn++;
last_pn *= 100;
last_pn += (version_t)whoami;
// write
mon->store->put_int(pn, "last_paxos_proposal");
mon->store->put_int(last_pn, machine_name, "last_pn");
dout(10) << "get_new_proposal_number = " << pn << endl;
return pn;
dout(10) << "get_new_proposal_number = " << last_pn << endl;
return last_pn;
}
void Paxos::leader_start()
void Paxos::cancel_events()
{
dout(10) << "leader_start -- i am the leader, start paxos" << endl;
if (accept_timeout_event) {
mon->timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
}
if (lease_renew_event) {
mon->timer.cancel_event(lease_renew_event);
lease_renew_event = 0;
}
if (lease_ack_timeout_event) {
mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
}
if (lease_timeout_event) {
mon->timer.cancel_event(lease_timeout_event);
lease_timeout_event = 0;
}
}
// Called when this monitor becomes leader.  A quorum of one is active
// immediately; otherwise reset timers and lease and start recovery with
// a collect round.
void Paxos::leader_init()
{
  const bool alone = (mon->get_quorum().size() == 1);

  if (alone) {
    // nobody to recover from
    state = STATE_ACTIVE;
  } else {
    cancel_events();

    lease_expire = utime_t();    // no lease until recovery completes
    state = STATE_RECOVERING;

    dout(10) << "leader_init -- starting paxos recovery" << endl;
    collect(0);
  }
}
// Called when this monitor becomes a peon.  Resets timers and lease, enters
// the recovering state, and fails anyone who was waiting to write or commit.
void Paxos::peon_init()
{
  cancel_events();

  lease_expire = utime_t();    // drop any lease we held
  state = STATE_RECOVERING;

  dout(10) << "peon_init -- i am a peon" << endl;

  // no chance to write now!  complete those waiters with -1.
  finish_contexts(waiting_for_writeable, -1);
  finish_contexts(waiting_for_commit, -1);
}
// An election is beginning: stop all timers, throw away any value that was
// being proposed, and fail pending commit waiters with -1.
void Paxos::election_starting()
{
  dout(10) << "election_starting -- canceling timeouts" << endl;

  cancel_events();

  // discard the in-flight proposal, if any
  new_value.clear();

  finish_contexts(waiting_for_commit, -1);
}
void Paxos::dispatch(Message *m)
{
// election in progress?
if (mon->is_starting()) {
dout(5) << "election in progress, dropping " << *m << endl;
delete m;
return;
}
// check sanity
assert(mon->is_leader() ||
(mon->is_peon() && m->get_source().num() == mon->get_leader()));
switch (m->get_type()) {
case MSG_MON_PAXOS:
{
MMonPaxos *pm = (MMonPaxos*)m;
@ -340,23 +672,24 @@ void Paxos::dispatch(Message *m)
case MMonPaxos::OP_COLLECT:
handle_collect(pm);
break;
case MMonPaxos::OP_LAST:
handle_last(pm);
break;
case MMonPaxos::OP_BEGIN:
handle_begin(pm);
break;
case MMonPaxos::OP_ACCEPT:
handle_accept(pm);
break;
case MMonPaxos::OP_COMMIT:
handle_commit(pm);
break;
case MMonPaxos::OP_LEASE:
handle_lease(pm);
break;
case MMonPaxos::OP_LEASE_ACK:
handle_lease_ack(pm);
break;
default:
assert(0);
}
@ -368,3 +701,84 @@ void Paxos::dispatch(Message *m)
}
}
// -----------------
// service interface
// -- READ --
// True when the last committed value may be read locally: we are a leader
// or peon, paxos is active, at least one value has been committed, and we
// are either the only quorum member or hold an unexpired lease.
bool Paxos::is_readable()
{
  //dout(15) << "is_readable now=" << g_clock.now() << " lease_expire=" << lease_expire << endl;

  // must be part of the quorum in some role
  if (!mon->is_peon() && !mon->is_leader())
    return false;

  if (!is_active())
    return false;

  // must have a value
  if (!(last_committed > 0))
    return false;

  // alone, or...
  if (mon->get_quorum().size() == 1)
    return true;

  // ...have lease
  return g_clock.now() < lease_expire;
}
// Read version v of this machine's state from the store into bl.
// Returns false if paxos is not readable or the store lookup fails.
bool Paxos::read(version_t v, bufferlist &bl)
{
  // the store is only consulted when we are readable (short-circuit)
  if (is_readable() &&
      mon->store->get_bl_sn(bl, machine_name, v))
    return true;

  return false;
}
// Read the last committed value into bl.
// Returns the version that was read, or 0 if not readable / read failed.
version_t Paxos::read_current(bufferlist &bl)
{
  if (is_readable() &&
      read(last_committed, bl))
    return last_committed;

  return 0;
}
// -- WRITE --
// True when a new value may be proposed: either we are the only member of
// the quorum, or we are the active leader with an unexpired lease.
bool Paxos::is_writeable()
{
  // alone?  always writeable.
  if (mon->get_quorum().size() == 1)
    return true;

  if (!mon->is_leader())
    return false;

  if (!is_active())
    return false;

  return g_clock.now() < lease_expire;
}
// leader: start a paxos round for a new value.
//   bl       - encoded value to propose as version last_committed+1
//   oncommit - optional context, queued on waiting_for_commit
// Asserts that we are the active leader.  Always returns true.
bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit)
{
  /*
   * disabled writeability check, kept for reference:
   *
  // writeable?
  if (!is_writeable()) {
    dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes"
	    << " -- not writeable" << endl;
    if (oncommit) {
      oncommit->finish(-1);
      delete oncommit;
    }
    return false;
  }
  */

  assert(mon->is_leader() && is_active());

  // cancel lease renewal and timeout events.
  cancel_events();

  // ok!
  dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" << endl;

  // remember who wants to hear about the commit
  if (oncommit) {
    waiting_for_commit.push_back(oncommit);
  }

  begin(bl);

  return true;
}

View File

@ -35,10 +35,22 @@ e 12v
*/
/*
* NOTE: This library is based on the Paxos algorithm, but varies in a few key ways:
* 1- Only a single new value is generated at a time, simplifying the recovery logic.
* 2- Nodes track "committed" values, and share them generously (and trustingly)
* 3- A 'leasing' mechanism is built-in, allowing nodes to determine when it is safe to
* "read" their copy of the last committed value.
*
* This provides a simple replication substrate that services can be built on top of.
*/
#ifndef __MON_PAXOS_H
#define __MON_PAXOS_H
#include "include/types.h"
#include "mon_types.h"
#include "include/buffer.h"
#include "msg/Message.h"
@ -49,6 +61,7 @@ e 12v
class Monitor;
class MMonPaxos;
// i am one state machine.
class Paxos {
Monitor *mon;
@ -58,40 +71,176 @@ class Paxos {
int machine_id;
const char *machine_name;
// phase 1
friend class PaxosService;
// LEADER+PEON
// -- generic state --
public:
const static int STATE_RECOVERING = 1; // leader|peon: recovering paxos state
const static int STATE_ACTIVE = 2; // leader|peon: idle. peon may or may not have valid lease
const static int STATE_UPDATING = 3; // leader|peon: updating to new value
const char *get_statename(int s) {
switch (s) {
case STATE_RECOVERING: return "recovering";
case STATE_ACTIVE: return "active";
case STATE_UPDATING: return "updating";
default: assert(0); return 0;
}
}
private:
int state;
public:
bool is_recovering() { return state == STATE_RECOVERING; }
bool is_active() { return state == STATE_ACTIVE; }
bool is_updating() { return state == STATE_UPDATING; }
private:
// recovery (phase 1)
version_t last_pn;
version_t last_committed;
version_t accepted_pn;
version_t accepted_pn_from;
// results from our last replies
int num_last;
version_t old_accepted_pn;
bufferlist old_accepted_value;
// phase 2
// active (phase 2)
utime_t lease_expire;
list<Context*> waiting_for_active;
list<Context*> waiting_for_readable;
// -- leader --
// recovery (paxos phase 1)
unsigned num_last;
version_t uncommitted_v;
version_t uncommitted_pn;
bufferlist uncommitted_value;
// active
set<int> acked_lease;
Context *lease_renew_event;
Context *lease_ack_timeout_event;
Context *lease_timeout_event;
// updating (paxos phase 2)
bufferlist new_value;
int num_accepted;
set<int> accepted;
Context *accept_timeout_event;
list<Context*> waiting_for_writeable;
list<Context*> waiting_for_commit;
class C_AcceptTimeout : public Context {
Paxos *paxos;
public:
C_AcceptTimeout(Paxos *p) : paxos(p) {}
void finish(int r) {
paxos->accept_timeout();
}
};
class C_LeaseAckTimeout : public Context {
Paxos *paxos;
public:
C_LeaseAckTimeout(Paxos *p) : paxos(p) {}
void finish(int r) {
paxos->lease_ack_timeout();
}
};
class C_LeaseTimeout : public Context {
Paxos *paxos;
public:
C_LeaseTimeout(Paxos *p) : paxos(p) {}
void finish(int r) {
paxos->lease_timeout();
}
};
class C_LeaseRenew : public Context {
Paxos *paxos;
public:
C_LeaseRenew(Paxos *p) : paxos(p) {}
void finish(int r) {
paxos->lease_renew_timeout();
}
};
void collect(version_t oldpn);
void handle_collect(MMonPaxos*);
void handle_last(MMonPaxos*);
void begin(bufferlist& value);
void handle_begin(MMonPaxos*);
void handle_accept(MMonPaxos*);
void accept_timeout();
void commit();
void handle_commit(MMonPaxos*);
void extend_lease();
void handle_lease(MMonPaxos*);
void handle_lease_ack(MMonPaxos*);
void lease_ack_timeout(); // on leader, if lease isn't acked by all peons
void lease_renew_timeout(); // on leader, to renew the lease
void lease_timeout(); // on peon, if lease isn't extended
void cancel_events();
version_t get_new_proposal_number(version_t gt=0);
public:
Paxos(Monitor *m, int w,
int mid,const char *mnm) : mon(m), whoami(w),
machine_id(mid), machine_name(mnm) {
}
int mid) : mon(m), whoami(w),
machine_id(mid),
machine_name(get_paxos_name(mid)),
state(STATE_RECOVERING),
lease_renew_event(0),
lease_ack_timeout_event(0),
lease_timeout_event(0),
accept_timeout_event(0) { }
void dispatch(Message *m);
void leader_start();
void init();
void election_starting();
void leader_init();
void peon_init();
// -- service interface --
void wait_for_active(Context *c) {
assert(!is_active());
waiting_for_active.push_back(c);
}
// read
version_t get_version() { return last_committed; }
bool is_readable();
bool read(version_t v, bufferlist &bl);
version_t read_current(bufferlist &bl);
void wait_for_readable(Context *onreadable) {
assert(!is_readable());
waiting_for_readable.push_back(onreadable);
}
// write
bool is_leader();
bool is_writeable();
void wait_for_writeable(Context *c) {
assert(!is_writeable());
waiting_for_writeable.push_back(c);
}
bool propose_new_value(bufferlist& bl, Context *oncommit=0);
void wait_for_commit(Context *oncommit) {
waiting_for_commit.push_back(oncommit);
}
void wait_for_commit_front(Context *oncommit) {
waiting_for_commit.push_front(oncommit);
}
};

View File

@ -0,0 +1,136 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "PaxosService.h"
#include "common/Clock.h"
#include "Monitor.h"
#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxosservice(" << get_paxos_name(paxos->machine_id) << ") "
//#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " "
// Entry point for messages aimed at this service.
//
// Pipeline: wait until paxos is readable -> refresh our state from paxos ->
// let the service answer read-only queries -> forward to the leader if we
// are not it -> wait until paxos is writeable -> prepare the update and,
// if the service says so, propose it.
void PaxosService::dispatch(Message *m)
{
  dout(10) << "dispatch " << *m << " from " << m->get_source_inst() << endl;

  // make sure our map is readable and up to date; if not, park the message
  // and retry it when paxos becomes readable.
  if (!paxos->is_readable()) {
    dout(10) << " waiting for paxos -> readable" << endl;
    paxos->wait_for_readable(new C_RetryMessage(this, m));
    return;
  }

  // make sure service has latest from paxos.
  update_from_paxos();

  // preprocess: the service may fully handle the message here.
  if (preprocess_query(m)) {
    return;  // easy!
  }

  // everything past this point is an update, which only the leader does.
  if (!mon->is_leader()) {
    // fw to leader
    dout(10) << " fw to leader mon" << mon->get_leader() << endl;
    mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
    return;
  }

  // writeable?  if not, park the message until it is.
  if (!paxos->is_writeable()) {
    dout(10) << " waiting for paxos -> writeable" << endl;
    paxos->wait_for_writeable(new C_RetryMessage(this, m));
    return;
  }

  // update: fold the message into pending state, then maybe propose.
  if (!prepare_update(m))
    return;
  if (should_propose_now())
    propose_pending();
}
// Runs after a proposed value has committed (via C_Commit).  Pulls the new
// paxos state into the service and, on the leader, opens a new pending
// structure for the next round.
void PaxosService::_commit()
{
  dout(7) << "_commit" << endl;

  // notify service of new paxos state
  update_from_paxos();

  // only the leader keeps pending state
  if (!mon->is_leader())
    return;

  dout(7) << "_commit creating new pending" << endl;
  assert(have_pending == false);
  create_pending();
  have_pending = true;
}
void PaxosService::propose_pending()
{
dout(10) << "propose_pending" << endl;
assert(have_pending);
// finish and encode
bufferlist bl;
encode_pending(bl);
have_pending = false;
// apply to paxos
paxos->wait_for_commit_front(new C_Commit(this));
paxos->propose_new_value(bl);
}
// Called once an election has completed.  A non-leader throws away any
// pending state; then we either sync with paxos right away (if it is
// already active) or arrange to do so once it becomes active.
void PaxosService::election_finished()
{
  dout(10) << "election_finished" << endl;

  // pending state is only meaningful on the leader
  if (have_pending) {
    if (!mon->is_leader()) {
      discard_pending();
      have_pending = false;
    }
  }

  // make sure we update our state
  if (!paxos->is_active()) {
    paxos->wait_for_active(new C_Active(this));
    return;
  }
  _active();
}
void PaxosService::_active()
{
dout(10) << "_active" << endl;
assert(paxos->is_active());
// pull latest from paxos
update_from_paxos();
// create pending state?
if (mon->is_leader()) {
if (!have_pending) {
create_pending();
have_pending = true;
}
if (g_conf.mkfs &&
paxos->get_version() == 0) {
create_initial();
propose_pending();
}
}
}

View File

@ -0,0 +1,91 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __PAXOSSERVICE_H
#define __PAXOSSERVICE_H
#include "msg/Dispatcher.h"
#include "include/Context.h"
class Monitor;
class Paxos;
class PaxosService : public Dispatcher {
protected:
Monitor *mon;
Paxos *paxos;
class C_RetryMessage : public Context {
PaxosService *svc;
Message *m;
public:
C_RetryMessage(PaxosService *s, Message *m_) : svc(s), m(m_) {}
void finish(int r) {
svc->dispatch(m);
}
};
class C_Active : public Context {
PaxosService *svc;
public:
C_Active(PaxosService *s) : svc(s) {}
void finish(int r) {
if (r >= 0)
svc->_active();
}
};
class C_Commit : public Context {
PaxosService *svc;
public:
C_Commit(PaxosService *s) : svc(s) {}
void finish(int r) {
if (r >= 0)
svc->_commit();
}
};
friend class C_Update;
private:
bool have_pending;
public:
PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p),
have_pending(false) { }
// i implement and you ignore
void dispatch(Message *m);
void election_finished();
private:
void _active();
void _commit();
public:
// i implement and you use
void propose_pending(); // propose current pending as new paxos state
// you implement
virtual bool update_from_paxos() = 0; // assimilate latest paxos state
virtual void create_pending() = 0; // [leader] create new pending structures
virtual void create_initial() = 0; // [leader] populate pending with initial state (1)
virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state
virtual void discard_pending() { } // [leader] discard pending
virtual bool preprocess_query(Message *m) = 0; // true if processed (e.g., read-only)
virtual bool prepare_update(Message *m) = 0;
virtual bool should_propose_now() { return true; }
};
#endif

View File

@ -0,0 +1,33 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __MON_TYPES_H
#define __MON_TYPES_H
// ids of the paxos state machines
#define PAXOS_TEST       0
#define PAXOS_MDSMAP     1
#define PAXOS_OSDMAP     2
#define PAXOS_CLIENTMAP  3

// Map a paxos machine id to its short name.
// An unknown id trips assert(0); with asserts compiled out it yields a
// null pointer.
inline const char *get_paxos_name(int p) {
  if (p == PAXOS_TEST)
    return "test";
  if (p == PAXOS_MDSMAP)
    return "mdsmap";
  if (p == PAXOS_OSDMAP)
    return "osdmap";
  if (p == PAXOS_CLIENTMAP)
    return "clientmap";

  // not a machine we know about
  assert(0);
  return 0;
}
#endif