mon: revamp subscribe protocol [backward compatible protocol change]

Previously, a subscriber provided a "have" version and a bool "onetime"
flag.  The struct also carried a stray extra __le64.  In addition, have=0
was a special case that meant "give me the latest".

The problem is that have=0 is ambiguous between the usual "give me
everything since X" and "give me your latest", because a subscriber might
actually have epoch 0 and want 1..current.

Changes protocol and cleans up the struct:

 - now "start" and "flags", where only 1 flag (ONETIME) is defined
 - clean up sub_want_* methods throughout
 - fix all sub_want callers to ask for _start_ (not have) epoch, or 0 for
   any/latest
 - add a feature bit; fall back to the old encoding for peers w/o that bit

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-07-26 15:03:11 -07:00
parent fa48a1a8d9
commit 997d67e5b1
14 changed files with 105 additions and 64 deletions

View File

@ -264,8 +264,8 @@ void Client::init()
monclient->init();
monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
monclient->sub_want("mdsmap", mdsmap->get_epoch());
monclient->sub_want_onetime("osdmap", 0);
monclient->sub_want("mdsmap", 0, 0);
monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
// do logger crap only once per process.
static bool did_init = false;

View File

@ -43,6 +43,7 @@
#define CEPH_FEATURE_NOSRCADDR (1<<1)
#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
#define CEPH_FEATURE_FLOCK (1<<3)
#define CEPH_FEATURE_SUBSCRIBE2 (1<<4)
/*
@ -201,9 +202,11 @@ struct ceph_client_mount {
struct ceph_mon_request_header monhdr;
} __attribute__ ((packed));
#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update, beginning at start */
struct ceph_mon_subscribe_item {
__le64 have_version; __le64 have;
__u8 onetime;
__le64 start;
__u8 flags;
} __attribute__ ((packed));
struct ceph_mon_subscribe_ack {

View File

@ -472,7 +472,8 @@ inline ostream& operator<<(ostream& out, const kb_t& kb)
// Print a subscription item as its start version; a trailing "+" marks a
// persistent (non-ONETIME) subscription.
inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i)
{
return out << i.have << (i.onetime ? "" : "+");
return out << i.start
<< ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? "" : "+");
}
#endif

View File

@ -456,7 +456,7 @@ int MDS::init()
objecter->init();
monc->sub_want("mdsmap", 0);
monc->sub_want("mdsmap", 0, 0);
monc->renew_subs();
// schedule tick

View File

@ -17,6 +17,17 @@
#include "msg/Message.h"
/*
* compatibility with old crap
*
* Wire layout of a subscription item as sent before the start/flags
* layout was introduced.  Packed, so field order and sizes must not change.
*/
struct ceph_mon_subscribe_item_old {
__le64 unused; /* the stray extra __le64 of the old layout */
__le64 have; /* version the subscriber has; 0 meant "give me the latest" */
__u8 onetime; /* nonzero: subscriber wants only one update */
} __attribute__ ((packed));
WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old)
struct MMonSubscribe : public Message {
map<string, ceph_mon_subscribe_item> what;
@ -25,13 +36,9 @@ private:
~MMonSubscribe() {}
public:
void sub_onetime(const char *w, version_t have) {
what[w].onetime = true;
what[w].have = have;
}
void sub_persistent(const char *w, version_t have) {
what[w].onetime = false;
what[w].have = have;
// Add or overwrite the subscription entry for map type w.
//   start: first version wanted (0 = any/latest)
//   flags: bitmask of CEPH_SUBSCRIBE_* values
void sub_want(const char *w, version_t start, unsigned flags) {
what[w].start = start;
what[w].flags = flags;
}
const char *get_type_name() { return "mon_subscribe"; }
@ -41,10 +48,43 @@ public:
// Decode the subscription map from the payload.  Messages with
// header.version < 2 carry ceph_mon_subscribe_item_old entries, which are
// converted to the start/flags form; newer messages decode directly.
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(what, p);
if (header.version < 2) {
map<string, ceph_mon_subscribe_item_old> oldwhat;
::decode(oldwhat, p);
what.clear();
for (map<string, ceph_mon_subscribe_item_old>::iterator q = oldwhat.begin();
q != oldwhat.end();
q++) {
// old "have N" becomes "start at N+1"; old have=0 ("latest") stays 0
if (q->second.have)
what[q->first].start = q->second.have + 1;
else
what[q->first].start = 0;
// the old onetime bool maps onto the ONETIME flag bit
what[q->first].flags = 0;
if (q->second.onetime)
what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME;
}
} else {
::decode(what, p);
}
}
// Encode the subscription map for the peer.  If the connection has
// CEPH_FEATURE_SUBSCRIBE2, the start/flags form is sent with
// header.version = 2; otherwise each item is converted back to the old
// have/onetime layout.
void encode_payload() {
::encode(what, payload);
if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) {
header.version = 2;
::encode(what, payload);
} else {
map<string, ceph_mon_subscribe_item_old> oldwhat;
for (map<string, ceph_mon_subscribe_item>::iterator q = what.begin();
q != what.end();
q++) {
// "start at N" becomes old "have N-1"; start=0 (any/latest) stays have=0
if (q->second.start)
// warning: start=1 -> have=0, which was ambiguous
oldwhat[q->first].have = q->second.start - 1;
else
oldwhat[q->first].have = 0;
oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME;
}
::encode(oldwhat, payload);
}
}
};

