Merge branch 'stable'

Conflicts:
	src/mds/MDLog.cc
	src/osdc/Journaler.cc
	src/osdc/Journaler.h
This commit is contained in:
Sage Weil 2011-04-25 14:34:49 -07:00
commit 3f8204136a
11 changed files with 102 additions and 70 deletions

View File

@ -7,7 +7,7 @@ AC_PREREQ(2.59)
# NOTE: This version is _only_ used for naming the tarball. The # NOTE: This version is _only_ used for naming the tarball. The
# VERSION define is not used by the code. It gets a version string # VERSION define is not used by the code. It gets a version string
# from 'git describe'; see src/ceph_ver.[ch] # from 'git describe'; see src/ceph_ver.[ch]
AC_INIT([ceph], [0.26], [ceph-devel@vger.kernel.org]) AC_INIT([ceph], [0.27], [ceph-devel@vger.kernel.org])
AC_CONFIG_SUBDIRS([src/gtest]) AC_CONFIG_SUBDIRS([src/gtest])

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
ceph (0.27-1) experimental; urgency=low
* New upstream release.
-- Sage Weil <sage@newdream.net> Fri, 22 Apr 2011 16:51:49 -0700
ceph (0.26-1) experimental; urgency=low ceph (0.26-1) experimental; urgency=low
* New upstream release. * New upstream release.

View File

