move the wal repair logic in db.Open (#633)

* move the wal repair logic in db.Open

This is to allow opening a wal in read-only mode without triggering a
repair.
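
For illustration, a minimal sketch of what this enables (not code from this change: openHeadReadOnly and the 2h chunk range are made up for the example; wal.New, NewHead, Head.Init, wal.CorruptionErr and errors.Cause are the APIs exercised by the code and test below). A read-only consumer can now initialize a Head directly and treat WAL corruption as a plain error, while db.Open keeps the repair path:

    // Sketch only; names marked illustrative are assumptions, not repo code.
    package main

    import (
        "fmt"
        "math"

        "github.com/pkg/errors"
        "github.com/prometheus/tsdb"
        "github.com/prometheus/tsdb/wal"
    )

    func openHeadReadOnly(walDir string) (*tsdb.Head, error) {
        w, err := wal.New(nil, nil, walDir) // nil logger and registerer, as in the tests
        if err != nil {
            return nil, err
        }
        h, err := tsdb.NewHead(nil, nil, w, 2*60*60*1000) // 2h chunk range in ms (illustrative)
        if err != nil {
            return nil, err
        }
        if initErr := h.Init(math.MinInt64); initErr != nil {
            // Init no longer repairs; surface corruption and leave the WAL untouched.
            if _, ok := errors.Cause(initErr).(*wal.CorruptionErr); ok {
                return nil, fmt.Errorf("WAL is corrupted, refusing to repair in read-only mode: %v", initErr)
            }
            return nil, initErr
        }
        return h, nil
    }

    func main() {
        if _, err := openHeadReadOnly("data/wal"); err != nil {
            fmt.Println("open failed:", err)
        }
    }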

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
Krasi Georgiev 2019-06-14 17:39:22 +02:00 committed by GitHub
parent 4892da59eb
commit 69740485c1
3 changed files with 84 additions and 42 deletions

db.go (8 changes)

@@ -328,8 +328,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
         minValidTime = blocks[len(blocks)-1].Meta().MaxTime
     }
-    if err := db.head.Init(minValidTime); err != nil {
-        return nil, errors.Wrap(err, "read WAL")
+    if initErr := db.head.Init(minValidTime); initErr != nil {
+        db.head.metrics.walCorruptionsTotal.Inc()
+        level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", initErr)
+        if err := wlog.Repair(initErr); err != nil {
+            return nil, errors.Wrap(err, "repair corrupted WAL")
+        }
     }
 
     go db.run()

head.go (45 changes)

@@ -315,7 +315,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
     }
 }
 
-func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
+func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
     // Track number of samples that referenced a series we don't know about
     // for error reporting.
     var unknownRefs uint64
@@ -332,6 +332,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
     )
     wg.Add(n)
 
+    defer func() {
+        // For CorruptionErr ensure to terminate all workers before exiting.
+        if _, ok := err.(*wal.CorruptionErr); ok {
+            for i := 0; i < n; i++ {
+                close(inputs[i])
+                for range outputs[i] {
+                }
+            }
+            wg.Wait()
+        }
+    }()
+
     for i := 0; i < n; i++ {
         outputs[i] = make(chan []RefSample, 300)
         inputs[i] = make(chan []RefSample, 300)
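
The deferred block added above shuts the worker pipeline down when a corruption error is hit mid-read. A standalone sketch of that shutdown pattern (illustrative names and payloads, not code from this repository): close every input so each worker's range loop ends, drain every output so no worker blocks on a send, then wait for the group.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const n = 4
        var wg sync.WaitGroup
        wg.Add(n)

        inputs := make([]chan int, n)
        outputs := make([]chan int, n)
        for i := 0; i < n; i++ {
            inputs[i] = make(chan int, 8)
            outputs[i] = make(chan int, 8)
            go func(in <-chan int, out chan<- int) {
                defer wg.Done()
                defer close(out) // lets the drain loop below terminate
                for v := range in {
                    out <- v * v // stand-in for the real per-record work
                }
            }(inputs[i], outputs[i])
        }

        inputs[0] <- 3 // feed a little work before shutting down

        // Shutdown: close inputs so workers stop, drain outputs so no worker
        // blocks on a send, then wait for every goroutine to finish.
        for i := 0; i < n; i++ {
            close(inputs[i])
            for range outputs[i] {
            }
        }
        wg.Wait()
        fmt.Println("all workers terminated")
    }
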
@@ -349,9 +361,12 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
         samples   []RefSample
         tstones   []Stone
         allStones = newMemTombstones()
-        err       error
     )
-    defer allStones.Close()
+    defer func() {
+        if err := allStones.Close(); err != nil {
+            level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
+        }
+    }()
     for r.Next() {
         series, samples, tstones = series[:0], samples[:0], tstones[:0]
         rec := r.Record()
@@ -450,9 +465,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
             }
         }
     }
-    if r.Err() != nil {
-        return errors.Wrap(r.Err(), "read records")
-    }
 
     // Signal termination to each worker and wait for it to close its output channel.
     for i := 0; i < n; i++ {
@@ -462,6 +474,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
     }
     wg.Wait()
 
+    if r.Err() != nil {
+        return errors.Wrap(r.Err(), "read records")
+    }
+
     if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
         return h.chunkRewrite(ref, dranges)
     }); err != nil {
@@ -497,7 +513,11 @@ func (h *Head) Init(minValidTime int64) error {
         if err != nil {
             return errors.Wrap(err, "open checkpoint")
         }
-        defer sr.Close()
+        defer func() {
+            if err := sr.Close(); err != nil {
+                level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
+            }
+        }()
 
         // A corrupted checkpoint is a hard error for now and requires user
         // intervention. There's likely little data that can be recovered anyway.
@@ -522,14 +542,11 @@ func (h *Head) Init(minValidTime int64) error {
         sr := wal.NewSegmentBufReader(s)
         err = h.loadWAL(wal.NewReader(sr), multiRef)
-        sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
-        if err == nil {
-            continue
+        if err := sr.Close(); err != nil {
+            level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
         }
-        level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
-        h.metrics.walCorruptionsTotal.Inc()
-        if err := h.wal.Repair(err); err != nil {
-            return errors.Wrap(err, "repair corrupted WAL")
+        if err != nil {
+            return err
         }
     }

(test file)

@@ -19,9 +19,11 @@ import (
     "math/rand"
     "os"
     "path"
+    "path/filepath"
     "sort"
     "testing"
 
+    "github.com/pkg/errors"
     prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/tsdb/chunkenc"
     "github.com/prometheus/tsdb/chunks"
@@ -1088,15 +1090,16 @@ func TestWalRepair_DecodingError(t *testing.T) {
         },
     } {
         t.Run(name, func(t *testing.T) {
-            dir, err := ioutil.TempDir("", "wal_head_repair")
+            dir, err := ioutil.TempDir("", "wal_repair")
             testutil.Ok(t, err)
             defer func() {
                 testutil.Ok(t, os.RemoveAll(dir))
             }()
 
-            w, err := wal.New(nil, nil, dir)
+            // Fill the wal and corrupt it.
+            {
+                w, err := wal.New(nil, nil, filepath.Join(dir, "wal"))
                 testutil.Ok(t, err)
-            defer w.Close()
 
                 for i := 1; i <= test.totalRecs; i++ {
                     // At this point insert a corrupted record.
@@ -1110,10 +1113,27 @@ func TestWalRepair_DecodingError(t *testing.T) {
                 h, err := NewHead(nil, nil, w, 1)
                 testutil.Ok(t, err)
                 testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
-            testutil.Ok(t, h.Init(math.MinInt64))
-            testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
+                initErr := h.Init(math.MinInt64)
 
-            sr, err := wal.NewSegmentsReader(dir)
+                err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
+                _, corrErr := err.(*wal.CorruptionErr)
+                testutil.Assert(t, corrErr, "reading the wal didn't return corruption error")
+                testutil.Ok(t, w.Close())
+            }
+
+            // Open the db to trigger a repair.
+            {
+                db, err := Open(dir, nil, nil, DefaultOptions)
+                testutil.Ok(t, err)
+                defer func() {
+                    testutil.Ok(t, db.Close())
+                }()
+                testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
+            }
+
+            // Read the wal content after the repair.
+            {
+                sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
                 testutil.Ok(t, err)
                 defer sr.Close()
                 r := wal.NewReader(sr)
@@ -1124,6 +1144,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
                 }
                 testutil.Ok(t, r.Err())
                 testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
+            }
         })
     }