From cb4dde7659d55e074cacbe24385e393c598c56f0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 8 Mar 2017 16:53:07 +0100 Subject: [PATCH] Fix WAL log recovery bug This fixes a bug where the last WAL file was closed after consuming it instead of being left open for further writes. Reloading of blocks on startup considers loading head blocks now. --- db.go | 8 +++++++- head.go | 10 +--------- wal.go | 6 +++++- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/db.go b/db.go index 0a89bd45f..bf8600fc4 100644 --- a/db.go +++ b/db.go @@ -358,7 +358,13 @@ func (db *DB) reloadBlocks() error { b, ok := db.seqBlocks[meta.Sequence] if meta.Compaction.Generation == 0 { - if ok && meta.ULID != b.Meta().ULID { + if !ok { + b, err = openHeadBlock(dirs[i], db.logger) + if err != nil { + return errors.Wrapf(err, "load head at %s", dirs[i]) + } + } + if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } heads = append(heads, b.(*headBlock)) diff --git a/head.go b/head.go index 2a9599386..211cadf77 100644 --- a/head.go +++ b/head.go @@ -44,7 +44,6 @@ type headBlock struct { activeWriters uint64 - symbols map[string]struct{} // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. series []*memSeries @@ -150,7 +149,7 @@ func (h *headBlock) Close() error { h.mtx.Lock() if err := h.wal.Close(); err != nil { - return err + return errors.Wrapf(err, "close WAL for head %s", h.dir) } // Check whether the head block still exists in the underlying dir // or has already been replaced with a compacted version or removed. @@ -528,13 +527,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { return s } -func (h *headBlock) fullness() float64 { - h.metamtx.RLock() - defer h.metamtx.RUnlock() - - return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250 -} - func (h *headBlock) updateMapping() { h.mtx.RLock() diff --git a/wal.go b/wal.go index 8b88d110d..5b85ca3c9 100644 --- a/wal.go +++ b/wal.go @@ -448,7 +448,11 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { cr := r.rs[r.cur] et, flag, b, err := r.entry(cr) - if err == io.EOF { + // If we reached the end of the reader, advance to the next one + // and close. + // Do not close on the last one as it will still be appended to. + // XXX(fabxc): leaky abstraction. + if err == io.EOF && r.cur < len(r.rs)-1 { // Current reader completed, close and move to the next one. if err := cr.Close(); err != nil { return 0, 0, nil, err