msgr: simplify policy

We may be a server, and we may be lossy.  This gives us a few policies:

!server, !lossy = lossless_peer(), for bidirectional intracluster fun.
server, !lossy = lossless_server(), e.g. mds <-> client.
server, lossy = lossy_server(), e.g. mon and osd

also, the default is
!server, !lossy = client(), but that doesn't mean much.  The server
decides if the connection is lossy or not.  And !server just means we can
initiate the outgoing connection.
This commit is contained in:
Sage Weil 2009-10-12 22:45:50 -07:00
parent 78dcc39f41
commit 329f5c6490
10 changed files with 41 additions and 97 deletions

View File

@ -617,7 +617,6 @@ int main(int argc, const char **argv, const char *envp[])
messenger->add_dispatcher_head(&dispatcher);
rank.start();
rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
mc.set_messenger(messenger);
mc.init();

View File

@ -75,10 +75,6 @@ int main(int argc, const char **argv, const char *envp[]) {
rank.start();
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
// start client
client->init();

View File

@ -78,10 +78,8 @@ int main(int argc, const char **argv)
if (!m)
return 1;
rank.set_default_policy(SimpleMessenger::Policy::stateful_server());
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server());
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer());
rank.start();

View File

@ -149,9 +149,7 @@ int main(int argc, const char **argv)
rank.start(); // may daemonize
rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fail_after(2.0));
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fail_after(2.0));
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer());
mon->init();
rank.wait();

View File

@ -145,8 +145,8 @@ int main(int argc, const char **argv)
return 1;
rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
rank.start();

View File

@ -58,10 +58,6 @@ int main(int argc, const char **argv, char *envp[])
SimpleMessenger rank;
cout << "starting csyn" << std::endl;
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(2.0));
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
list<Client*> clients;
list<SyntheticClient*> synclients;

View File

@ -54,9 +54,6 @@ extern "C" int ceph_initialize(int argc, const char **argv)
client = new Client(rank->register_entity(entity_name_t::CLIENT()), monclient);
rank->start();
rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
client->init();
}

View File

@ -290,11 +290,6 @@ bool RadosClient::init()
messenger->add_dispatcher_head(this);
rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown
rank.start(1);
objecter = new Objecter(messenger, &monclient, &osdmap, lock);

View File

