Merge remote branch 'gh/wip-mds-resetter'

Reviewed-by: Greg Farnum <gregory.farnum@dreamhost.com>
This commit is contained in:
Sage Weil 2012-02-24 13:48:06 -08:00
commit b0feba56bd
3 changed files with 52 additions and 35 deletions

View File

@ -68,6 +68,7 @@ void usage()
static int do_cmds_special_action(const std::string &action,
const std::string &dump_file, int rank)
{
common_init_finish(g_ceph_context);
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
int r = messenger->bind(g_conf->public_addr, getpid());
if (r < 0)

View File

@ -37,6 +37,23 @@ bool Resetter::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
return *authorizer != NULL;
}
bool Resetter::ms_dispatch(Message *m)
{
Mutex::Locker l(lock);
switch (m->get_type()) {
case CEPH_MSG_OSD_OPREPLY:
objecter->handle_osd_op_reply((MOSDOpReply *)m);
break;
case CEPH_MSG_OSD_MAP:
objecter->handle_osd_map((MOSDMap*)m);
break;
default:
return false;
}
return true;
}
void Resetter::init(int rank)
{
inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
@ -69,29 +86,33 @@ void Resetter::init(int rank)
void Resetter::shutdown()
{
messenger->shutdown();
messenger->wait();
lock.Lock();
timer.shutdown();
lock.Unlock();
messenger->shutdown();
messenger->wait();
}
void Resetter::reset()
{
Mutex lock("Resetter::reset::lock");
Mutex mylock("Resetter::reset::lock");
Cond cond;
bool done;
int r;
lock.Lock();
journaler->recover(new C_SafeCond(&lock, &cond, &done, &r));
while (!done)
cond.Wait(lock);
journaler->recover(new C_SafeCond(&mylock, &cond, &done, &r));
lock.Unlock();
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
if (r != 0) {
if (r == -ENOENT) {
cerr << "journal does not exist on-disk. Did you set a bad rank?"
<< std::endl;
<< std::endl;
shutdown();
return;
} else {
@ -101,6 +122,7 @@ void Resetter::reset()
}
}
lock.Lock();
uint64_t old_start = journaler->get_read_pos();
uint64_t old_end = journaler->get_write_pos();
uint64_t old_len = old_end - old_start;
@ -116,15 +138,17 @@ void Resetter::reset()
journaler->set_trimmed_pos(new_start);
journaler->set_writeable();
{
cout << "writing journal head" << std::endl;
journaler->write_head(new C_SafeCond(&lock, &cond, &done, &r));
lock.Lock();
while (!done)
cond.Wait(lock);
lock.Unlock();
assert(r == 0);
}
cout << "writing journal head" << std::endl;
journaler->write_head(new C_SafeCond(&mylock, &cond, &done, &r));
lock.Unlock();
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
lock.Lock();
assert(r == 0);
LogEvent *le = new EResetJournal;
@ -133,14 +157,17 @@ void Resetter::reset()
cout << "writing EResetJournal entry" << std::endl;
journaler->append_entry(bl);
journaler->flush(new C_SafeCond(&lock, &cond, &done,&r));
lock.Lock();
while (!done)
cond.Wait(lock);
journaler->flush(new C_SafeCond(&mylock, &cond, &done,&r));
lock.Unlock();
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
assert(r == 0);
cout << "done" << std::endl;
shutdown();
}

View File

@ -50,24 +50,13 @@ public:
Dispatcher(messenger_->cct),
messenger(messenger_),
monc(monc_),
lock("Resetter::lock"), timer(g_ceph_context, lock)
lock("Resetter::lock"),
timer(g_ceph_context, lock)
{}
virtual ~Resetter();
bool ms_dispatch(Message *m) {
switch (m->get_type()) {
case CEPH_MSG_OSD_OPREPLY:
objecter->handle_osd_op_reply((MOSDOpReply *)m);
break;
case CEPH_MSG_OSD_MAP:
objecter->handle_osd_map((MOSDMap*)m);
break;
default:
return false;
}
return true;
}
bool ms_dispatch(Message *m);
bool ms_handle_reset(Connection *con) { return false; }
void ms_handle_remote_reset(Connection *con) {}
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,