diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc index 7bea41bfab3..2cf61938afc 100644 --- a/src/mds/PurgeQueue.cc +++ b/src/mds/PurgeQueue.cc @@ -139,17 +139,65 @@ void PurgeQueue::open(Context *completion) } else if (r == 0) { Mutex::Locker l(lock); dout(4) << "open complete" << dendl; - if (r == 0) { - journaler.set_writeable(); + + // Journaler only guarantees entries before head write_pos have been + // fully flushed. Before appending new entries, we need to find and + // drop any partially written entry. + if (journaler.last_committed.write_pos < journaler.get_write_pos()) { + dout(4) << "recovering write_pos" << dendl; + journaler.set_read_pos(journaler.last_committed.write_pos); + _recover(completion); + return; } - completion->complete(r); + + journaler.set_writeable(); + completion->complete(0); } else { derr << "Error " << r << " loading Journaler" << dendl; - on_error->complete(0); + on_error->complete(r); } })); } + +void PurgeQueue::_recover(Context *completion) +{ + assert(lock.is_locked_by_me()); + + // Journaler::is_readable() adjusts write_pos if a partial entry is encountered + while (1) { + if (!journaler.is_readable() && + !journaler.get_error() && + journaler.get_read_pos() < journaler.get_write_pos()) { + journaler.wait_for_readable(new FunctionContext([this, completion](int r) { + Mutex::Locker l(lock); + _recover(completion); + })); + return; + } + + if (journaler.get_error()) { + int r = journaler.get_error(); + derr << "Error " << r << " recovering write_pos" << dendl; + on_error->complete(r); + return; + } + + if (journaler.get_read_pos() == journaler.get_write_pos()) { + dout(4) << "write_pos recovered" << dendl; + // restore original read_pos + journaler.set_read_pos(journaler.last_committed.expire_pos); + journaler.set_writeable(); + completion->complete(0); + return; + } + + bufferlist bl; + bool readable = journaler.try_read_entry(bl); + assert(readable); // we checked earlier + } +} + void PurgeQueue::create(Context *fin) { 
dout(4) << "creating" << dendl; diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h index 33b8c861799..b9699ddf3e3 100644 --- a/src/mds/PurgeQueue.h +++ b/src/mds/PurgeQueue.h @@ -112,6 +112,9 @@ protected: // Has drain() ever been called on this instance? bool draining; + // recover the journal write_pos (drop any partially written entry) + void _recover(Context *completion); + /** * @return true if we were in a position to try and consume something: * does not mean we necessarily did. diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 978511ca0df..642b15be430 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -65,9 +65,10 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf) journal_stream.set_format(sf); _set_layout(l); - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = - read_pos = requested_pos = received_pos = - expire_pos = trimming_pos = trimmed_pos = layout.get_period(); + prezeroing_pos = prezero_pos = write_pos = flush_pos = + safe_pos = read_pos = requested_pos = received_pos = + expire_pos = trimming_pos = trimmed_pos = + next_safe_pos = layout.get_period(); ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino << std::dec << ", format=" << stream_format << dendl; @@ -228,7 +229,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) finish->complete(-EINVAL); return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = h.write_pos; expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -289,7 +290,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = h.write_pos; read_pos = requested_pos = received_pos = expire_pos = h.expire_pos; trimmed_pos = trimming_pos = 
h.trimmed_pos; @@ -337,7 +338,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end; state = STATE_ACTIVE; onfinish->complete(r); } @@ -364,7 +365,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) state = STATE_ACTIVE; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end; out: // done. @@ -500,7 +501,6 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) return; } - assert(start >= safe_pos); assert(start < flush_pos); // calc latency? @@ -510,12 +510,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) } // adjust safe_pos - assert(pending_safe.count(start)); - pending_safe.erase(start); + auto it = pending_safe.find(start); + assert(it != pending_safe.end()); + pending_safe.erase(it); if (pending_safe.empty()) - safe_pos = flush_pos; + safe_pos = next_safe_pos; else - safe_pos = *pending_safe.begin(); + safe_pos = pending_safe.begin()->second; ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe @@ -602,6 +603,10 @@ uint64_t Journaler::append_entry(bufferlist& bl) ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl; _do_flush(write_buf.length() - write_off); + if (write_off) { + // current entry isn't being flushed, set next_safe_pos to the end of previous entry + next_safe_pos = write_pos - wrote; + } } return write_pos; @@ -654,13 +659,14 @@ void Journaler::_do_flush(unsigned amount) SnapContext snapc; Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT - pending_safe.insert(flush_pos); + 
pending_safe[flush_pos] = next_safe_pos; bufferlist write_bl; // adjust pointers if (len == write_buf.length()) { write_bl.swap(write_buf); + next_safe_pos = write_pos; } else { write_buf.splice(0, len, &write_bl); } @@ -978,7 +984,7 @@ void Journaler::_issue_read(uint64_t len) ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; assert(write_pos > requested_pos); - if (flush_pos == safe_pos) { + if (pending_safe.empty()) { _flush(NULL); } waitfor_safe[flush_pos].push_back(new C_RetryRead(this)); @@ -1047,7 +1053,7 @@ void Journaler::_prefetch() << " < target " << target << " (" << raw_target << "), prefetching " << len << dendl; - if (flush_pos == safe_pos && write_pos > safe_pos) { + if (pending_safe.empty() && write_pos > safe_pos) { // If we are reading and writing the journal, then we may need // to issue a flush if one isn't already in progress. // Avoid doing a flush every time so that if we do write/read/write/read @@ -1089,7 +1095,7 @@ bool Journaler::_is_readable() "adjusting write_pos to " << read_pos << dendl; // adjust write_pos - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; assert(write_buf.length() == 0); // reset read state diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 12708efdba1..df901b3eedf 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -298,6 +298,11 @@ private: uint64_t flush_pos; ///< where we will flush. if /// write_pos>flush_pos, we're buffering writes. uint64_t safe_pos; ///< what has been committed safely to disk. + + uint64_t next_safe_pos; ///< start position of the first entry that isn't + /// being fully flushed. If we don't flush any + /// partial entry, it's equal to flush_pos. + bufferlist write_buf; ///< write buffer. flush_pos + /// write_buf.length() == write_pos. 
@@ -306,7 +311,7 @@ private: bool waiting_for_zero; interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed - std::set<uint64_t> pending_safe; + std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos // when safe through given offset std::map<uint64_t, std::list<Context*> > waitfor_safe; @@ -398,7 +403,8 @@ public: objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey), delay_flush_event(0), state(STATE_UNDEF), error(0), - prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0), + prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), + safe_pos(0), next_safe_pos(0), write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)), waiting_for_zero(false), read_pos(0), requested_pos(0), received_pos(0), @@ -428,6 +434,7 @@ public: write_pos = 0; flush_pos = 0; safe_pos = 0; + next_safe_pos = 0; read_pos = 0; requested_pos = 0; received_pos = 0; @@ -459,7 +466,7 @@ public: void set_writeable(); void set_write_pos(uint64_t p) { lock_guard l(lock); - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p; } void set_read_pos(uint64_t p) { lock_guard l(lock);