diff --git a/util_test.go b/util_test.go new file mode 100644 index 000000000..69af7e9e3 --- /dev/null +++ b/util_test.go @@ -0,0 +1,67 @@ +// The MIT License (MIT) + +// Copyright (c) 2014 Ben Johnson + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tsdb + +import ( + "fmt" + "path/filepath" + "reflect" + "runtime" + "testing" +) + +// Assert fails the test if the condition is false. +func Assert(tb testing.TB, condition bool, msg string, v ...interface{}) { + if !condition { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...) + tb.FailNow() + } +} + +// Ok fails the test if an err is not nil. +func Ok(tb testing.TB, err error) { + if err != nil { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error()) + tb.FailNow() + } +} + +// NotOk fails the test if an err is nil. +func NotOk(tb testing.TB, err error) { + if err == nil { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: expected error, got nothing \033[39m\n\n", filepath.Base(file), line) + tb.FailNow() + } +} + +// Equals fails the test if exp is not equal to act. +func Equals(tb testing.TB, exp, act interface{}) { + if !reflect.DeepEqual(exp, act) { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act) + tb.FailNow() + } +} diff --git a/wal.go b/wal.go index 5c6e78d2e..225851de1 100644 --- a/wal.go +++ b/wal.go @@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, if err != nil { return nil, err } - for _, fn := range fns { + + for i, fn := range fns { f, err := w.openSegmentFile(fn) - if err != nil { - return nil, err + if err == nil { + w.files = append(w.files, newSegmentFile(f)) + continue } - w.files = append(w.files, newSegmentFile(f)) + level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn) + + for _, fn := range fns[i:] { + if err := os.Remove(fn); err != nil { + return w, errors.Wrap(err, "removing segment failed") + } + } + break } go w.run(flushInterval) diff --git a/wal_test.go b/wal_test.go index aadce89d7..b4fccb981 100644 --- a/wal_test.go +++ b/wal_test.go @@ -15,69 +15,16 @@ package tsdb import ( "encoding/binary" - "fmt" "io/ioutil" "math/rand" "os" "testing" "github.com/go-kit/kit/log" + "github.com/prometheus/tsdb/fileutil" "github.com/stretchr/testify/require" ) -func TestSegmentWAL_Open(t *testing.T) { - tmpdir, err := ioutil.TempDir("", "test_wal_open") - require.NoError(t, err) - defer os.RemoveAll(tmpdir) - - // Create segment files with an appropriate header. - for i := 1; i <= 5; i++ { - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], WALMagic) - metab[4] = WALFormatDefault - - f, err := os.Create(fmt.Sprintf("%s/000%d", tmpdir, i)) - require.NoError(t, err) - _, err = f.Write(metab) - require.NoError(t, err) - require.NoError(t, f.Close()) - } - - // Initialize 5 correct segment files. - w, err := OpenSegmentWAL(tmpdir, nil, 0, nil) - require.NoError(t, err) - - require.Equal(t, 5, len(w.files), "unexpected number of segments loaded") - - // Validate that files are locked properly. - for _, of := range w.files { - f, err := os.Open(of.Name()) - require.NoError(t, err, "open locked segment %s", f.Name()) - - _, err = f.Read([]byte{0}) - require.NoError(t, err, "read locked segment %s", f.Name()) - - _, err = f.Write([]byte{0}) - require.Error(t, err, "write to tail segment file %s", f.Name()) - - require.NoError(t, f.Close()) - } - - for _, f := range w.files { - require.NoError(t, f.Close()) - } - - // Make initialization fail by corrupting the header of one file. - f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666) - require.NoError(t, err) - - _, err = f.WriteAt([]byte{0}, 4) - require.NoError(t, err) - - w, err = OpenSegmentWAL(tmpdir, nil, 0, nil) - require.Error(t, err, "open with corrupted segments") -} - func TestSegmentWAL_cut(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_cut") require.NoError(t, err) @@ -312,6 +259,36 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { } } +func TestWALRestoreCorrupted_invalidSegment(t *testing.T) { + dir, err := ioutil.TempDir("", "test_wal_log_restore") + Ok(t, err) + defer os.RemoveAll(dir) + + wal, err := OpenSegmentWAL(dir, nil, 0, nil) + Ok(t, err) + + _, err = wal.createSegmentFile(dir + "/000000") + Ok(t, err) + f, err := wal.createSegmentFile(dir + "/000001") + Ok(t, err) + _, err = wal.createSegmentFile(dir + "/000002") + Ok(t, err) + + // Make header of second segment invalid. + _, err = f.WriteAt([]byte{1, 2, 3, 4}, 0) + Ok(t, err) + Ok(t, f.Close()) + + Ok(t, wal.Close()) + + wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil) + Ok(t, err) + + fns, err := fileutil.ReadDir(dir) + Ok(t, err) + Equals(t, []string{"000000"}, fns) +} + // Test reading from a WAL that has been corrupted through various means. func TestWALRestoreCorrupted(t *testing.T) { cases := []struct {