@ -92,6 +92,7 @@ void MDLog::init_journaler()
journaler = new Journaler(ino, mds->mdsmap->get_metadata_pg_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, journaler = new Journaler(ino, mds->mdsmap->get_metadata_pg_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
logger, l_mdl_jlat, logger, l_mdl_jlat,
&mds->timer); &mds->timer);
assert(journaler->is_readonly());
} }
void MDLog::write_head(Context *c) void MDLog::write_head(Context *c)
@ -120,8 +121,9 @@ void MDLog::create(Context *c)
{ {
dout(5) << "create empty log" << dendl; dout(5) << "create empty log" << dendl;
init_journaler(); init_journaler();
journaler->set_writeable();
journaler->create(&mds->mdcache->default_log_layout); journaler->create(&mds->mdcache->default_log_layout);
write_head(c); journaler->write_head(c);
logger->set(l_mdl_expos, journaler->get_expire_pos()); logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos()); logger->set(l_mdl_wrpos, journaler->get_write_pos());
@ -392,8 +394,9 @@ void MDLog::_expired(LogSegment *ls)
expired_segments.erase(ls); expired_segments.erase(ls);
num_events -= ls->num_events; num_events -= ls->num_events;
journaler->set_expire_pos(ls->offset); // this was the oldest segment, adjust expire pos // this was the oldest segment, adjust expire pos
journaler->write_head(0); if (journaler->get_expire_pos() < ls->offset)
journaler->set_expire_pos(ls->offset);
logger->set(l_mdl_expos, ls->offset); logger->set(l_mdl_expos, ls->offset);
logger->inc(l_mdl_segtrm); logger->inc(l_mdl_segtrm);
@ -402,6 +405,8 @@ void MDLog::_expired(LogSegment *ls)
segments.erase(ls->offset); segments.erase(ls->offset);
delete ls; delete ls;
} }
journaler->write_head(0);
} }
logger->set(l_mdl_ev, num_events); logger->set(l_mdl_ev, num_events);
@ -415,6 +420,7 @@ void MDLog::_expired(LogSegment *ls)
void MDLog::replay(Context *c) void MDLog::replay(Context *c)
{ {
assert(journaler->is_active()); assert(journaler->is_active());
assert(journaler->is_readonly());
// empty? // empty?
if (journaler->get_read_pos() == journaler->get_write_pos()) { if (journaler->get_read_pos() == journaler->get_write_pos()) {
@ -490,6 +496,7 @@ void MDLog::_replay_thread()
while (!done) while (!done)
cond.Wait(mylock); cond.Wait(mylock);
mds->mds_lock.Lock(); mds->mds_lock.Lock();
standby_trim_segments();
if (journaler->get_read_pos() < journaler->get_expire_pos()) { if (journaler->get_read_pos() < journaler->get_expire_pos()) {
dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl; dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
r = -EAGAIN; r = -EAGAIN;
@ -572,4 +579,31 @@ void MDLog::_replay_thread()
mds->mds_lock.Unlock(); mds->mds_lock.Unlock();
} }
void MDLog::standby_trim_segments()
{
dout(10) << "standby_trim_segments" << dendl;
uint64_t expire_pos = journaler->get_expire_pos();
dout(10) << " expire_pos=" << expire_pos << dendl;
LogSegment *seg = NULL;
bool removed_segment = false;
while ((seg = get_oldest_segment())->end <= expire_pos) {
dout(10) << " removing segment " << seg->offset << dendl;
seg->dirty_dirfrags.clear_list();
seg->new_dirfrags.clear_list();
seg->dirty_inodes.clear_list();
seg->dirty_dentries.clear_list();
seg->open_files.clear_list();
seg->renamed_files.clear_list();
seg->dirty_dirfrag_dir.clear_list();
seg->dirty_dirfrag_nest.clear_list();
seg->dirty_dirfrag_dirfragtree.clear_list();
remove_oldest_segment();
removed_segment = true;
}
if (removed_segment) {
dout(20) << " calling mdcache->trim!" << dendl;
mds->mdcache->trim(-1);
} else
dout(20) << " removed no segments!" << dendl;
}

View File

@ -239,6 +239,8 @@ public:
void open(Context *onopen); // append() or replay() to follow! void open(Context *onopen); // append() or replay() to follow!
void append(); void append();
void replay(Context *onfinish); void replay(Context *onfinish);
void standby_trim_segments();
}; };
#endif #endif

View File

@ -90,7 +90,7 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
whoami(-1), incarnation(0), whoami(-1), incarnation(0),
standby_for_rank(MDSMap::MDS_NO_STANDBY_PREF), standby_for_rank(MDSMap::MDS_NO_STANDBY_PREF),
standby_type(0), standby_type(0),
continue_replay(false), standby_replaying(false),
messenger(m), messenger(m),
monc(mc), monc(mc),
clog(messenger, &mc->monmap, mc, LogClient::NO_FLAGS), clog(messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
@ -1210,8 +1210,6 @@ void MDS::boot_start(int step, int r)
case 3: case 3:
if (is_any_replay()) { if (is_any_replay()) {
dout(2) << "boot_start " << step << ": replaying mds log" << dendl; dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
if(is_oneshot_replay() || is_standby_replay())
mdlog->get_journaler()->set_readonly();
mdlog->replay(new C_MDS_BootStart(this, 4)); mdlog->replay(new C_MDS_BootStart(this, 4));
break; break;
} else { } else {
@ -1260,8 +1258,9 @@ void MDS::calc_recovery_set()
void MDS::replay_start() void MDS::replay_start()
{ {
dout(1) << "replay_start" << dendl; dout(1) << "replay_start" << dendl;
if (is_standby_replay()) if (is_standby_replay())
continue_replay = true; standby_replaying = true;
standby_type = 0; standby_type = 0;
@ -1275,6 +1274,8 @@ void MDS::replay_start()
if (osdmap->get_epoch() >= mdsmap->get_last_failure_osd_epoch()) { if (osdmap->get_epoch() >= mdsmap->get_last_failure_osd_epoch()) {
boot_start(); boot_start();
} else { } else {
dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
<< " (which blacklists prior instance)" << dendl;
objecter->wait_for_new_map(new C_MDS_BootStart(this, 0), objecter->wait_for_new_map(new C_MDS_BootStart(this, 0),
mdsmap->get_last_failure_osd_epoch()); mdsmap->get_last_failure_osd_epoch());
} }
@ -1293,6 +1294,7 @@ public:
mds->respawn(); /* we're too far back, and this is easier than mds->respawn(); /* we're too far back, and this is easier than
trying to reset everything in the cache, etc */ trying to reset everything in the cache, etc */
} else { } else {
mds->mdlog->standby_trim_segments();
mds->boot_start(3, r); mds->boot_start(3, r);
} }
} }
@ -1300,9 +1302,18 @@ public:
inline void MDS::standby_replay_restart() inline void MDS::standby_replay_restart()
{ {
dout(1) << "standby_replay_restart" << (standby_replaying ? " (as standby)":" (final takeover pass)") << dendl;
if (!standby_replaying && osdmap->get_epoch() < mdsmap->get_last_failure_osd_epoch()) {
dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
<< " (which blacklists prior instance)" << dendl;
objecter->wait_for_new_map(new C_MDS_BootStart(this, 3),
mdsmap->get_last_failure_osd_epoch());
} else {
mdlog->get_journaler()->reread_head_and_probe( mdlog->get_journaler()->reread_head_and_probe(
new C_MDS_StandbyReplayRestartFinish(this, mdlog->get_journaler()->get_read_pos())); new C_MDS_StandbyReplayRestartFinish(this, mdlog->get_journaler()->get_read_pos()));
} }
}
class MDS::C_MDS_StandbyReplayRestart : public Context { class MDS::C_MDS_StandbyReplayRestart : public Context {
MDS *mds; MDS *mds;
@ -1316,9 +1327,7 @@ public:
void MDS::replay_done() void MDS::replay_done()
{ {
dout(1) << "replay_done in=" << mdsmap->get_num_mds() dout(1) << "replay_done" << (standby_replaying ? " (as standby)" : "") << dendl;
<< " failed=" << mdsmap->get_num_failed()
<< dendl;
if (is_oneshot_replay()) { if (is_oneshot_replay()) {
dout(2) << "hack. journal looks ok. shutting down." << dendl; dout(2) << "hack. journal looks ok. shutting down." << dendl;
@ -1327,19 +1336,20 @@ void MDS::replay_done()
} }
if (is_standby_replay()) { if (is_standby_replay()) {
standby_trim_segments();
dout(10) << "setting replay timer" << dendl; dout(10) << "setting replay timer" << dendl;
timer.add_event_after(g_conf.mds_replay_interval, timer.add_event_after(g_conf.mds_replay_interval,
new C_MDS_StandbyReplayRestart(this)); new C_MDS_StandbyReplayRestart(this));
return; return;
} }
if (continue_replay) { if (standby_replaying) {
continue_replay = false; dout(10) << " last replay pass was as a standby; making final pass" << dendl;
standby_replaying = false;
standby_replay_restart(); standby_replay_restart();
return; return;
} }
dout(1) << "making mds journal writeable" << dendl;
mdlog->get_journaler()->set_writeable(); mdlog->get_journaler()->set_writeable();
mdlog->get_journaler()->trim_tail(); mdlog->get_journaler()->trim_tail();
@ -1370,35 +1380,6 @@ void MDS::replay_done()
} }
} }
// Remove log segments whose end is at or before the journaler's expire
// position, oldest first, then trim the cache if any segment was removed.
// (This is the MDS-side version shown on the removed side of this change.)
void MDS::standby_trim_segments()
{
dout(10) << "standby_trim_segments" << dendl;
LogSegment *seg = NULL;
uint64_t expire_pos = mdlog->get_journaler()->get_expire_pos();
dout(10) << "expire_pos=" << expire_pos << dendl;
bool removed_segment = false;
// NOTE(review): the oldest segment is dereferenced unconditionally on every
// iteration; confirm get_oldest_segment() is safe once no segments remain.
while ((seg=mdlog->get_oldest_segment())->end <= expire_pos) {
dout(0) << "removing segment" << dendl;
// Clear each per-segment tracking list before the segment is removed.
seg->dirty_dirfrags.clear_list();
seg->new_dirfrags.clear_list();
seg->dirty_inodes.clear_list();
seg->dirty_dentries.clear_list();
seg->open_files.clear_list();
seg->renamed_files.clear_list();
seg->dirty_dirfrag_dir.clear_list();
seg->dirty_dirfrag_nest.clear_list();
seg->dirty_dirfrag_dirfragtree.clear_list();
mdlog->remove_oldest_segment();
removed_segment = true;
}
// Only ask the cache to trim when at least one segment actually went away.
if (removed_segment) {
dout(20) << "calling mdcache->trim!" << dendl;
mdcache->trim(-1);
} else dout(20) << "removed no segments!" << dendl;
return;
}
void MDS::reopen_log() void MDS::reopen_log()
{ {
dout(1) << "reopen_log" << dendl; dout(1) << "reopen_log" << dendl;

View File

@ -149,9 +149,7 @@ class MDS : public Dispatcher {
int standby_for_rank; int standby_for_rank;
int standby_type; int standby_type;
string standby_for_name; string standby_for_name;
bool continue_replay; /* set to true by replay_start if we're a hot standby, bool standby_replaying; // true if current replay pass is in standby-replay mode
remains true until leader MDS fails and we need to
take over*/
Messenger *messenger; Messenger *messenger;
MonClient *monc; MonClient *monc;
@ -362,7 +360,6 @@ class MDS : public Dispatcher {
void starting_done(); void starting_done();
void replay_done(); void replay_done();
void standby_replay_restart(); void standby_replay_restart();
void standby_trim_segments();
class C_MDS_StandbyReplayRestart; class C_MDS_StandbyReplayRestart;
class C_MDS_StandbyReplayRestartFinish; class C_MDS_StandbyReplayRestartFinish;

View File

@ -99,7 +99,8 @@ void Resetter::reset()
journaler->set_read_pos(new_start); journaler->set_read_pos(new_start);
journaler->set_write_pos(new_start); journaler->set_write_pos(new_start);
journaler->set_expire_trimmed_pos(new_start); journaler->set_expire_pos(new_start);
journaler->set_trimmed_pos(new_start);
journaler->set_writeable(); journaler->set_writeable();
{ {

View File

@ -1050,6 +1050,7 @@ void ESubtreeMap::replay(MDS *mds)
{ {
if (expire_pos && expire_pos > mds->mdlog->journaler->get_expire_pos()) if (expire_pos && expire_pos > mds->mdlog->journaler->get_expire_pos())
mds->mdlog->journaler->set_expire_pos(expire_pos); mds->mdlog->journaler->set_expire_pos(expire_pos);
// suck up the subtree map? // suck up the subtree map?
if (mds->mdcache->is_subtrees()) { if (mds->mdcache->is_subtrees()) {
dout(10) << "ESubtreeMap.replay -- ignoring, already have import map" << dendl; dout(10) << "ESubtreeMap.replay -- ignoring, already have import map" << dendl;

View File

@ -22,9 +22,20 @@
#define DOUT_SUBSYS journaler #define DOUT_SUBSYS journaler
#undef dout_prefix #undef dout_prefix
#define dout_prefix *_dout << objecter->messenger->get_myname() << ".journaler " #define dout_prefix *_dout << objecter->messenger->get_myname() << ".journaler" << (readonly ? "(ro) ":"(rw) ")
// Mark the journaler read-only by setting `readonly` to true.
// The message is logged before the flag changes; since dout_prefix in this
// file derives its "(ro) "/"(rw) " tag from `readonly`, the tag on this line
// reflects the mode in effect before the call.
void Journaler::set_readonly()
{
dout(1) << "set_readonly" << dendl;
readonly = true;
}
// Mark the journaler writeable by setting `readonly` to false.
// The message is logged before the flag changes; since dout_prefix in this
// file derives its "(ro) "/"(rw) " tag from `readonly`, the tag on this line
// reflects the mode in effect before the call.
void Journaler::set_writeable()
{
dout(1) << "set_writeable" << dendl;
readonly = false;
}
void Journaler::create(ceph_file_layout *l) void Journaler::create(ceph_file_layout *l)
{ {
@ -41,7 +52,6 @@ void Journaler::create(ceph_file_layout *l)
void Journaler::set_layout(ceph_file_layout *l) void Journaler::set_layout(ceph_file_layout *l)
{ {
assert(!readonly);
layout = *l; layout = *l;
assert(layout.fl_pg_pool == pg_pool); assert(layout.fl_pg_pool == pg_pool);
@ -115,6 +125,7 @@ void Journaler::recover(Context *onread)
{ {
dout(1) << "recover start" << dendl; dout(1) << "recover start" << dendl;
assert(state != STATE_ACTIVE); assert(state != STATE_ACTIVE);
assert(readonly);
if (onread) if (onread)
waitfor_recover.push_back(onread); waitfor_recover.push_back(onread);
@ -544,14 +555,15 @@ void Journaler::wait_for_flush(Context *onsafe)
void Journaler::flush(Context *onsafe) void Journaler::flush(Context *onsafe)
{ {
assert(!readonly); assert(!readonly);
wait_for_flush(onsafe);
if (write_pos == safe_pos)
return;
if (write_pos == flush_pos) { if (write_pos == flush_pos) {
assert(write_buf.length() == 0); assert(write_buf.length() == 0);
dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
if (onsafe) {
onsafe->finish(0);
delete onsafe;
}
} else { } else {
if (1) { if (1) {
// maybe buffer // maybe buffer
@ -570,6 +582,7 @@ void Journaler::flush(Context *onsafe)
// always flush // always flush
_do_flush(); _do_flush();
} }
wait_for_flush(onsafe);
} }
// write head? // write head?
@ -946,7 +959,6 @@ void Journaler::trim()
{ {
assert(!readonly); assert(!readonly);
uint64_t period = get_layout_period(); uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos; uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period; trim_to -= trim_to % period;
dout(10) << "trim last_commited head was " << last_committed dout(10) << "trim last_commited head was " << last_committed

View File

@ -230,7 +230,7 @@ private:
public: public:
Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, ProfLogger *l, int lkey, SafeTimer *tim) : Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, ProfLogger *l, int lkey, SafeTimer *tim) :
last_written(mag), last_committed(mag), last_written(mag), last_committed(mag),
ino(ino_), pg_pool(pool), readonly(false), magic(mag), ino(ino_), pg_pool(pool), readonly(true), magic(mag),
objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey), objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey),
timer(tim), delay_flush_event(0), timer(tim), delay_flush_event(0),
state(STATE_UNDEF), error(0), state(STATE_UNDEF), error(0),
@ -245,7 +245,7 @@ public:
void reset() { void reset() {
assert(state == STATE_ACTIVE); assert(state == STATE_ACTIVE);
readonly = false; readonly = true;
delay_flush_event = 0; delay_flush_event = 0;
state = STATE_UNDEF; state = STATE_UNDEF;
error = 0; error = 0;
@ -284,8 +284,8 @@ public:
void set_layout(ceph_file_layout *l); void set_layout(ceph_file_layout *l);
void set_readonly() { readonly = true; } void set_readonly();
void set_writeable() { readonly = false; } void set_writeable();
bool is_readonly() { return readonly; } bool is_readonly() { return readonly; }
bool is_active() { return state == STATE_ACTIVE; } bool is_active() { return state == STATE_ACTIVE; }
@ -320,21 +320,16 @@ public:
void set_write_pos(int64_t p) { void set_write_pos(int64_t p) {
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
} }
void set_expire_trimmed_pos(int64_t p) {
expire_pos = trimming_pos = trimmed_pos = p;
}
// trim // trim
void set_expire_pos(int64_t ep) { expire_pos = ep; } void set_expire_pos(int64_t ep) { expire_pos = ep; }
void trim(); void set_trimmed_pos(int64_t p) { trimming_pos = trimmed_pos = p; }
void trim();
void trim_tail() { void trim_tail() {
assert(!readonly); assert(!readonly);
_issue_prezero(); _issue_prezero();
} }
//bool is_trimmable() { return trimming_pos < expire_pos; }
//void trim(int64_t trim_to=0, Context *c=0);
}; };
WRITE_CLASS_ENCODER(Journaler::Header) WRITE_CLASS_ENCODER(Journaler::Header)

View File

@ -424,7 +424,7 @@ EOF
if [ "$set_standby" -eq 1 ]; then if [ "$set_standby" -eq 1 ]; then
cat <<EOF >> $conf cat <<EOF >> $conf
mds standby replay = true mds standby replay = true
mds standby for name = $last_mds_nama mds standby for name = $last_mds_name
EOF EOF
set_standby=0 set_standby=0
else else
@ -450,6 +450,9 @@ EOF
#$CEPH_BIN/cmds -d $ARGS --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20 #$CEPH_BIN/cmds -d $ARGS --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20
#$CEPH_ADM mds set_max_mds 2 #$CEPH_ADM mds set_max_mds 2
done done
if [ "$standby" -eq 1 ]; then
CEPH_NUM_MDS=$(($CEPH_NUM_MDS / 2))
fi
cmd="$CEPH_ADM mds set_max_mds $CEPH_NUM_MDS" cmd="$CEPH_ADM mds set_max_mds $CEPH_NUM_MDS"
echo $cmd echo $cmd
$cmd $cmd