From 997d67e5b1ce6df1b467a9c8b284f1289fbee22a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 26 Jul 2010 15:03:11 -0700 Subject: [PATCH] mon: revamp subscribe protocol [backward compatible protocol change] Before, we would provide "have" and a bool "onetime" flag. The struct was also screwed up with an extra __le64. Then have=0 was a special case that meant "give me the latest". The problem is this is ambiguous between the usual "give me everything since X" and "give me your latest", because you might actually have 0 and want 1..current. Changes protocol and cleans up the struct: - now "start" and "flags", where only 1 flag (ONETIME) is defined - clean up sub_want_* methods throughout - fix all sub_want callers to ask for _start_ (not have) epoch, or 0 for any/latest - add a feature bit; talks old clients w/o that bit Signed-off-by: Sage Weil --- src/client/Client.cc | 4 +-- src/include/ceph_fs.h | 7 +++-- src/include/types.h | 3 +- src/mds/MDS.cc | 2 +- src/messages/MMonSubscribe.h | 58 ++++++++++++++++++++++++++++++------ src/mon/MDSMonitor.cc | 4 +-- src/mon/MonClient.cc | 4 +-- src/mon/MonClient.h | 34 ++++++++------------- src/mon/Monitor.cc | 14 +++++---- src/mon/OSDMonitor.cc | 8 ++--- src/mon/Session.h | 9 +++--- src/msg/SimpleMessenger.h | 8 +++-- src/osd/OSD.cc | 8 ++--- src/osdc/Objecter.cc | 6 ++-- 14 files changed, 105 insertions(+), 64 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 8319d926dcb..7bcab7be4b1 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -264,8 +264,8 @@ void Client::init() monclient->init(); monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD); - monclient->sub_want("mdsmap", mdsmap->get_epoch()); - monclient->sub_want_onetime("osdmap", 0); + monclient->sub_want("mdsmap", 0, 0); + monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); // do logger crap only once per process. static bool did_init = false; diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index bf7dea67323..f8823e13386 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -43,6 +43,7 @@ #define CEPH_FEATURE_NOSRCADDR (1<<1) #define CEPH_FEATURE_MONCLOCKCHECK (1<<2) #define CEPH_FEATURE_FLOCK (1<<3) +#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) /* @@ -201,9 +202,11 @@ struct ceph_client_mount { struct ceph_mon_request_header monhdr; } __attribute__ ((packed)); +#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ + struct ceph_mon_subscribe_item { - __le64 have_version; __le64 have; - __u8 onetime; + __le64 start; + __u8 flags; } __attribute__ ((packed)); struct ceph_mon_subscribe_ack { diff --git a/src/include/types.h b/src/include/types.h index 00734438dd7..fbed468b90f 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -472,7 +472,8 @@ inline ostream& operator<<(ostream& out, const kb_t& kb) inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i) { - return out << i.have << (i.onetime ? "" : "+"); + return out << i.start + << ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? "" : "+"); } #endif diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 6e4b60442c0..a2b9cb14e56 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -456,7 +456,7 @@ int MDS::init() objecter->init(); - monc->sub_want("mdsmap", 0); + monc->sub_want("mdsmap", 0, 0); monc->renew_subs(); // schedule tick diff --git a/src/messages/MMonSubscribe.h b/src/messages/MMonSubscribe.h index a57f759af71..0d94ff05c31 100644 --- a/src/messages/MMonSubscribe.h +++ b/src/messages/MMonSubscribe.h @@ -17,6 +17,17 @@ #include "msg/Message.h" +/* + * compatibility with old crap + */ +struct ceph_mon_subscribe_item_old { + __le64 unused; + __le64 have; + __u8 onetime; +} __attribute__ ((packed)); +WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old) + + struct MMonSubscribe : public Message { map what; @@ -25,13 +36,9 @@ private: ~MMonSubscribe() {} public: - void sub_onetime(const char *w, version_t have) { - what[w].onetime = true; - what[w].have = have; - } - void sub_persistent(const char *w, version_t have) { - what[w].onetime = false; - what[w].have = have; + void sub_want(const char *w, version_t start, unsigned flags) { + what[w].start = start; + what[w].flags = flags; } const char *get_type_name() { return "mon_subscribe"; } @@ -41,10 +48,43 @@ public: void decode_payload() { bufferlist::iterator p = payload.begin(); - ::decode(what, p); + if (header.version < 2) { + map oldwhat; + ::decode(oldwhat, p); + what.clear(); + for (map::iterator q = oldwhat.begin(); + q != oldwhat.end(); + q++) { + if (q->second.have) + what[q->first].start = q->second.have + 1; + else + what[q->first].start = 0; + what[q->first].flags = 0; + if (q->second.onetime) + what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME; + } + } else { + ::decode(what, p); + } } void encode_payload() { - ::encode(what, payload); + if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) { + header.version = 2; + ::encode(what, payload); + } else { + map oldwhat; + for (map::iterator q = what.begin(); + q != what.end(); + q++) { + if (q->second.start) + // warning: start=1 -> have=0, which was ambiguous + oldwhat[q->first].have = q->second.start - 1; + else + oldwhat[q->first].have = 0; + oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME; + } + ::encode(oldwhat, payload); + } } }; diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 0e7bd2a7105..814c9e1976e 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -599,13 +599,13 @@ void MDSMonitor::check_subs() void MDSMonitor::check_sub(Subscription *sub) { - if (sub->last < mdsmap.get_epoch()) { + if (sub->next <= mdsmap.get_epoch()) { mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = mdsmap.get_epoch(); + sub->next = mdsmap.get_epoch() + 1; } } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 8ba8b0bb311..91d33d4aeff 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -115,7 +115,7 @@ int MonClient::get_monmap() dout(10) << "get_monmap" << dendl; Mutex::Locker l(monc_lock); - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", 0, 0); if (cur_mon < 0) _reopen_session(); @@ -266,7 +266,7 @@ int MonClient::authenticate(double timeout) return 0; } - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); if (cur_mon < 0) _reopen_session(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index b191e83be86..336396d218c 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -121,25 +121,21 @@ private: void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); - void _sub_want(string what, version_t have) { - sub_have[what].have = have; - sub_have[what].onetime = false; + bool _sub_want(string what, version_t start, unsigned flags) { + if (sub_have.count(what) && + sub_have[what].start == start && + sub_have[what].flags == flags) + return false; + sub_have[what].start = start; + sub_have[what].flags = flags; + return true; } - bool _sub_want_onetime(string what, version_t have) { - if (sub_have.count(what) == 0) { - sub_have[what].have = have; - sub_have[what].onetime = true; - return true; - } else - sub_have[what].have = have; - return false; - } - void _sub_got(string what, version_t have) { + void _sub_got(string what, version_t got) { if (sub_have.count(what)) { - if (sub_have[what].onetime) + if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME) sub_have.erase(what); else - sub_have[what].have = have; + sub_have[what].start = got + 1; } } @@ -151,13 +147,9 @@ public: Mutex::Locker l(monc_lock); _renew_subs(); } - void sub_want(string what, version_t have) { + bool sub_want(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - _sub_want(what, have); - } - bool sub_want_onetime(string what, version_t have) { - Mutex::Locker l(monc_lock); - return _sub_want_onetime(what, have); + return _sub_want(what, start, flags); } void sub_got(string what, version_t have) { Mutex::Locker l(monc_lock); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 6a98d0208cb..68f0079c8d2 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -775,9 +775,13 @@ void Monitor::handle_subscribe(MMonSubscribe *m) for (map::iterator p = m->what.begin(); p != m->what.end(); p++) { - if (!p->second.onetime) + // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer + if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) reply = true; - session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime); + + session_map.add_update_sub(s, p->first, p->second.start, + p->second.flags & CEPH_SUBSCRIBE_ONETIME); + if (p->first == "mdsmap") { if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) { mdsmon()->check_sub(s->sub_map["mdsmap"]); @@ -836,13 +840,13 @@ void Monitor::check_subs() void Monitor::check_sub(Subscription *sub) { - dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl; - if (sub->last < monmap->get_epoch()) { + dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl; + if (sub->next <= monmap->get_epoch()) { send_latest_monmap(sub->session->inst); if (sub->onetime) session_map.remove_sub(sub); else - sub->last = monmap->get_epoch(); + sub->next = monmap->get_epoch() + 1; } } diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index f3a6b7649eb..b6938e62128 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -816,16 +816,16 @@ void OSDMonitor::check_subs() void OSDMonitor::check_sub(Subscription *sub) { - if (sub->last < osdmap.get_epoch()) { - if (sub->last) - send_incremental(sub->last, sub->session->inst); + if (sub->next <= osdmap.get_epoch()) { + if (sub->next > 1) + send_incremental(sub->next - 1, sub->session->inst); else mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = osdmap.get_epoch(); + sub->next = osdmap.get_epoch() + 1; } } diff --git a/src/mon/Session.h b/src/mon/Session.h index ba9c119fd1d..0d537d0ddd1 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -28,10 +28,11 @@ struct Subscription { MonSession *session; string type; xlist::item type_item; - version_t last; + version_t next; bool onetime; - Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {}; + Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), + next(0), onetime(false) {}; }; struct MonSession : public RefCountedObject { @@ -106,7 +107,7 @@ struct MonSessionMap { } - void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) { + void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) { Subscription *sub = 0; if (s->sub_map.count(what)) { sub = s->sub_map[what]; @@ -115,7 +116,7 @@ struct MonSessionMap { s->sub_map[what] = sub; subs[what].push_back(&sub->type_item); } - sub->last = have; + sub->next = start; sub->onetime = onetime; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 75d0b5a98b4..f629c9ba950 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -52,7 +52,7 @@ using namespace __gnu_cxx; */ // default feature(s) everyone gets -#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR +#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2 class SimpleMessenger : public Messenger { public: @@ -66,10 +66,12 @@ public: Policy() : lossy(false), server(false), throttler(NULL), - features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {} + features_supported(MSGR_FEATURES_SUPPORTED), + features_required(0) {} Policy(bool l, bool s, uint64_t sup, uint64_t req) : lossy(l), server(s), throttler(NULL), - features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {} + features_supported(sup | MSGR_FEATURES_SUPPORTED), + features_required(req) {} static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); } static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b3c8a95e4b2..a6602eb4fc5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -510,8 +510,6 @@ int OSD::init() monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); monc->init(); - monc->sub_want("monmap", 0); - osd_lock.Unlock(); monc->authenticate(); @@ -1417,7 +1415,7 @@ void OSD::heartbeat() if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) { last_mon_heartbeat = now; dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -2091,7 +2089,7 @@ void OSD::wait_for_new_map(Message *m) { // ask? if (waiting_for_osdmap.empty()) { - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } @@ -2345,7 +2343,7 @@ void OSD::handle_osd_map(MOSDMap *m) } else { dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl; - monc->sub_want_onetime("osdmap", cur); + monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index c60ffef176a..eff5a0606f9 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -143,7 +143,7 @@ void Objecter::handle_osd_map(MOSDMap *m) } else { dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } @@ -176,7 +176,7 @@ void Objecter::handle_osd_map(MOSDMap *m) scan_pgs(changed_pgs); } else { dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl; - monc->sub_want_onetime("osdmap", 0); + monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -213,7 +213,7 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::maybe_request_map() { dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl; - if (monc->sub_want_onetime("osdmap", osdmap->get_epoch())) + if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME)) monc->renew_subs(); }