diff --git a/db.go b/db.go index f8e6f5232..e80d3bd2d 100644 --- a/db.go +++ b/db.go @@ -328,8 +328,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db minValidTime = blocks[len(blocks)-1].Meta().MaxTime } - if err := db.head.Init(minValidTime); err != nil { - return nil, errors.Wrap(err, "read WAL") + if initErr := db.head.Init(minValidTime); initErr != nil { + db.head.metrics.walCorruptionsTotal.Inc() + level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", initErr) + if err := wlog.Repair(initErr); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } } go db.run() diff --git a/head.go b/head.go index f9d497802..44c71c74c 100644 --- a/head.go +++ b/head.go @@ -315,7 +315,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } } -func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error { +func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -332,6 +332,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error { ) wg.Add(n) + defer func() { + // For CorruptionErr ensure to terminate all workers before exiting. 
+ if _, ok := err.(*wal.CorruptionErr); ok { + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + wg.Wait() + } + }() + for i := 0; i < n; i++ { outputs[i] = make(chan []RefSample, 300) inputs[i] = make(chan []RefSample, 300) @@ -349,9 +361,12 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error { samples []RefSample tstones []Stone allStones = newMemTombstones() - err error ) - defer allStones.Close() + defer func() { + if err := allStones.Close(); err != nil { + level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err) + } + }() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -450,9 +465,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error { } } } - if r.Err() != nil { - return errors.Wrap(r.Err(), "read records") - } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < n; i++ { @@ -462,6 +474,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error { } wg.Wait() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { return h.chunkRewrite(ref, dranges) }); err != nil { @@ -497,7 +513,11 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return errors.Wrap(err, "open checkpoint") } - defer sr.Close() + defer func() { + if err := sr.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + }() // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. 
@@ -522,14 +542,11 @@ func (h *Head) Init(minValidTime int64) error { sr := wal.NewSegmentBufReader(s) err = h.loadWAL(wal.NewReader(sr), multiRef) - sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. - if err == nil { - continue + if err := sr.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) } - level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) - h.metrics.walCorruptionsTotal.Inc() - if err := h.wal.Repair(err); err != nil { - return errors.Wrap(err, "repair corrupted WAL") + if err != nil { + return err } } diff --git a/head_test.go b/head_test.go index 30ff27dbf..b10294f73 100644 --- a/head_test.go +++ b/head_test.go @@ -19,9 +19,11 @@ import ( "math/rand" "os" "path" + "path/filepath" "sort" "testing" + "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" @@ -1088,42 +1090,61 @@ func TestWalRepair_DecodingError(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_head_repair") + dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.New(nil, nil, dir) - testutil.Ok(t, err) - defer w.Close() + // Fill the wal and corrupt it. + { + w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) + testutil.Ok(t, err) - for i := 1; i <= test.totalRecs; i++ { - // At this point insert a corrupted record. - if i-1 == test.expRecs { - testutil.Ok(t, w.Log(test.corrFunc(test.rec))) - continue + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. 
+ if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) } - testutil.Ok(t, w.Log(test.rec)) + + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + initErr := h.Init(math.MinInt64) + + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wal.CorruptionErr) + testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") + testutil.Ok(t, w.Close()) } - h, err := NewHead(nil, nil, w, 1) - testutil.Ok(t, err) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - testutil.Ok(t, h.Init(math.MinInt64)) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - - sr, err := wal.NewSegmentsReader(dir) - testutil.Ok(t, err) - defer sr.Close() - r := wal.NewReader(sr) - - var actRec int - for r.Next() { - actRec++ + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) + } + + // Read the wal content after the repair. + { + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") } - testutil.Ok(t, r.Err()) - testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") }) }