diff --git a/head.go b/head.go index 92619a640..740eff07f 100644 --- a/head.go +++ b/head.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "math" "runtime" "sort" @@ -492,22 +493,32 @@ func (h *Head) Init(minValidTime int64) error { startFrom++ } - // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1}) + // Find the last segment. + _, last, err := h.wal.Segments() if err != nil { - return errors.Wrap(err, "open WAL segments") + return errors.Wrap(err, "finding WAL segments") } - err = h.loadWAL(wal.NewReader(sr)) - sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. - if err == nil { - return nil - } - level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) - h.metrics.walCorruptionsTotal.Inc() - if err := h.wal.Repair(err); err != nil { - return errors.Wrap(err, "repair corrupted WAL") + // Backfill segments from the most recent checkpoint onwards. + for i := startFrom; i <= last; i++ { + s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) + } + + sr := wal.NewSegmentBufReader(s) + err = h.loadWAL(wal.NewReader(sr)) + sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. 
+ if err == nil { + continue + } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + h.metrics.walCorruptionsTotal.Inc() + if err := h.wal.Repair(err); err != nil { + return errors.Wrap(err, "repair corrupted WAL") + } } + return nil } diff --git a/wal/wal.go b/wal/wal.go index 46504f0d9..cb2e11ff9 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -228,29 +228,25 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } _, j, err := w.Segments() + // Index of the Segment we want to open and write to. + writeSegmentIndex := 0 if err != nil { return nil, errors.Wrap(err, "get segment range") } - // Fresh dir, no segments yet. - if j == -1 { - segment, err := CreateSegment(w.dir, 0) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } - } else { - segment, err := OpenWriteSegment(logger, w.dir, j) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } + // If some segments already exist create one with a higher index than the last segment. + if j != -1 { + writeSegmentIndex = j + 1 } + + segment, err := CreateSegment(w.dir, writeSegmentIndex) + if err != nil { + return nil, err + } + + if err := w.setSegment(segment); err != nil { + return nil, err + } + go w.run() return w, nil @@ -363,6 +359,9 @@ func (w *WAL) Repair(origErr error) error { } // We expect an error here from r.Err(), so nothing to handle. + // We need to pad to the end of the last page in the repaired segment + w.flushPage(true) + // We explicitly close even when there is a defer for Windows to be // able to delete it. The defer is in place to close it in-case there // are errors above. 
@@ -372,6 +371,20 @@ func (w *WAL) Repair(origErr error) error { if err := os.Remove(tmpfn); err != nil { return errors.Wrap(err, "delete corrupted segment") } + + // Explicitly close the segment we just repaired to avoid issues with Windows. + s.Close() + + // We always want to start writing to a new Segment rather than an existing + // Segment, which is handled by NewSize, but earlier in Repair we're deleting + // all segments that come after the corrupted Segment. Recreate a new Segment here. + s, err = CreateSegment(w.dir, cerr.Segment+1) + if err != nil { + return err + } + if err := w.setSegment(s); err != nil { + return err + } return nil } @@ -710,7 +723,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { segs = append(segs, s) } } - return newSegmentBufReader(segs...), nil + return NewSegmentBufReader(segs...), nil } // segmentBufReader is a buffered reader that reads in multiples of pages. @@ -725,7 +738,7 @@ type segmentBufReader struct { off int // Offset of read data into current segment. } -func newSegmentBufReader(segs ...*Segment) *segmentBufReader { +func NewSegmentBufReader(segs ...*Segment) *segmentBufReader { return &segmentBufReader{ buf: bufio.NewReaderSize(segs[0], 16*pageSize), segs: segs, diff --git a/wal/wal_test.go b/wal/wal_test.go index 5363ebf4a..577ae5fd0 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -143,20 +143,35 @@ func TestWAL_Repair(t *testing.T) { testutil.Ok(t, err) defer w.Close() + first, last, err := w.Segments() + testutil.Ok(t, err) + + // Backfill segments from the most recent checkpoint onwards. + for i := first; i <= last; i++ { + s, err := OpenReadSegment(SegmentName(w.Dir(), i)) + testutil.Ok(t, err) + + sr := NewSegmentBufReader(s) + testutil.Ok(t, err) + r := NewReader(sr) + for r.Next() { + } + + // Close the segment so we don't break things on Windows. + s.Close() + + // No corruption in this segment. 
+ if r.Err() == nil { + continue + } + testutil.Ok(t, w.Repair(r.Err())) + break + } + sr, err := NewSegmentsReader(dir) testutil.Ok(t, err) - r := NewReader(sr) - - for r.Next() { - } - testutil.NotOk(t, r.Err()) - testutil.Ok(t, sr.Close()) - - testutil.Ok(t, w.Repair(r.Err())) - sr, err = NewSegmentsReader(dir) - testutil.Ok(t, err) defer sr.Close() - r = NewReader(sr) + r := NewReader(sr) var result [][]byte for r.Next() { @@ -172,10 +187,13 @@ func TestWAL_Repair(t *testing.T) { } } - // Make sure the last segment is the corrupt segment. - _, last, err := w.Segments() + // Make sure there is a new 0 size Segment after the corrupted Segment. + _, last, err = w.Segments() testutil.Ok(t, err) - testutil.Equals(t, test.corrSgm, last) + testutil.Equals(t, test.corrSgm+1, last) + fi, err := os.Stat(SegmentName(dir, last)) + testutil.Ok(t, err) + testutil.Equals(t, int64(0), fi.Size()) }) } } @@ -276,6 +294,10 @@ func TestCorruptAndCarryOn(t *testing.T) { err = w.Repair(corruptionErr) testutil.Ok(t, err) + // Ensure that we have a completely clean slate after repairing. + testutil.Equals(t, w.segment.Index(), 1) // We corrupted segment 0. + testutil.Equals(t, w.donePages, 0) + for i := 0; i < 5; i++ { buf := make([]byte, recordSize) _, err := rand.Read(buf)