@ -470,7 +470,7 @@ ostream& SimpleMessenger::Pipe::_pipe_prefix() {
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
<< " ltx=" << policy.lossy_tx
<< " l=" << policy.lossy
<< ").";
}
@ -598,9 +598,9 @@ int SimpleMessenger::Pipe::accept()
// note peer's type, flags
peer_type = connect.host_type;
policy = rank->get_policy(connect.host_type);
dout(10) << "accept host_type " << connect.host_type
<< ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
dout(10) << "accept of host_type " << connect.host_type
<< ", policy.lossy=" << policy.lossy
<< dendl;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
@ -632,12 +632,13 @@ int SimpleMessenger::Pipe::accept()
<< " <= " << connect.global_seq << ", looks ok" << dendl;
}
if (existing->policy.lossy_tx) {
dout(-10) << "accept replacing existing (lossy) channel" << dendl;
if (existing->policy.lossy) {
dout(-10) << "accept replacing existing (lossy) channel (new one lossy="
<< policy.lossy << ")" << dendl;
existing->was_session_reset();
goto replace;
}
if (lossy_rx) {
/*if (lossy_rx) {
if (existing->state == STATE_STANDBY) {
dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
<< existing << dendl;
@ -649,7 +650,7 @@ int SimpleMessenger::Pipe::accept()
}
existing->lock.Unlock();
goto fail;
}
}*/
dout(-10) << "accept connect_seq " << connect.connect_seq
<< " vs existing " << existing->connect_seq
@ -762,7 +763,7 @@ int SimpleMessenger::Pipe::accept()
reply.global_seq = rank->get_global_seq();
reply.connect_seq = connect_seq;
reply.flags = 0;
if (policy.lossy_tx)
if (policy.lossy)
reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
// ok!
@ -922,8 +923,8 @@ int SimpleMessenger::Pipe::connect()
connect.connect_seq = cseq;
connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
connect.flags = 0;
if (policy.lossy_tx)
connect.flags |= CEPH_MSG_CONNECT_LOSSY;
if (policy.lossy)
connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&connect;
msgvec[0].iov_len = sizeof(connect);
@ -995,12 +996,12 @@ int SimpleMessenger::Pipe::connect()
if (reply.tag == CEPH_MSGR_TAG_READY) {
// hooray!
peer_global_seq = reply.global_seq;
lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
state = STATE_OPEN;
connect_seq = cseq + 1;
assert(connect_seq == reply.connect_seq);
first_fault = last_attempt = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
@ -1102,7 +1103,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
}
// lossy channel?
if (policy.lossy_tx) {
if (policy.lossy) {
dout(10) << "fault on lossy channel, failing" << dendl;
was_session_reset();
fail();
@ -1117,7 +1118,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
state = STATE_CLOSED;
} else {
dout(0) << "fault nothing to send, going to standby" << dendl;
dout(0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
}
return;
@ -1125,30 +1126,29 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
utime_t now = g_clock.now();
if (state != STATE_CONNECTING) {
if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
if (!onconnect)
dout(0) << "fault initiating reconnect" << dendl;
connect_seq++;
state = STATE_CONNECTING;
first_fault = now;
} else if (first_fault.sec() == 0) {
if (!onconnect) dout(0) << "fault first fault" << dendl;
if (!onconnect)
dout(0) << "fault first fault" << dendl;
first_fault = now;
} else {
#warning clean me up
utime_t failinterval = now - first_fault;
utime_t retryinterval = now - last_attempt;
if (!onconnect) dout(10) << "fault failure was " << failinterval
<< " ago, last attempt was at " << last_attempt
<< ", " << retryinterval << " ago" << dendl;
if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
// give up
dout(0) << "fault giving up" << dendl;
fail();
} else if (retryinterval < policy.retry_interval) {
// wait
now += (policy.retry_interval - retryinterval);
dout(10) << "fault waiting until " << now << dendl;
cond.WaitUntil(lock, now);
dout(10) << "fault done waiting or woke up" << dendl;
}
// wait
now += 1.0;
dout(10) << "fault waiting until " << now << dendl;
cond.WaitUntil(lock, now);
dout(10) << "fault done waiting or woke up" << dendl;
}
last_attempt = now;
}
@ -1309,7 +1309,7 @@ void SimpleMessenger::Pipe::reader()
}
in_seq++;
if (!lossy_rx && in_seq != m->get_seq()) {
if (!policy.lossy && in_seq != m->get_seq()) {
dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
<< " for " << *m << " from " << m->get_source() << dendl;
derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq

View File

@ -39,50 +39,16 @@ using namespace __gnu_cxx;
class SimpleMessenger {
public:
struct Policy {
bool lossy_tx; //
bool lossy;
bool server;
float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true)
float fail_interval; // before we call ms_handle_failure (lossy_tx=true)
bool drop_msg_callback;
bool fail_callback;
bool remote_reset_callback;
Policy() :
lossy_tx(false), server(false),
retry_interval(g_conf.ms_retry_interval),
fail_interval(g_conf.ms_fail_interval),
drop_msg_callback(true),
fail_callback(true),
remote_reset_callback(true) {}
Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) :
lossy_tx(tx), server(sr),
retry_interval(r), fail_interval(f),
drop_msg_callback(dmc),
fail_callback(fc),
remote_reset_callback(rrc) {}
Policy(bool l=false, bool s=false) :
lossy(l), server(s) {}
// new
static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0,
true, true, true); }
static Policy stateless_server() { return Policy(true, true, -1, -1,
true, true, true); }
// old
static Policy lossless() { return Policy(false, false,
g_conf.ms_retry_interval, 0,
true, true, true); }
static Policy lossy_fail_after(float f) {
return Policy(true, false,
MIN(g_conf.ms_retry_interval, f), f,
true, true, true);
}
static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); }
/*
static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); }
*/
static Policy stateful_server() { return Policy(false, true); }
static Policy stateless_server() { return Policy(true, true); }
static Policy lossless_peer() { return Policy(false, false); }
static Policy client() { return Policy(false, false); }
};
@ -130,7 +96,6 @@ private:
int peer_type;
entity_addr_t peer_addr;
Policy policy;
bool lossy_rx;
Mutex lock;
int state;