View File

@ -599,13 +599,13 @@ void MDSMonitor::check_subs()
// Send the current mdsmap to the subscriber once the map epoch has reached
// the version it wants next.  A onetime subscription is then removed;
// otherwise next is advanced past the current epoch.
void MDSMonitor::check_sub(Subscription *sub)
{
if (sub->last < mdsmap.get_epoch()) {
if (sub->next <= mdsmap.get_epoch()) {
mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap),
sub->session->inst);
if (sub->onetime)
mon->session_map.remove_sub(sub);
else
sub->last = mdsmap.get_epoch();
sub->next = mdsmap.get_epoch() + 1;
}
}

View File

@ -115,7 +115,7 @@ int MonClient::get_monmap()
dout(10) << "get_monmap" << dendl;
Mutex::Locker l(monc_lock);
_sub_want("monmap", monmap.get_epoch());
_sub_want("monmap", 0, 0);
if (cur_mon < 0)
_reopen_session();
@ -266,7 +266,7 @@ int MonClient::authenticate(double timeout)
return 0;
}
_sub_want("monmap", monmap.get_epoch());
_sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
if (cur_mon < 0)
_reopen_session();

View File

@ -121,25 +121,21 @@ private:
void _renew_subs();
void handle_subscribe_ack(MMonSubscribeAck* m);
void _sub_want(string what, version_t have) {
sub_have[what].have = have;
sub_have[what].onetime = false;
// Record that we want "what" beginning at version start with the given
// CEPH_SUBSCRIBE_* flags.  Returns false when an entry with the same start
// and flags is already recorded (nothing changed), true otherwise.
bool _sub_want(string what, version_t start, unsigned flags) {
if (sub_have.count(what) &&
sub_have[what].start == start &&
sub_have[what].flags == flags)
return false;
sub_have[what].start = start;
sub_have[what].flags = flags;
return true;
}
bool _sub_want_onetime(string what, version_t have) {
if (sub_have.count(what) == 0) {
sub_have[what].have = have;
sub_have[what].onetime = true;
return true;
} else
sub_have[what].have = have;
return false;
}
// Note that version got of "what" has been received.  A ONETIME entry is
// erased; a persistent entry has its start moved to got + 1.  Does nothing
// if "what" is not currently recorded.
void _sub_got(string what, version_t have) {
void _sub_got(string what, version_t got) {
if (sub_have.count(what)) {
if (sub_have[what].onetime)
if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME)
sub_have.erase(what);
else
sub_have[what].have = have;
sub_have[what].start = got + 1;
}
}
@ -151,13 +147,9 @@ public:
Mutex::Locker l(monc_lock);
_renew_subs();
}
void sub_want(string what, version_t have) {
bool sub_want(string what, version_t start, unsigned flags) {
Mutex::Locker l(monc_lock);
_sub_want(what, have);
}
bool sub_want_onetime(string what, version_t have) {
Mutex::Locker l(monc_lock);
return _sub_want_onetime(what, have);
return _sub_want(what, start, flags);
}
void sub_got(string what, version_t have) {
Mutex::Locker l(monc_lock);

View File

@ -775,9 +775,13 @@ void Monitor::handle_subscribe(MMonSubscribe *m)
for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
p != m->what.end();
p++) {
if (!p->second.onetime)
// if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
reply = true;
session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime);
session_map.add_update_sub(s, p->first, p->second.start,
p->second.flags & CEPH_SUBSCRIBE_ONETIME);
if (p->first == "mdsmap") {
if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) {
mdsmon()->check_sub(s->sub_map["mdsmap"]);
@ -836,13 +840,13 @@ void Monitor::check_subs()
// Send the latest monmap to the subscriber once the monmap epoch has reached
// the version it wants next.  A onetime subscription is then removed;
// otherwise next is advanced past the current epoch.
void Monitor::check_sub(Subscription *sub)
{
dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl;
if (sub->last < monmap->get_epoch()) {
dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl;
if (sub->next <= monmap->get_epoch()) {
send_latest_monmap(sub->session->inst);
if (sub->onetime)
session_map.remove_sub(sub);
else
sub->last = monmap->get_epoch();
sub->next = monmap->get_epoch() + 1;
}
}

View File

@ -816,16 +816,16 @@ void OSDMonitor::check_subs()
// Bring the subscriber up to date once the osdmap epoch has reached the
// version it wants next.  A onetime subscription is then removed; otherwise
// next is advanced past the current epoch.
void OSDMonitor::check_sub(Subscription *sub)
{
if (sub->last < osdmap.get_epoch()) {
if (sub->last)
send_incremental(sub->last, sub->session->inst);
if (sub->next <= osdmap.get_epoch()) {
// next > 1: call send_incremental with next - 1 (presumably the last epoch
// the peer already has -- confirm against send_incremental); otherwise
// send the full current map
if (sub->next > 1)
send_incremental(sub->next - 1, sub->session->inst);
else
mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap),
sub->session->inst);
if (sub->onetime)
mon->session_map.remove_sub(sub);
else
sub->last = osdmap.get_epoch();
sub->next = osdmap.get_epoch() + 1;
}
}

View File

// One session's subscription to one map type.
struct Subscription {
MonSession *session;
string type;
xlist<Subscription*>::item type_item;
version_t last;
version_t next; // first version the subscriber still wants
bool onetime; // subscription is removed after one update is sent
Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {};
Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this),
next(0), onetime(false) {};
};
struct MonSession : public RefCountedObject {
@ -106,7 +107,7 @@ struct MonSessionMap {
}
void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) {
void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) {
Subscription *sub = 0;
if (s->sub_map.count(what)) {
sub = s->sub_map[what];
@ -115,7 +116,7 @@ struct MonSessionMap {
s->sub_map[what] = sub;
subs[what].push_back(&sub->type_item);
}
sub->last = have;
sub->next = start;
sub->onetime = onetime;
}

View File

@ -52,7 +52,7 @@ using namespace __gnu_cxx;
*/
// default feature(s) everyone gets
#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR
// Parenthesized so the OR-ed mask stays a single operand when the macro is
// combined with higher-precedence operators (e.g. `MSGR_FEATURES_SUPPORTED & x`).
#define MSGR_FEATURES_SUPPORTED (CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2)
class SimpleMessenger : public Messenger {
public:
@ -66,10 +66,12 @@ public:
Policy() :
lossy(false), server(false), throttler(NULL),
features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {}
features_supported(MSGR_FEATURES_SUPPORTED),
features_required(0) {}
Policy(bool l, bool s, uint64_t sup, uint64_t req) :
lossy(l), server(s), throttler(NULL),
features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {}
features_supported(sup | MSGR_FEATURES_SUPPORTED),
features_required(req) {}
static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); }
static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); }

