uclient: subscribe to mdsmap; strip out some signed_ticket cruft; fix mount

This commit is contained in:
Sage Weil 2009-08-31 13:00:09 -07:00
parent dbc4f60eb8
commit 269448ad6f
7 changed files with 112 additions and 33 deletions

View File

@ -815,7 +815,7 @@ int Client::make_request(MClientRequest *req,
if (!mdsmap->is_active(mds)) {
dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl;
monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
//monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
waiting_for_mdsmap.push_back(&cond);
cond.Wait(client_lock);
@ -1201,6 +1201,8 @@ void Client::handle_mds_map(MMDSMap* m)
delete oldmap;
delete m;
monclient->update_sub("mdsmap", mdsmap->get_epoch());
}
void Client::send_reconnect(int mds)
@ -2544,9 +2546,6 @@ int Client::mount()
whoami = messenger->get_myname().num();
signed_ticket = monclient->get_signed_ticket();
objecter->signed_ticket = signed_ticket;
objecter->init();
mounted = true;
@ -2554,6 +2553,9 @@ int Client::mount()
dout(2) << "mounted: have osdmap " << osdmap->get_epoch()
<< " and mdsmap " << mdsmap->get_epoch()
<< dendl;
monclient->update_sub("mdsmap", mdsmap->get_epoch());
// hack: get+pin root inode.
// fuse assumes it's always there.
@ -2736,6 +2738,8 @@ void Client::tick()
dout(21) << "tick" << dendl;
tick_event = new C_C_Tick(this);
timer.add_event_after(g_conf.client_tick_interval, tick_event);
monclient->tick();
utime_t now = g_clock.now();

View File

@ -753,8 +753,6 @@ public:
Messenger *messenger;
int whoami;
bufferlist signed_ticket;
// mds sessions
map<int, MDSSession> mds_sessions; // mds -> push seq
map<int, list<Cond*> > waiting_for_session;

View File

@ -304,7 +304,6 @@ bool RadosClient::init()
lock.Lock();
objecter->signed_ticket = monclient.get_signed_ticket();
objecter->set_client_incarnation(0);
objecter->init();

View File

@ -20,6 +20,8 @@
#include "include/nstring.h"
#include "messages/MClientMountAck.h"
#include "messages/MMonSubscribe.h"
#include "messages/MMonSubscribeAck.h"
#include "common/ConfUtils.h"
#include "MonClient.h"
@ -161,6 +163,9 @@ bool MonClient::dispatch_impl(Message *m)
handle_mount_ack((MClientMountAck*)m);
return true;
case CEPH_MSG_MON_SUBSCRIBE_ACK:
handle_subscribe_ack((MMonSubscribeAck*)m);
return true;
}
return false;
@ -227,23 +232,10 @@ int MonClient::mount(double mount_timeout)
}
mounters++;
while (signed_ticket.length() == 0 ||
(!itsme && !mounted)) // non-doers wait a little longer
while (!mounted)
mount_cond.Wait(monc_lock);
if (!itsme) {
dout(5) << "additional mounter returning" << dendl;
assert(mounted);
return 0;
}
// finish.
timer.cancel_event(mount_timeout_event);
mount_timeout_event = 0;
mounted = true;
mount_cond.SignalAll(); // wake up non-doers
dout(5) << "mount success" << dendl;
return 0;
}
@ -257,8 +249,14 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
messenger->reset_myname(entity_name_t::CLIENT(m->client));
mount_cond.Signal();
// finish.
timer.cancel_event(mount_timeout_event);
mount_timeout_event = 0;
mounted = true;
mount_cond.SignalAll();
delete m;
}
@ -279,3 +277,64 @@ void MonClient::pick_new_mon()
messenger->mark_down(monmap.get_inst(oldmon).addr);
monmap.pick_mon(true);
}
void MonClient::ms_handle_reset(const entity_addr_t& peer)
{
dout(10) << "ms_handle_reset " << peer << dendl;
pick_new_mon();
renew_subs();
}
// ---------
void MonClient::renew_subs()
{
if (sub_have.empty()) {
dout(10) << "renew_subs - empty" << dendl;
return;
}
dout(10) << "renew_subs" << dendl;
if (sub_renew_sent == utime_t())
sub_renew_sent = g_clock.now();
MMonSubscribe *m = new MMonSubscribe;
m->what = sub_have;
send_mon_message(m);
}
void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
{
if (sub_renew_sent != utime_t()) {
sub_renew_after = sub_renew_sent;
sub_renew_after += 1000.0 * m->interval_ms;
dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
sub_renew_sent = utime_t();
} else {
dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << ", ignoring" << dendl;
}
}
void MonClient::tick()
{
dout(10) << "tick" << dendl;
if (!mounted)
return;
utime_t now = g_clock.now();
static utime_t last_tick;
if (now - last_tick < 10.0)
return;
last_tick = now;
if (now > sub_renew_after)
renew_subs();
int oldmon = monmap.pick_mon();
messenger->send_keepalive(monmap.mon_inst[oldmon]);
}

