mirror of
https://github.com/ceph/ceph
synced 2025-01-20 18:21:57 +00:00
Merge PR #19424 into master
* refs/pull/19424/head: osdc/Journaler: introduce STATE_STOPPING state osdc/Journaler: add 'stopping' check to various finish callbacks Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
commit
6443022710
@ -153,7 +153,7 @@ public:
|
||||
void Journaler::recover(Context *onread)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (stopping) {
|
||||
if (is_stopping()) {
|
||||
onread->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
@ -213,6 +213,10 @@ void Journaler::_reread_head(Context *onfinish)
|
||||
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping()) {
|
||||
finish->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
|
||||
//read on-disk header into
|
||||
assert(bl.length() || r < 0 );
|
||||
@ -241,6 +245,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
|
||||
void Journaler::_finish_read_head(int r, bufferlist& bl)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping())
|
||||
return;
|
||||
|
||||
assert(state == STATE_READHEAD);
|
||||
|
||||
@ -331,6 +337,10 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
|
||||
C_OnFinisher *onfinish)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping()) {
|
||||
onfinish->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(new_end >= write_pos || r < 0);
|
||||
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
|
||||
@ -344,6 +354,8 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
|
||||
void Journaler::_finish_probe_end(int r, uint64_t end)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping())
|
||||
return;
|
||||
|
||||
assert(state == STATE_PROBING);
|
||||
if (r < 0) { // error in probing
|
||||
@ -396,6 +408,10 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
|
||||
{
|
||||
// Expect to be called back from finish_reread_head, which already takes lock
|
||||
// lock is locked
|
||||
if (is_stopping()) {
|
||||
onfinish->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(!r); //if we get an error, we're boned
|
||||
_reprobe(onfinish);
|
||||
@ -587,6 +603,8 @@ uint64_t Journaler::append_entry(bufferlist& bl)
|
||||
|
||||
void Journaler::_do_flush(unsigned amount)
|
||||
{
|
||||
if (is_stopping())
|
||||
return;
|
||||
if (write_pos == flush_pos)
|
||||
return;
|
||||
assert(write_pos > flush_pos);
|
||||
@ -676,7 +694,7 @@ void Journaler::_do_flush(unsigned amount)
|
||||
void Journaler::wait_for_flush(Context *onsafe)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (stopping) {
|
||||
if (is_stopping()) {
|
||||
onsafe->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
@ -709,6 +727,10 @@ void Journaler::_wait_for_flush(Context *onsafe)
|
||||
void Journaler::flush(Context *onsafe)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping()) {
|
||||
onsafe->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
_flush(wrap_finisher(onsafe));
|
||||
}
|
||||
|
||||
@ -1013,6 +1035,9 @@ void Journaler::_issue_read(uint64_t len)
|
||||
|
||||
void Journaler::_prefetch()
|
||||
{
|
||||
if (is_stopping())
|
||||
return;
|
||||
|
||||
ldout(cct, 10) << "_prefetch" << dendl;
|
||||
// prefetch
|
||||
uint64_t pf;
|
||||
@ -1158,6 +1183,10 @@ void Journaler::erase(Context *completion)
|
||||
void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (is_stopping()) {
|
||||
completion->complete(-EAGAIN);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data_result == 0) {
|
||||
// Async delete the journal header
|
||||
@ -1220,7 +1249,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
|
||||
void Journaler::wait_for_readable(Context *onreadable)
|
||||
{
|
||||
lock_guard l(lock);
|
||||
if (stopping) {
|
||||
if (is_stopping()) {
|
||||
finisher->queue(onreadable, -EAGAIN);
|
||||
return;
|
||||
}
|
||||
@ -1265,6 +1294,9 @@ void Journaler::trim()
|
||||
|
||||
void Journaler::_trim()
|
||||
{
|
||||
if (is_stopping())
|
||||
return;
|
||||
|
||||
assert(!readonly);
|
||||
uint64_t period = get_layout_period();
|
||||
uint64_t trim_to = last_committed.expire_pos;
|
||||
@ -1515,8 +1547,8 @@ void Journaler::shutdown()
|
||||
|
||||
ldout(cct, 1) << __func__ << dendl;
|
||||
|
||||
state = STATE_STOPPING;
|
||||
readable = false;
|
||||
stopping = true;
|
||||
|
||||
// Kick out anyone reading from journal
|
||||
error = -EAGAIN;
|
||||
@ -1526,7 +1558,9 @@ void Journaler::shutdown()
|
||||
f->complete(-EAGAIN);
|
||||
}
|
||||
|
||||
finish_contexts(cct, waitfor_recover, -ESHUTDOWN);
|
||||
list<Context*> ls;
|
||||
ls.swap(waitfor_recover);
|
||||
finish_contexts(cct, ls, -ESHUTDOWN);
|
||||
|
||||
std::map<uint64_t, std::list<Context*> >::iterator i;
|
||||
for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
|
||||
|
@ -253,6 +253,7 @@ private:
|
||||
static const int STATE_ACTIVE = 3;
|
||||
static const int STATE_REREADHEAD = 4;
|
||||
static const int STATE_REPROBING = 5;
|
||||
static const int STATE_STOPPING = 6;
|
||||
|
||||
int state;
|
||||
int error;
|
||||
@ -411,7 +412,7 @@ public:
|
||||
fetch_len(0), temp_fetch_len(0),
|
||||
on_readable(0), on_write_error(NULL), called_write_error(false),
|
||||
expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
|
||||
write_iohint(0), stopping(false)
|
||||
write_iohint(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -510,8 +511,6 @@ public:
|
||||
* to -EAGAIN.
|
||||
*/
|
||||
void shutdown();
|
||||
protected:
|
||||
bool stopping;
|
||||
public:
|
||||
|
||||
// Synchronous getters
|
||||
@ -522,6 +521,7 @@ public:
|
||||
}
|
||||
file_layout_t& get_layout() { return layout; }
|
||||
bool is_active() { return state == STATE_ACTIVE; }
|
||||
bool is_stopping() { return state == STATE_STOPPING; }
|
||||
int get_error() { return error; }
|
||||
bool is_readonly() { return readonly; }
|
||||
bool is_readable();
|
||||
|
Loading…
Reference in New Issue
Block a user