View File

@ -510,8 +510,6 @@ int OSD::init()
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
monc->init();
monc->sub_want("monmap", 0);
osd_lock.Unlock();
monc->authenticate();
@ -1417,7 +1415,7 @@ void OSD::heartbeat()
if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) {
last_mon_heartbeat = now;
dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
monc->sub_want_onetime("osdmap", osdmap->get_epoch());
monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
@ -2091,7 +2089,7 @@ void OSD::wait_for_new_map(Message *m)
{
// ask?
if (waiting_for_osdmap.empty()) {
monc->sub_want_onetime("osdmap", osdmap->get_epoch());
monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
@ -2345,7 +2343,7 @@ void OSD::handle_osd_map(MOSDMap *m)
}
else {
dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl;
monc->sub_want_onetime("osdmap", cur);
monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
break;
}

View File

@ -143,7 +143,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
}
else {
dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
monc->sub_want_onetime("osdmap", osdmap->get_epoch());
monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
break;
}
@ -176,7 +176,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
scan_pgs(changed_pgs);
} else {
dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
monc->sub_want_onetime("osdmap", 0);
monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
@ -213,7 +213,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
// Register a onetime osdmap subscription: starting at epoch + 1 when we
// already have a map (nonzero epoch), or 0 (any/latest) when we do not.
// renew_subs() is called only when sub_want() returns true.
void Objecter::maybe_request_map()
{
dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
if (monc->sub_want_onetime("osdmap", osdmap->get_epoch()))
if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME))
monc->renew_subs();
}