mirror of
https://github.com/ceph/ceph
synced 2025-01-20 18:21:57 +00:00
Merge pull request #2391 from ceph/wip-mds-lock
Wip mds lock Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
commit
d27ac4411c
@ -341,8 +341,6 @@ void MDLog::_submit_thread()
|
|||||||
bufferlist bl;
|
bufferlist bl;
|
||||||
le->encode_with_header(bl);
|
le->encode_with_header(bl);
|
||||||
|
|
||||||
mds->mds_lock.Lock();
|
|
||||||
|
|
||||||
uint64_t write_pos = journaler->get_write_pos();
|
uint64_t write_pos = journaler->get_write_pos();
|
||||||
|
|
||||||
le->set_start_off(write_pos);
|
le->set_start_off(write_pos);
|
||||||
@ -361,19 +359,15 @@ void MDLog::_submit_thread()
|
|||||||
if (data.flush)
|
if (data.flush)
|
||||||
journaler->flush();
|
journaler->flush();
|
||||||
|
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
|
|
||||||
if (logger)
|
if (logger)
|
||||||
logger->set(l_mdl_wrpos, ls->end);
|
logger->set(l_mdl_wrpos, ls->end);
|
||||||
|
|
||||||
delete le;
|
delete le;
|
||||||
} else {
|
} else {
|
||||||
mds->mds_lock.Lock();
|
|
||||||
if (data.fin)
|
if (data.fin)
|
||||||
journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
|
journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
|
||||||
if (data.flush)
|
if (data.flush)
|
||||||
journaler->flush();
|
journaler->flush();
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
submit_mutex.Lock();
|
submit_mutex.Lock();
|
||||||
@ -739,9 +733,7 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion)
|
|||||||
if (mds->is_standby_replay()) {
|
if (mds->is_standby_replay()) {
|
||||||
dout(1) << "Journal " << jp.front << " is being rewritten, "
|
dout(1) << "Journal " << jp.front << " is being rewritten, "
|
||||||
<< "cannot replay in standby until an active MDS completes rewrite" << dendl;
|
<< "cannot replay in standby until an active MDS completes rewrite" << dendl;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
completion->complete(-EAGAIN);
|
completion->complete(-EAGAIN);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
dout(1) << "Erasing journal " << jp.back << dendl;
|
dout(1) << "Erasing journal " << jp.back << dendl;
|
||||||
@ -750,19 +742,15 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion)
|
|||||||
mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
|
mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
|
||||||
|
|
||||||
// Read all about this journal (header + extents)
|
// Read all about this journal (header + extents)
|
||||||
mds->mds_lock.Lock();
|
|
||||||
C_SaferCond recover_wait;
|
C_SaferCond recover_wait;
|
||||||
back.recover(&recover_wait);
|
back.recover(&recover_wait);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
int recovery_result = recover_wait.wait();
|
int recovery_result = recover_wait.wait();
|
||||||
|
|
||||||
// Journaler.recover succeeds if no journal objects are present: an error
|
// Journaler.recover succeeds if no journal objects are present: an error
|
||||||
// means something worse like a corrupt header, which we can't handle here.
|
// means something worse like a corrupt header, which we can't handle here.
|
||||||
assert(recovery_result == 0);
|
assert(recovery_result == 0);
|
||||||
// We could read journal, so we can erase it.
|
// We could read journal, so we can erase it.
|
||||||
mds->mds_lock.Lock();
|
|
||||||
back.erase(&erase_waiter);
|
back.erase(&erase_waiter);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
int erase_result = erase_waiter.wait();
|
int erase_result = erase_waiter.wait();
|
||||||
|
|
||||||
// If we are successful, or find no data, we can update the JournalPointer to
|
// If we are successful, or find no data, we can update the JournalPointer to
|
||||||
@ -782,9 +770,7 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion)
|
|||||||
Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
|
Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
|
||||||
CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
|
CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
|
||||||
C_SaferCond recover_wait;
|
C_SaferCond recover_wait;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
front_journal->recover(&recover_wait);
|
front_journal->recover(&recover_wait);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
|
dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
|
||||||
int recovery_result = recover_wait.wait();
|
int recovery_result = recover_wait.wait();
|
||||||
dout(4) << "Journal " << jp.front << " recovered." << dendl;
|
dout(4) << "Journal " << jp.front << " recovered." << dendl;
|
||||||
@ -857,9 +843,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
|
|
||||||
/* Write the new journal header to RADOS */
|
/* Write the new journal header to RADOS */
|
||||||
C_SaferCond write_head_wait;
|
C_SaferCond write_head_wait;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
new_journal->write_head(&write_head_wait);
|
new_journal->write_head(&write_head_wait);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
write_head_wait.wait();
|
write_head_wait.wait();
|
||||||
|
|
||||||
// Read in the old journal, and whenever we have readable events,
|
// Read in the old journal, and whenever we have readable events,
|
||||||
@ -870,7 +854,6 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
// e.g. between checking readable and doing wait_for_readable so that journaler
|
// e.g. between checking readable and doing wait_for_readable so that journaler
|
||||||
// state doesn't change in between.
|
// state doesn't change in between.
|
||||||
uint32_t events_transcribed = 0;
|
uint32_t events_transcribed = 0;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
while (1) {
|
while (1) {
|
||||||
while (!old_journal->is_readable() &&
|
while (!old_journal->is_readable() &&
|
||||||
old_journal->get_read_pos() < old_journal->get_write_pos() &&
|
old_journal->get_read_pos() < old_journal->get_write_pos() &&
|
||||||
@ -881,9 +864,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
old_journal->wait_for_readable(&readable_waiter);
|
old_journal->wait_for_readable(&readable_waiter);
|
||||||
|
|
||||||
// Wait for a journal prefetch to complete
|
// Wait for a journal prefetch to complete
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
readable_waiter.wait();
|
readable_waiter.wait();
|
||||||
mds->mds_lock.Lock();
|
|
||||||
}
|
}
|
||||||
if (old_journal->get_error()) {
|
if (old_journal->get_error()) {
|
||||||
r = old_journal->get_error();
|
r = old_journal->get_error();
|
||||||
@ -906,18 +887,11 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
// Write (buffered, synchronous) one serialized LogEvent
|
// Write (buffered, synchronous) one serialized LogEvent
|
||||||
events_transcribed += 1;
|
events_transcribed += 1;
|
||||||
new_journal->append_entry(bl);
|
new_journal->append_entry(bl);
|
||||||
|
|
||||||
// Allow other I/O to advance, e.g. MDS beacons
|
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
mds->mds_lock.Lock();
|
|
||||||
}
|
}
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
|
|
||||||
dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
|
dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
|
||||||
C_SaferCond flush_waiter;
|
C_SaferCond flush_waiter;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
new_journal->flush(&flush_waiter);
|
new_journal->flush(&flush_waiter);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
flush_waiter.wait();
|
flush_waiter.wait();
|
||||||
|
|
||||||
// If failed to rewrite journal, leave the part written journal
|
// If failed to rewrite journal, leave the part written journal
|
||||||
@ -934,9 +908,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
/* Delete the old journal to free space */
|
/* Delete the old journal to free space */
|
||||||
dout(1) << "New journal flushed, erasing old journal" << dendl;
|
dout(1) << "New journal flushed, erasing old journal" << dendl;
|
||||||
C_SaferCond erase_waiter;
|
C_SaferCond erase_waiter;
|
||||||
mds->mds_lock.Lock();
|
|
||||||
old_journal->erase(&erase_waiter);
|
old_journal->erase(&erase_waiter);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
int erase_result = erase_waiter.wait();
|
int erase_result = erase_waiter.wait();
|
||||||
assert(erase_result == 0);
|
assert(erase_result == 0);
|
||||||
delete old_journal;
|
delete old_journal;
|
||||||
@ -962,7 +934,6 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
|
|||||||
// i am a separate thread
|
// i am a separate thread
|
||||||
void MDLog::_replay_thread()
|
void MDLog::_replay_thread()
|
||||||
{
|
{
|
||||||
mds->mds_lock.Lock();
|
|
||||||
dout(10) << "_replay_thread start" << dendl;
|
dout(10) << "_replay_thread start" << dendl;
|
||||||
|
|
||||||
// loop
|
// loop
|
||||||
@ -997,9 +968,7 @@ void MDLog::_replay_thread()
|
|||||||
* -- as long as we drop the main mds lock--. */
|
* -- as long as we drop the main mds lock--. */
|
||||||
C_SaferCond reread_fin;
|
C_SaferCond reread_fin;
|
||||||
journaler->reread_head(&reread_fin);
|
journaler->reread_head(&reread_fin);
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
int err = reread_fin.wait();
|
int err = reread_fin.wait();
|
||||||
mds->mds_lock.Lock();
|
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err == -ENOENT && mds->is_standby_replay()) {
|
if (err == -ENOENT && mds->is_standby_replay()) {
|
||||||
r = -EAGAIN;
|
r = -EAGAIN;
|
||||||
@ -1081,10 +1050,6 @@ void MDLog::_replay_thread()
|
|||||||
delete le;
|
delete le;
|
||||||
|
|
||||||
logger->set(l_mdl_rdpos, pos);
|
logger->set(l_mdl_rdpos, pos);
|
||||||
|
|
||||||
// drop lock for a second, so other events/messages (e.g. beacon timer!) can go off
|
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
mds->mds_lock.Lock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// done!
|
// done!
|
||||||
@ -1097,10 +1062,11 @@ void MDLog::_replay_thread()
|
|||||||
}
|
}
|
||||||
|
|
||||||
dout(10) << "_replay_thread kicking waiters" << dendl;
|
dout(10) << "_replay_thread kicking waiters" << dendl;
|
||||||
|
mds->mds_lock.Lock();
|
||||||
finish_contexts(g_ceph_context, waitfor_replay, r);
|
finish_contexts(g_ceph_context, waitfor_replay, r);
|
||||||
|
mds->mds_lock.Unlock();
|
||||||
|
|
||||||
dout(10) << "_replay_thread finish" << dendl;
|
dout(10) << "_replay_thread finish" << dendl;
|
||||||
mds->mds_lock.Unlock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MDLog::standby_trim_segments()
|
void MDLog::standby_trim_segments()
|
||||||
|
Loading…
Reference in New Issue
Block a user