diff --git a/src/ceph.cc b/src/ceph.cc index 7cff7363ed8..753e3eaec75 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -617,7 +617,6 @@ int main(int argc, const char **argv, const char *envp[]) messenger->add_dispatcher_head(&dispatcher); rank.start(); - rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0)); mc.set_messenger(messenger); mc.init(); diff --git a/src/cfuse.cc b/src/cfuse.cc index 0d358990437..f367df9fec0 100644 --- a/src/cfuse.cc +++ b/src/cfuse.cc @@ -75,10 +75,6 @@ int main(int argc, const char **argv, const char *envp[]) { rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); - // start client client->init(); diff --git a/src/cmds.cc b/src/cmds.cc index e572068ceef..6abd9a27b4d 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -78,10 +78,8 @@ int main(int argc, const char **argv) if (!m) return 1; - rank.set_default_policy(SimpleMessenger::Policy::stateful_server()); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server()); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer()); rank.start(); diff --git a/src/cmon.cc b/src/cmon.cc index e077f253145..ecde44f2e35 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -149,9 +149,7 @@ int main(int argc, const char **argv) rank.start(); // may daemonize rank.set_default_policy(SimpleMessenger::Policy::stateless_server()); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fail_after(2.0)); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fail_after(2.0)); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer()); mon->init(); rank.wait(); diff --git a/src/cosd.cc b/src/cosd.cc index 8b0ba5d03d3..4336a676474 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -145,8 +145,8 @@ int main(int argc, const char **argv) return 1; rank.set_default_policy(SimpleMessenger::Policy::stateless_server()); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer()); rank.start(); diff --git a/src/csyn.cc b/src/csyn.cc index 715c3ec9e6e..4e5955d713b 100644 --- a/src/csyn.cc +++ b/src/csyn.cc @@ -58,10 +58,6 @@ int main(int argc, const char **argv, char *envp[]) SimpleMessenger rank; cout << "starting csyn" << std::endl; - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(2.0)); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); - list clients; list synclients; diff --git a/src/libceph.cc b/src/libceph.cc index 678c02563e4..ae6dc87a471 100644 --- a/src/libceph.cc +++ b/src/libceph.cc @@ -54,9 +54,6 @@ extern "C" int ceph_initialize(int argc, const char **argv) client = new Client(rank->register_entity(entity_name_t::CLIENT()), monclient); rank->start(); - rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); - rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); - rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); client->init(); } diff --git a/src/librados.cc b/src/librados.cc index d1af13dd247..097e3821e8d 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -290,11 +290,6 @@ bool RadosClient::init() messenger->add_dispatcher_head(this); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown - rank.start(1); objecter = new Objecter(messenger, &monclient, &osdmap, lock); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 028d63b0fbc..0e5e7a21c86 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -470,7 +470,7 @@ ostream& SimpleMessenger::Pipe::_pipe_prefix() { << " sd=" << sd << " pgs=" << peer_global_seq << " cs=" << connect_seq - << " ltx=" << policy.lossy_tx + << " l=" << policy.lossy << ")."; } @@ -598,9 +598,9 @@ int SimpleMessenger::Pipe::accept() // note peer's type, flags peer_type = connect.host_type; policy = rank->get_policy(connect.host_type); - dout(10) << "accept host_type " << connect.host_type - << ", setting policy, lossy_tx=" << policy.lossy_tx << dendl; - lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY; + dout(10) << "accept of host_type " << connect.host_type + << ", policy.lossy=" << policy.lossy + << dendl; memset(&reply, 0, sizeof(reply)); reply.protocol_version = get_proto_version(rank->my_type, peer_type, false); @@ -632,12 +632,13 @@ int SimpleMessenger::Pipe::accept() << " <= " << connect.global_seq << ", looks ok" << dendl; } - if (existing->policy.lossy_tx) { - dout(-10) << "accept replacing existing (lossy) channel" << dendl; + if (existing->policy.lossy) { + dout(-10) << "accept replacing existing (lossy) channel (new one lossy=" + << policy.lossy << ")" << dendl; existing->was_session_reset(); goto replace; } - if (lossy_rx) { + /*if (lossy_rx) { if (existing->state == STATE_STANDBY) { dout(-10) << "accept incoming lossy connection, kicking outgoing lossless " << existing << dendl; @@ -649,7 +650,7 @@ int SimpleMessenger::Pipe::accept() } existing->lock.Unlock(); goto fail; - } + }*/ dout(-10) << "accept connect_seq " << connect.connect_seq << " vs existing " << existing->connect_seq @@ -762,7 +763,7 @@ int SimpleMessenger::Pipe::accept() reply.global_seq = rank->get_global_seq(); reply.connect_seq = connect_seq; reply.flags = 0; - if (policy.lossy_tx) + if (policy.lossy) reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; // ok! @@ -922,8 +923,8 @@ int SimpleMessenger::Pipe::connect() connect.connect_seq = cseq; connect.protocol_version = get_proto_version(rank->my_type, peer_type, true); connect.flags = 0; - if (policy.lossy_tx) - connect.flags |= CEPH_MSG_CONNECT_LOSSY; + if (policy.lossy) + connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = (char*)&connect; msgvec[0].iov_len = sizeof(connect); @@ -995,12 +996,12 @@ int SimpleMessenger::Pipe::connect() if (reply.tag == CEPH_MSGR_TAG_READY) { // hooray! peer_global_seq = reply.global_seq; - lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY; + policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY; state = STATE_OPEN; connect_seq = cseq + 1; assert(connect_seq == reply.connect_seq); first_fault = last_attempt = utime_t(); - dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl; + dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl; if (!reader_running) { dout(20) << "connect starting reader" << dendl; @@ -1102,7 +1103,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) } // lossy channel? - if (policy.lossy_tx) { + if (policy.lossy) { dout(10) << "fault on lossy channel, failing" << dendl; was_session_reset(); fail(); @@ -1117,7 +1118,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; state = STATE_CLOSED; } else { - dout(0) << "fault nothing to send, going to standby" << dendl; + dout(0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; } return; @@ -1125,30 +1126,29 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) utime_t now = g_clock.now(); if (state != STATE_CONNECTING) { - if (!onconnect) dout(0) << "fault initiating reconnect" << dendl; + if (!onconnect) + dout(0) << "fault initiating reconnect" << dendl; connect_seq++; state = STATE_CONNECTING; first_fault = now; } else if (first_fault.sec() == 0) { - if (!onconnect) dout(0) << "fault first fault" << dendl; + if (!onconnect) + dout(0) << "fault first fault" << dendl; first_fault = now; } else { + +#warning clean me up + utime_t failinterval = now - first_fault; utime_t retryinterval = now - last_attempt; if (!onconnect) dout(10) << "fault failure was " << failinterval << " ago, last attempt was at " << last_attempt << ", " << retryinterval << " ago" << dendl; - if (policy.fail_interval > 0 && failinterval > policy.fail_interval) { - // give up - dout(0) << "fault giving up" << dendl; - fail(); - } else if (retryinterval < policy.retry_interval) { - // wait - now += (policy.retry_interval - retryinterval); - dout(10) << "fault waiting until " << now << dendl; - cond.WaitUntil(lock, now); - dout(10) << "fault done waiting or woke up" << dendl; - } + // wait + now += 1.0; + dout(10) << "fault waiting until " << now << dendl; + cond.WaitUntil(lock, now); + dout(10) << "fault done waiting or woke up" << dendl; } last_attempt = now; } @@ -1309,7 +1309,7 @@ void SimpleMessenger::Pipe::reader() } in_seq++; - if (!lossy_rx && in_seq != m->get_seq()) { + if (!policy.lossy && in_seq != m->get_seq()) { dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq << " for " << *m << " from " << m->get_source() << dendl; derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index fe599c105c5..230af5d55e2 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -39,50 +39,16 @@ using namespace __gnu_cxx; class SimpleMessenger { public: struct Policy { - bool lossy_tx; // + bool lossy; bool server; - float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true) - float fail_interval; // before we call ms_handle_failure (lossy_tx=true) - bool drop_msg_callback; - bool fail_callback; - bool remote_reset_callback; - Policy() : - lossy_tx(false), server(false), - retry_interval(g_conf.ms_retry_interval), - fail_interval(g_conf.ms_fail_interval), - drop_msg_callback(true), - fail_callback(true), - remote_reset_callback(true) {} - Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) : - lossy_tx(tx), server(sr), - retry_interval(r), fail_interval(f), - drop_msg_callback(dmc), - fail_callback(fc), - remote_reset_callback(rrc) {} + Policy(bool l=false, bool s=false) : + lossy(l), server(s) {} - // new - static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0, - true, true, true); } - static Policy stateless_server() { return Policy(true, true, -1, -1, - true, true, true); } - - // old - static Policy lossless() { return Policy(false, false, - g_conf.ms_retry_interval, 0, - true, true, true); } - static Policy lossy_fail_after(float f) { - return Policy(true, false, - MIN(g_conf.ms_retry_interval, f), f, - true, true, true); - } - static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); } - - /* - static Policy fast_fail() { return Policy(-1, -1, true, true, true); } - static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); } - static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); } - */ + static Policy stateful_server() { return Policy(false, true); } + static Policy stateless_server() { return Policy(true, true); } + static Policy lossless_peer() { return Policy(false, false); } + static Policy client() { return Policy(false, false); } }; @@ -130,7 +96,6 @@ private: int peer_type; entity_addr_t peer_addr; Policy policy; - bool lossy_rx; Mutex lock; int state;