View File

@ -27,6 +27,7 @@
class MonMap;
class MMonMap;
class MClientMountAck;
class MMonSubscribeAck;
class MonClient : public Dispatcher {
public:
@ -36,9 +37,6 @@ private:
entity_addr_t my_addr;
ClientTicket ticket;
bufferlist signed_ticket;
Context *mount_timeout_event;
Mutex monc_lock;
@ -47,10 +45,11 @@ private:
int mounters;
Cond mount_cond, map_cond;
bool dispatch_impl(Message *m);
void handle_monmap(MMonMap *m);
void ms_handle_reset(const entity_addr_t& peer);
protected:
class C_MountTimeout : public Context {
MonClient *client;
@ -65,13 +64,29 @@ private:
void _try_mount(double timeout);
void _mount_timeout(double timeout);
void handle_mount_ack(MClientMountAck* m);
// mon subscriptions
private:
map<nstring,version_t> sub_have; // my subs, and current versions
utime_t sub_renew_sent, sub_renew_after;
public:
void renew_subs();
void update_sub(nstring what, version_t have) {
bool had = sub_have.count(what);
sub_have[what] = have;
if (!had)
renew_subs();
}
void handle_subscribe_ack(MMonSubscribeAck* m);
public:
MonClient() : messenger(NULL),
mount_timeout_event(NULL),
monc_lock("MonClient::monc_lock"),
timer(monc_lock) {
mounted = false;
mounters = 0;
mount_timeout_event = 0;
}
int build_initial_monmap();
@ -79,6 +94,8 @@ private:
int mount(double mount_timeout);
void tick();
void send_mon_message(Message *m, bool new_mon=false);
void note_mon_leader(int m) {
monmap.last_mon = m;
@ -110,8 +127,6 @@ private:
void set_messenger(Messenger *m) { messenger = m; }
bufferlist& get_signed_ticket() { return signed_ticket; }
ClientTicket& get_ticket() { return ticket; }
};

View File

@ -53,6 +53,8 @@ void Objecter::init()
assert(client_lock.is_locked()); // otherwise event cancellation is unsafe
timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
maybe_request_map();
//monc->update_sub("osdmap", 0);
}
void Objecter::shutdown()
@ -190,6 +192,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
}
delete m;
//monc->update_sub("osdmap", osdmap->get_epoch());
}
@ -442,7 +446,9 @@ tid_t Objecter::op_submit(Op *op)
if (op->onack)
flags |= CEPH_OSD_FLAG_ACK;
MOSDOp *m = new MOSDOp(signed_ticket, client_inc, op->tid,
bufferlist empty_ticket_fixme;
#warning remove signed ticket ref
MOSDOp *m = new MOSDOp(empty_ticket_fixme, client_inc, op->tid,
op->oid, op->layout, osdmap->get_epoch(),
flags);

View File

@ -177,8 +177,6 @@ class Objecter {
MonClient *monc;
OSDMap *osdmap;
bufferlist signed_ticket;
private:
tid_t last_tid;