Truncate segments on broken header
This commit is contained in:
parent
e59b7b8ac4
commit
80055bb95b
|
@ -0,0 +1,67 @@
|
||||||
|
// The MIT License (MIT)
|
||||||
|
|
||||||
|
// Copyright (c) 2014 Ben Johnson
|
||||||
|
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
// of this software and associated documentation files (the "Software"), to deal
|
||||||
|
// in the Software without restriction, including without limitation the rights
|
||||||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
// copies of the Software, and to permit persons to whom the Software is
|
||||||
|
// furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
// SOFTWARE.
|
||||||
|
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Assert fails the test if the condition is false.
|
||||||
|
func Assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
|
||||||
|
if !condition {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ok fails the test if an err is not nil.
|
||||||
|
func Ok(tb testing.TB, err error) {
|
||||||
|
if err != nil {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotOk fails the test if an err is nil.
|
||||||
|
func NotOk(tb testing.TB, err error) {
|
||||||
|
if err == nil {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d: expected error, got nothing \033[39m\n\n", filepath.Base(file), line)
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Equals fails the test if exp is not equal to act.
|
||||||
|
func Equals(tb testing.TB, exp, act interface{}) {
|
||||||
|
if !reflect.DeepEqual(exp, act) {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
17
wal.go
17
wal.go
|
@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, fn := range fns {
|
|
||||||
|
for i, fn := range fns {
|
||||||
f, err := w.openSegmentFile(fn)
|
f, err := w.openSegmentFile(fn)
|
||||||
if err != nil {
|
if err == nil {
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
w.files = append(w.files, newSegmentFile(f))
|
w.files = append(w.files, newSegmentFile(f))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
|
||||||
|
|
||||||
|
for _, fn := range fns[i:] {
|
||||||
|
if err := os.Remove(fn); err != nil {
|
||||||
|
return w, errors.Wrap(err, "removing segment failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
go w.run(flushInterval)
|
go w.run(flushInterval)
|
||||||
|
|
85
wal_test.go
85
wal_test.go
|
@ -15,69 +15,16 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSegmentWAL_Open(t *testing.T) {
|
|
||||||
tmpdir, err := ioutil.TempDir("", "test_wal_open")
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer os.RemoveAll(tmpdir)
|
|
||||||
|
|
||||||
// Create segment files with an appropriate header.
|
|
||||||
for i := 1; i <= 5; i++ {
|
|
||||||
metab := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint32(metab[:4], WALMagic)
|
|
||||||
metab[4] = WALFormatDefault
|
|
||||||
|
|
||||||
f, err := os.Create(fmt.Sprintf("%s/000%d", tmpdir, i))
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = f.Write(metab)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, f.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize 5 correct segment files.
|
|
||||||
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
|
|
||||||
|
|
||||||
// Validate that files are locked properly.
|
|
||||||
for _, of := range w.files {
|
|
||||||
f, err := os.Open(of.Name())
|
|
||||||
require.NoError(t, err, "open locked segment %s", f.Name())
|
|
||||||
|
|
||||||
_, err = f.Read([]byte{0})
|
|
||||||
require.NoError(t, err, "read locked segment %s", f.Name())
|
|
||||||
|
|
||||||
_, err = f.Write([]byte{0})
|
|
||||||
require.Error(t, err, "write to tail segment file %s", f.Name())
|
|
||||||
|
|
||||||
require.NoError(t, f.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, f := range w.files {
|
|
||||||
require.NoError(t, f.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make initialization fail by corrupting the header of one file.
|
|
||||||
f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = f.WriteAt([]byte{0}, 4)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
w, err = OpenSegmentWAL(tmpdir, nil, 0, nil)
|
|
||||||
require.Error(t, err, "open with corrupted segments")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSegmentWAL_cut(t *testing.T) {
|
func TestSegmentWAL_cut(t *testing.T) {
|
||||||
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
|
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -312,6 +259,36 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWALRestoreCorrupted_invalidSegment(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "test_wal_log_restore")
|
||||||
|
Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
wal, err := OpenSegmentWAL(dir, nil, 0, nil)
|
||||||
|
Ok(t, err)
|
||||||
|
|
||||||
|
_, err = wal.createSegmentFile(dir + "/000000")
|
||||||
|
Ok(t, err)
|
||||||
|
f, err := wal.createSegmentFile(dir + "/000001")
|
||||||
|
Ok(t, err)
|
||||||
|
_, err = wal.createSegmentFile(dir + "/000002")
|
||||||
|
Ok(t, err)
|
||||||
|
|
||||||
|
// Make header of second segment invalid.
|
||||||
|
_, err = f.WriteAt([]byte{1, 2, 3, 4}, 0)
|
||||||
|
Ok(t, err)
|
||||||
|
Ok(t, f.Close())
|
||||||
|
|
||||||
|
Ok(t, wal.Close())
|
||||||
|
|
||||||
|
wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil)
|
||||||
|
Ok(t, err)
|
||||||
|
|
||||||
|
fns, err := fileutil.ReadDir(dir)
|
||||||
|
Ok(t, err)
|
||||||
|
Equals(t, []string{"000000"}, fns)
|
||||||
|
}
|
||||||
|
|
||||||
// Test reading from a WAL that has been corrupted through various means.
|
// Test reading from a WAL that has been corrupted through various means.
|
||||||
func TestWALRestoreCorrupted(t *testing.T) {
|
func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
|
|
Loading…
Reference in New Issue