diff --git a/src/client/Client.cc b/src/client/Client.cc index 8319d926dcb..7bcab7be4b1 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -264,8 +264,8 @@ void Client::init() monclient->init(); monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD); - monclient->sub_want("mdsmap", mdsmap->get_epoch()); - monclient->sub_want_onetime("osdmap", 0); + monclient->sub_want("mdsmap", 0, 0); + monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); // do logger crap only once per process. static bool did_init = false; diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index bf7dea67323..f8823e13386 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -43,6 +43,7 @@ #define CEPH_FEATURE_NOSRCADDR (1<<1) #define CEPH_FEATURE_MONCLOCKCHECK (1<<2) #define CEPH_FEATURE_FLOCK (1<<3) +#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) /* @@ -201,9 +202,11 @@ struct ceph_client_mount { struct ceph_mon_request_header monhdr; } __attribute__ ((packed)); +#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ + struct ceph_mon_subscribe_item { - __le64 have_version; __le64 have; - __u8 onetime; + __le64 start; + __u8 flags; } __attribute__ ((packed)); struct ceph_mon_subscribe_ack { diff --git a/src/include/types.h b/src/include/types.h index 00734438dd7..fbed468b90f 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -472,7 +472,8 @@ inline ostream& operator<<(ostream& out, const kb_t& kb) inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i) { - return out << i.have << (i.onetime ? "" : "+"); + return out << i.start + << ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? "" : "+"); } #endif diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 6e4b60442c0..a2b9cb14e56 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -456,7 +456,7 @@ int MDS::init() objecter->init(); - monc->sub_want("mdsmap", 0); + monc->sub_want("mdsmap", 0, 0); monc->renew_subs(); // schedule tick diff --git a/src/messages/MMonSubscribe.h b/src/messages/MMonSubscribe.h index a57f759af71..0d94ff05c31 100644 --- a/src/messages/MMonSubscribe.h +++ b/src/messages/MMonSubscribe.h @@ -17,6 +17,17 @@ #include "msg/Message.h" +/* + * compatibility with old crap + */ +struct ceph_mon_subscribe_item_old { + __le64 unused; + __le64 have; + __u8 onetime; +} __attribute__ ((packed)); +WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old) + + struct MMonSubscribe : public Message { map what; @@ -25,13 +36,9 @@ private: ~MMonSubscribe() {} public: - void sub_onetime(const char *w, version_t have) { - what[w].onetime = true; - what[w].have = have; - } - void sub_persistent(const char *w, version_t have) { - what[w].onetime = false; - what[w].have = have; + void sub_want(const char *w, version_t start, unsigned flags) { + what[w].start = start; + what[w].flags = flags; } const char *get_type_name() { return "mon_subscribe"; } @@ -41,10 +48,43 @@ public: void decode_payload() { bufferlist::iterator p = payload.begin(); - ::decode(what, p); + if (header.version < 2) { + map oldwhat; + ::decode(oldwhat, p); + what.clear(); + for (map::iterator q = oldwhat.begin(); + q != oldwhat.end(); + q++) { + if (q->second.have) + what[q->first].start = q->second.have + 1; + else + what[q->first].start = 0; + what[q->first].flags = 0; + if (q->second.onetime) + what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME; + } + } else { + ::decode(what, p); + } } void encode_payload() { - ::encode(what, payload); + if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) { + header.version = 2; + ::encode(what, payload); + } else { + map oldwhat; + for (map::iterator q = what.begin(); + q != what.end(); + q++) { + if (q->second.start) + // warning: start=1 -> have=0, which was ambiguous + oldwhat[q->first].have = q->second.start - 1; + else + oldwhat[q->first].have = 0; + oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME; + } + ::encode(oldwhat, payload); + } } }; diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 0e7bd2a7105..814c9e1976e 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -599,13 +599,13 @@ void MDSMonitor::check_subs() void MDSMonitor::check_sub(Subscription *sub) { - if (sub->last < mdsmap.get_epoch()) { + if (sub->next <= mdsmap.get_epoch()) { mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = mdsmap.get_epoch(); + sub->next = mdsmap.get_epoch() + 1; } } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 8ba8b0bb311..91d33d4aeff 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -115,7 +115,7 @@ int MonClient::get_monmap() dout(10) << "get_monmap" << dendl; Mutex::Locker l(monc_lock); - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", 0, 0); if (cur_mon < 0) _reopen_session(); @@ -266,7 +266,7 @@ int MonClient::authenticate(double timeout) return 0; } - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); if (cur_mon < 0) _reopen_session(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index b191e83be86..336396d218c 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -121,25 +121,21 @@ private: void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); - void _sub_want(string what, version_t have) { - sub_have[what].have = have; - sub_have[what].onetime = false; + bool _sub_want(string what, version_t start, unsigned flags) { + if (sub_have.count(what) && + sub_have[what].start == start && + sub_have[what].flags == flags) + return false; + sub_have[what].start = start; + sub_have[what].flags = flags; + return true; } - bool _sub_want_onetime(string what, version_t have) { - if (sub_have.count(what) == 0) { - sub_have[what].have = have; - sub_have[what].onetime = true; - return true; - } else - sub_have[what].have = have; - return false; - } - void _sub_got(string what, version_t have) { + void _sub_got(string what, version_t got) { if (sub_have.count(what)) { - if (sub_have[what].onetime) + if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME) sub_have.erase(what); else - sub_have[what].have = have; + sub_have[what].start = got + 1; } } @@ -151,13 +147,9 @@ public: Mutex::Locker l(monc_lock); _renew_subs(); } - void sub_want(string what, version_t have) { + bool sub_want(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - _sub_want(what, have); - } - bool sub_want_onetime(string what, version_t have) { - Mutex::Locker l(monc_lock); - return _sub_want_onetime(what, have); + return _sub_want(what, start, flags); } void sub_got(string what, version_t have) { Mutex::Locker l(monc_lock); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 6a98d0208cb..68f0079c8d2 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -775,9 +775,13 @@ void Monitor::handle_subscribe(MMonSubscribe *m) for (map::iterator p = m->what.begin(); p != m->what.end(); p++) { - if (!p->second.onetime) + // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer + if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) reply = true; - session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime); + + session_map.add_update_sub(s, p->first, p->second.start, + p->second.flags & CEPH_SUBSCRIBE_ONETIME); + if (p->first == "mdsmap") { if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) { mdsmon()->check_sub(s->sub_map["mdsmap"]); @@ -836,13 +840,13 @@ void Monitor::check_subs() void Monitor::check_sub(Subscription *sub) { - dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl; - if (sub->last < monmap->get_epoch()) { + dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl; + if (sub->next <= monmap->get_epoch()) { send_latest_monmap(sub->session->inst); if (sub->onetime) session_map.remove_sub(sub); else - sub->last = monmap->get_epoch(); + sub->next = monmap->get_epoch() + 1; } } diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index f3a6b7649eb..b6938e62128 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -816,16 +816,16 @@ void OSDMonitor::check_subs() void OSDMonitor::check_sub(Subscription *sub) { - if (sub->last < osdmap.get_epoch()) { - if (sub->last) - send_incremental(sub->last, sub->session->inst); + if (sub->next <= osdmap.get_epoch()) { + if (sub->next > 1) + send_incremental(sub->next - 1, sub->session->inst); else mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = osdmap.get_epoch(); + sub->next = osdmap.get_epoch() + 1; } } diff --git a/src/mon/Session.h b/src/mon/Session.h index ba9c119fd1d..0d537d0ddd1 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -28,10 +28,11 @@ struct Subscription { MonSession *session; string type; xlist::item type_item; - version_t last; + version_t next; bool onetime; - Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {}; + Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), + next(0), onetime(false) {}; }; struct MonSession : public RefCountedObject { @@ -106,7 +107,7 @@ struct MonSessionMap { } - void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) { + void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) { Subscription *sub = 0; if (s->sub_map.count(what)) { sub = s->sub_map[what]; @@ -115,7 +116,7 @@ struct MonSessionMap { s->sub_map[what] = sub; subs[what].push_back(&sub->type_item); } - sub->last = have; + sub->next = start; sub->onetime = onetime; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 75d0b5a98b4..f629c9ba950 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -52,7 +52,7 @@ using namespace __gnu_cxx; */ // default feature(s) everyone gets -#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR +#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2 class SimpleMessenger : public Messenger { public: @@ -66,10 +66,12 @@ public: Policy() : lossy(false), server(false), throttler(NULL), - features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {} + features_supported(MSGR_FEATURES_SUPPORTED), + features_required(0) {} Policy(bool l, bool s, uint64_t sup, uint64_t req) : lossy(l), server(s), throttler(NULL), - features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {} + features_supported(sup | MSGR_FEATURES_SUPPORTED), + features_required(req) {} static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); } static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b3c8a95e4b2..a6602eb4fc5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -510,8 +510,6 @@ int OSD::init() monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); monc->init(); - monc->sub_want("monmap", 0); - osd_lock.Unlock(); monc->authenticate(); @@ -1417,7 +1415,7 @@ void OSD::heartbeat() if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) { last_mon_heartbeat = now; dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -2091,7 +2089,7 @@ void OSD::wait_for_new_map(Message *m) { // ask? if (waiting_for_osdmap.empty()) { - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } @@ -2345,7 +2343,7 @@ void OSD::handle_osd_map(MOSDMap *m) } else { dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl; - monc->sub_want_onetime("osdmap", cur); + monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index c60ffef176a..eff5a0606f9 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -143,7 +143,7 @@ void Objecter::handle_osd_map(MOSDMap *m) } else { dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } @@ -176,7 +176,7 @@ void Objecter::handle_osd_map(MOSDMap *m) scan_pgs(changed_pgs); } else { dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl; - monc->sub_want_onetime("osdmap", 0); + monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -213,7 +213,7 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::maybe_request_map() { dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl; - if (monc->sub_want_onetime("osdmap", osdmap->get_epoch())) + if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME)) monc->renew_subs(); }