mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
Merge pull request #14447 from ukernel/wip-19450
mds: drop partial entry and adjust write_pos when opening PurgeQueue Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
commit
f854e3769e
@ -139,17 +139,65 @@ void PurgeQueue::open(Context *completion)
|
||||
} else if (r == 0) {
|
||||
Mutex::Locker l(lock);
|
||||
dout(4) << "open complete" << dendl;
|
||||
if (r == 0) {
|
||||
journaler.set_writeable();
|
||||
|
||||
// Journaler only guarantees entries before head write_pos have been
|
||||
// fully flushed. Before appending new entries, we need to find and
|
||||
// drop any partial written entry.
|
||||
if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
|
||||
dout(4) << "recovering write_pos" << dendl;
|
||||
journaler.set_read_pos(journaler.last_committed.write_pos);
|
||||
_recover(completion);
|
||||
return;
|
||||
}
|
||||
completion->complete(r);
|
||||
|
||||
journaler.set_writeable();
|
||||
completion->complete(0);
|
||||
} else {
|
||||
derr << "Error " << r << " loading Journaler" << dendl;
|
||||
on_error->complete(0);
|
||||
on_error->complete(r);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
void PurgeQueue::_recover(Context *completion)
|
||||
{
|
||||
assert(lock.is_locked_by_me());
|
||||
|
||||
// Journaler::is_readable() adjusts write_pos if partial entry is encountered
|
||||
while (1) {
|
||||
if (!journaler.is_readable() &&
|
||||
!journaler.get_error() &&
|
||||
journaler.get_read_pos() < journaler.get_write_pos()) {
|
||||
journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
|
||||
Mutex::Locker l(lock);
|
||||
_recover(completion);
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (journaler.get_error()) {
|
||||
int r = journaler.get_error();
|
||||
derr << "Error " << r << " recovering write_pos" << dendl;
|
||||
on_error->complete(r);
|
||||
return;
|
||||
}
|
||||
|
||||
if (journaler.get_read_pos() == journaler.get_write_pos()) {
|
||||
dout(4) << "write_pos recovered" << dendl;
|
||||
// restore original read_pos
|
||||
journaler.set_read_pos(journaler.last_committed.expire_pos);
|
||||
journaler.set_writeable();
|
||||
completion->complete(0);
|
||||
return;
|
||||
}
|
||||
|
||||
bufferlist bl;
|
||||
bool readable = journaler.try_read_entry(bl);
|
||||
assert(readable); // we checked earlier
|
||||
}
|
||||
}
|
||||
|
||||
void PurgeQueue::create(Context *fin)
|
||||
{
|
||||
dout(4) << "creating" << dendl;
|
||||
|
@ -112,6 +112,9 @@ protected:
|
||||
// Has drain() ever been called on this instance?
|
||||
bool draining;
|
||||
|
||||
// recover the journal write_pos (drop any partial written entry)
|
||||
void _recover(Context *completion);
|
||||
|
||||
/**
|
||||
* @return true if we were in a position to try and consume something:
|
||||
* does not mean we necessarily did.
|
||||
|
@ -65,9 +65,10 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf)
|
||||
journal_stream.set_format(sf);
|
||||
_set_layout(l);
|
||||
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
|
||||
read_pos = requested_pos = received_pos =
|
||||
expire_pos = trimming_pos = trimmed_pos = layout.get_period();
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos =
|
||||
safe_pos = read_pos = requested_pos = received_pos =
|
||||
expire_pos = trimming_pos = trimmed_pos =
|
||||
next_safe_pos = layout.get_period();
|
||||
|
||||
ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
|
||||
<< std::dec << ", format=" << stream_format << dendl;
|
||||
@ -228,7 +229,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
|
||||
finish->complete(-EINVAL);
|
||||
return;
|
||||
}
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
|
||||
= h.write_pos;
|
||||
expire_pos = h.expire_pos;
|
||||
trimmed_pos = trimming_pos = h.trimmed_pos;
|
||||
@ -289,7 +290,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
|
||||
return;
|
||||
}
|
||||
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
|
||||
= h.write_pos;
|
||||
read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
|
||||
trimmed_pos = trimming_pos = h.trimmed_pos;
|
||||
@ -337,7 +338,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
|
||||
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
|
||||
<< " (header had " << write_pos << ")."
|
||||
<< dendl;
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end;
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
|
||||
state = STATE_ACTIVE;
|
||||
onfinish->complete(r);
|
||||
}
|
||||
@ -364,7 +365,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
|
||||
|
||||
state = STATE_ACTIVE;
|
||||
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end;
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;
|
||||
|
||||
out:
|
||||
// done.
|
||||
@ -500,7 +501,6 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
|
||||
return;
|
||||
}
|
||||
|
||||
assert(start >= safe_pos);
|
||||
assert(start < flush_pos);
|
||||
|
||||
// calc latency?
|
||||
@ -510,12 +510,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
|
||||
}
|
||||
|
||||
// adjust safe_pos
|
||||
assert(pending_safe.count(start));
|
||||
pending_safe.erase(start);
|
||||
auto it = pending_safe.find(start);
|
||||
assert(it != pending_safe.end());
|
||||
pending_safe.erase(it);
|
||||
if (pending_safe.empty())
|
||||
safe_pos = flush_pos;
|
||||
safe_pos = next_safe_pos;
|
||||
else
|
||||
safe_pos = *pending_safe.begin();
|
||||
safe_pos = pending_safe.begin()->second;
|
||||
|
||||
ldout(cct, 10) << "_finish_flush safe from " << start
|
||||
<< ", pending_safe " << pending_safe
|
||||
@ -602,6 +603,10 @@ uint64_t Journaler::append_entry(bufferlist& bl)
|
||||
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
|
||||
<< write_obj << " flo " << flush_obj << ")" << dendl;
|
||||
_do_flush(write_buf.length() - write_off);
|
||||
if (write_off) {
|
||||
// current entry isn't being flushed, set next_safe_pos to the end of previous entry
|
||||
next_safe_pos = write_pos - wrote;
|
||||
}
|
||||
}
|
||||
|
||||
return write_pos;
|
||||
@ -654,13 +659,14 @@ void Journaler::_do_flush(unsigned amount)
|
||||
SnapContext snapc;
|
||||
|
||||
Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
|
||||
pending_safe.insert(flush_pos);
|
||||
pending_safe[flush_pos] = next_safe_pos;
|
||||
|
||||
bufferlist write_bl;
|
||||
|
||||
// adjust pointers
|
||||
if (len == write_buf.length()) {
|
||||
write_bl.swap(write_buf);
|
||||
next_safe_pos = write_pos;
|
||||
} else {
|
||||
write_buf.splice(0, len, &write_bl);
|
||||
}
|
||||
@ -978,7 +984,7 @@ void Journaler::_issue_read(uint64_t len)
|
||||
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
|
||||
<< ", waiting" << dendl;
|
||||
assert(write_pos > requested_pos);
|
||||
if (flush_pos == safe_pos) {
|
||||
if (pending_safe.empty()) {
|
||||
_flush(NULL);
|
||||
}
|
||||
waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
|
||||
@ -1047,7 +1053,7 @@ void Journaler::_prefetch()
|
||||
<< " < target " << target << " (" << raw_target
|
||||
<< "), prefetching " << len << dendl;
|
||||
|
||||
if (flush_pos == safe_pos && write_pos > safe_pos) {
|
||||
if (pending_safe.empty() && write_pos > safe_pos) {
|
||||
// If we are reading and writing the journal, then we may need
|
||||
// to issue a flush if one isn't already in progress.
|
||||
// Avoid doing a flush every time so that if we do write/read/write/read
|
||||
@ -1089,7 +1095,7 @@ bool Journaler::_is_readable()
|
||||
"adjusting write_pos to " << read_pos << dendl;
|
||||
|
||||
// adjust write_pos
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos;
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
|
||||
assert(write_buf.length() == 0);
|
||||
|
||||
// reset read state
|
||||
|
@ -298,6 +298,11 @@ private:
|
||||
uint64_t flush_pos; ///< where we will flush. if
|
||||
/// write_pos>flush_pos, we're buffering writes.
|
||||
uint64_t safe_pos; ///< what has been committed safely to disk.
|
||||
|
||||
uint64_t next_safe_pos; /// start postion of the first entry that isn't
|
||||
/// being fully flushed. If we don't flush any
|
||||
// partial entry, it's equal to flush_pos.
|
||||
|
||||
bufferlist write_buf; ///< write buffer. flush_pos +
|
||||
/// write_buf.length() == write_pos.
|
||||
|
||||
@ -306,7 +311,7 @@ private:
|
||||
|
||||
bool waiting_for_zero;
|
||||
interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed
|
||||
std::set<uint64_t> pending_safe;
|
||||
std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
|
||||
// when safe through given offset
|
||||
std::map<uint64_t, std::list<Context*> > waitfor_safe;
|
||||
|
||||
@ -398,7 +403,8 @@ public:
|
||||
objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
|
||||
delay_flush_event(0),
|
||||
state(STATE_UNDEF), error(0),
|
||||
prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0),
|
||||
prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
|
||||
safe_pos(0), next_safe_pos(0),
|
||||
write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
|
||||
waiting_for_zero(false),
|
||||
read_pos(0), requested_pos(0), received_pos(0),
|
||||
@ -428,6 +434,7 @@ public:
|
||||
write_pos = 0;
|
||||
flush_pos = 0;
|
||||
safe_pos = 0;
|
||||
next_safe_pos = 0;
|
||||
read_pos = 0;
|
||||
requested_pos = 0;
|
||||
received_pos = 0;
|
||||
@ -459,7 +466,7 @@ public:
|
||||
void set_writeable();
|
||||
void set_write_pos(uint64_t p) {
|
||||
lock_guard l(lock);
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
|
||||
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
|
||||
}
|
||||
void set_read_pos(uint64_t p) {
|
||||
lock_guard l(lock);
|
||||
|
Loading…
Reference in New Issue
Block a user