From 04bb006e2bb74df05c963a56d84118abe93c06e7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 14 Mar 2017 19:30:23 +0100 Subject: [PATCH] Handle WAL corruption by truncating This adds handling for various corruption scenarios of the WAL. If corruption is encountered, we truncate the WAL after the last valid entry transparently and continue appending after the offset. --- wal.go | 155 ++++++++++++++++++++++++++++++++++++++-------------- wal_test.go | 139 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 251 insertions(+), 43 deletions(-) diff --git a/wal.go b/wal.go index b06425e07..2a49dcf41 100644 --- a/wal.go +++ b/wal.go @@ -63,7 +63,7 @@ const ( // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) { +func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { dir = filepath.Join(dir, walDirName) if err := os.MkdirAll(dir, 0777); err != nil { @@ -73,10 +73,13 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error if err != nil { return nil, err } + if logger == nil { + logger = log.NewNopLogger() + } w := &WAL{ dirFile: df, - logger: l, + logger: logger, flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), @@ -95,11 +98,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. func (w *WAL) Reader() *WALReader { - var rs []io.ReadCloser - for _, f := range w.files { - rs = append(rs, f) - } - return NewWALReader(rs...) + return NewWALReader(w.logger, w) } // Log writes a batch of new series labels and samples to the log. @@ -126,21 +125,15 @@ func (w *WAL) initSegments() error { if len(fns) == 0 { return nil } - if len(fns) > 1 { - for _, fn := range fns[:len(fns)-1] { - f, err := os.Open(fn) - if err != nil { - return err - } - w.files = append(w.files, f) + // We must open all file in read mode as we may have to truncate along + // the way and any file may become the tail. + for _, fn := range fns { + f, err := os.OpenFile(fn, os.O_RDWR, 0666) + if err != nil { + return err } + w.files = append(w.files, f) } - // The most recent WAL file is the one we have to keep appending to. - f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666) - if err != nil { - return err - } - w.files = append(w.files, f) // Consume and validate meta headers. for _, f := range w.files { @@ -275,7 +268,7 @@ func (w *WAL) Close() error { // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. if tf := w.tail(); tf != nil { - return tf.Close() + return errors.Wrapf(tf.Close(), "closing WAL tail %s", tf.Name()) } return nil } @@ -413,7 +406,9 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // WALReader decodes and emits write ahead log entries. type WALReader struct { - rs []io.ReadCloser + logger log.Logger + + wal *WAL cur int buf []byte crc32 hash.Hash32 @@ -424,12 +419,17 @@ type WALReader struct { } // NewWALReader returns a new WALReader over the sequence of the given ReadClosers. -func NewWALReader(rs ...io.ReadCloser) *WALReader { - return &WALReader{ - rs: rs, - buf: make([]byte, 0, 128*4096), - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), +func NewWALReader(logger log.Logger, w *WAL) *WALReader { + if logger == nil { + logger = log.NewNopLogger() } + r := &WALReader{ + logger: logger, + wal: w, + buf: make([]byte, 0, 128*4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + } + return r } // At returns the last decoded entry of labels or samples. @@ -446,19 +446,18 @@ func (r *WALReader) Err() error { // nextEntry retrieves the next entry. It is also used as a testing hook. func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { - if r.cur >= len(r.rs) { + if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } - cr := r.rs[r.cur] + cf := r.wal.files[r.cur] - et, flag, b, err := r.entry(cr) + et, flag, b, err := r.entry(cf) // If we reached the end of the reader, advance to the next one // and close. // Do not close on the last one as it will still be appended to. - // XXX(fabxc): leaky abstraction. - if err == io.EOF && r.cur < len(r.rs)-1 { + if err == io.EOF && r.cur < len(r.wal.files)-1 { // Current reader completed, close and move to the next one. - if err := cr.Close(); err != nil { + if err := cf.Close(); err != nil { return 0, 0, nil, err } r.cur++ @@ -473,14 +472,46 @@ func (r *WALReader) Next() bool { r.labels = r.labels[:0] r.samples = r.samples[:0] - et, flag, b, err := r.nextEntry() + if r.cur >= len(r.wal.files) { + return false + } + cf := r.wal.files[r.cur] + + // Save position after last valid entry if we have to truncate the WAL. + lastOffset, err := cf.Seek(0, os.SEEK_CUR) if err != nil { - if err != io.EOF { + r.err = err + return false + } + + et, flag, b, err := r.entry(cf) + // If we reached the end of the reader, advance to the next one + // and close. + // Do not close on the last one as it will still be appended to. + if err == io.EOF { + if r.cur == len(r.wal.files)-1 { + return false + } + // Current reader completed, close and move to the next one. + if err := cf.Close(); err != nil { r.err = err + return false + } + r.cur++ + return r.Next() + } + if err != nil { + r.err = err + + if _, ok := err.(walCorruptionErr); ok { + r.err = r.truncate(lastOffset) } return false } + // In decoding below we never return a walCorruptionErr for now. + // Those should generally be catched by entry decoding before. + switch et { case WALEntrySamples: if err := r.decodeSamples(flag, b); err != nil { @@ -490,19 +521,52 @@ func (r *WALReader) Next() bool { if err := r.decodeSeries(flag, b); err != nil { r.err = err } - default: - r.err = errors.Errorf("unknown WAL entry type %d", et) } return r.err == nil } +func (r *WALReader) current() *os.File { + return r.wal.files[r.cur] +} + +// truncate the WAL after the last valid entry. +func (r *WALReader) truncate(lastOffset int64) error { + r.logger.Log("msg", "WAL corruption detected; truncating", + "err", r.err, "file", r.current().Name(), "pos", lastOffset) + + // Close and delete all files after the current one. + for _, f := range r.wal.files[r.cur+1:] { + if err := f.Close(); err != nil { + return err + } + if err := os.Remove(f.Name()); err != nil { + return err + } + } + r.wal.files = r.wal.files[:r.cur+1] + + // Seek the current file to the last valid offset where we continue writing from. + _, err := r.current().Seek(lastOffset, os.SEEK_SET) + return err +} + +// walCorruptionErr is a type wrapper for errors that indicate WAL corruption +// and trigger a truncation. +type walCorruptionErr error + +func walCorruptionErrf(s string, args ...interface{}) error { + return walCorruptionErr(errors.Errorf(s, args...)) +} + func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { r.crc32.Reset() tr := io.TeeReader(cr, r.crc32) b := make([]byte, 6) - if _, err := tr.Read(b); err != nil { + if n, err := tr.Read(b); err != nil { return 0, 0, nil, err + } else if n != 6 { + return 0, 0, nil, walCorruptionErrf("invalid entry header size %d", n) } var ( @@ -514,21 +578,28 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { if etype == 0 { return 0, 0, nil, io.EOF } + if etype != WALEntrySeries && etype != WALEntrySamples { + return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype) + } if length > len(r.buf) { r.buf = make([]byte, length) } buf := r.buf[:length] - if _, err := tr.Read(buf); err != nil { + if n, err := tr.Read(buf); err != nil { return 0, 0, nil, err + } else if n != length { + return 0, 0, nil, walCorruptionErrf("invalid entry body size %d", n) } - _, err := cr.Read(b[:4]) - if err != nil { + + if n, err := cr.Read(b[:4]); err != nil { return 0, 0, nil, err + } else if n != 4 { + return 0, 0, nil, walCorruptionErrf("invalid checksum length %d", n) } if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { - return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) + return 0, 0, nil, walCorruptionErrf("unexpected CRC32 checksum %x, want %x", has, exp) } return etype, flag, buf, nil diff --git a/wal_test.go b/wal_test.go index 8442dbddc..6a4205c1c 100644 --- a/wal_test.go +++ b/wal_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/coreos/etcd/pkg/fileutil" "github.com/stretchr/testify/require" @@ -109,7 +110,7 @@ func TestWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := NewWALReader(f).nextEntry() + et, flag, b, err := NewWALReader(nil, nil).entry(f) require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) @@ -204,3 +205,139 @@ func TestWAL_Log_Restore(t *testing.T) { require.NoError(t, w.Close()) } } + +// Test reading from a WAL that has been corrupted through various means. +func TestWALRestoreCorrupted(t *testing.T) { + cases := []struct { + name string + f func(*testing.T, *WAL) + }{ + { + name: "truncate_checksum", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + require.NoError(t, f.Truncate(off-1)) + }, + }, + { + name: "truncate_body", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + require.NoError(t, f.Truncate(off-8)) + }, + }, + { + name: "body_content", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + // Write junk before checksum starts. + _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-8) + require.NoError(t, err) + }, + }, + { + name: "checksum", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + // Write junk into checksum + _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-4) + require.NoError(t, err) + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Generate testing data. It does not make semantical sense but + // for the purpose of this test. + dir, err := ioutil.TempDir("", "test_corrupted_checksum") + require.NoError(t, err) + defer os.RemoveAll(dir) + + w, err := OpenWAL(dir, nil, 0) + require.NoError(t, err) + + require.NoError(t, w.Log(nil, []refdSample{{t: 1, v: 2}})) + require.NoError(t, w.Log(nil, []refdSample{{t: 2, v: 3}})) + + require.NoError(t, w.cut()) + + require.NoError(t, w.Log(nil, []refdSample{{t: 3, v: 4}})) + require.NoError(t, w.Log(nil, []refdSample{{t: 5, v: 6}})) + + require.NoError(t, w.Close()) + + // Corrupt the second entry in the first file. + // After re-opening we must be able to read the first entry + // and the rest, including the second file, must be truncated for clean further + // writes. + c.f(t, w) + + logger := log.NewLogfmtLogger(os.Stderr) + + w2, err := OpenWAL(dir, logger, 0) + require.NoError(t, err) + + r := w2.Reader() + + require.True(t, r.Next()) + l, s := r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + + // Truncation should happen transparently and now cause an error. + require.False(t, r.Next()) + require.Nil(t, r.Err()) + + require.NoError(t, w2.Log(nil, []refdSample{{t: 99, v: 100}})) + require.NoError(t, w2.Close()) + + files, err := fileutil.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 1, len(files)) + + // We should see the first valid entry and the new one, everything after + // is truncated. + w3, err := OpenWAL(dir, logger, 0) + require.NoError(t, err) + + r = w3.Reader() + + require.True(t, r.Next()) + l, s = r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + + require.True(t, r.Next()) + l, s = r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 99, v: 100}}, s) + + require.False(t, r.Next()) + require.Nil(t, r.Err()) + }) + } +}