From bc3b0bd429153ab54662a930df3817e4f29d169e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 18 Feb 2019 19:05:07 +0000 Subject: [PATCH] Test to corrupt segments mid-WAL, repair and check we can read the correct number of records. (#528) Test to corrupt segments mid-WAL, repair and check we can read the correct number of records. Make segmentBufReader pad short segments with zeros, and only advance curr segment index after fully reading segment. --- testutil/logging.go | 35 ++++++++++++ wal/wal.go | 54 ++++++++++++------- wal/wal_test.go | 126 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 194 insertions(+), 21 deletions(-) create mode 100644 testutil/logging.go diff --git a/testutil/logging.go b/testutil/logging.go new file mode 100644 index 000000000..839b86690 --- /dev/null +++ b/testutil/logging.go @@ -0,0 +1,35 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "testing" + + "github.com/go-kit/kit/log" +) + +type logger struct { + t *testing.T +} + +// NewLogger returns a gokit compatible Logger which calls t.Log. +func NewLogger(t *testing.T) log.Logger { + return logger{t: t} +} + +// Log implements log.Logger. +func (t logger) Log(keyvals ...interface{}) error { + t.t.Log(keyvals...) 
+ return nil +} diff --git a/wal/wal.go b/wal/wal.go index 1ab633460..c7782564f 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -681,20 +681,20 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { // segmentBufReader is a buffered reader that reads in multiples of pages. // The main purpose is that we are able to track segment and offset for -// corruption reporting. +// corruption reporting. We have to be careful not to increment cur too +// early, as it is used by Reader.Err() to tell Repair which segment is corrupt. +// As such we pad the end of non-page-aligned segments with zeros. type segmentBufReader struct { buf *bufio.Reader segs []*Segment - cur int - off int - more bool + cur int // Index into segs. + off int // Offset of read data into current segment. } func newSegmentBufReader(segs ...*Segment) *segmentBufReader { return &segmentBufReader{ - buf: bufio.NewReaderSize(nil, 16*pageSize), + buf: bufio.NewReaderSize(segs[0], 16*pageSize), segs: segs, - cur: -1, } } @@ -707,25 +707,39 @@ func (r *segmentBufReader) Close() (err error) { return err } +// Read implements io.Reader. func (r *segmentBufReader) Read(b []byte) (n int, err error) { - if !r.more { - if r.cur+1 >= len(r.segs) { - return 0, io.EOF - } - r.cur++ - r.off = 0 - r.more = true - r.buf.Reset(r.segs[r.cur]) - } n, err = r.buf.Read(b) r.off += n - if err != io.EOF { + + // If we succeeded, or hit a non-EOF error, we can stop. + if err == nil || err != io.EOF { return n, err } - // Just return what we read so far, but don't signal EOF. - // Only unset more so we don't invalidate the current segment and - // offset before the next read. - r.more = false + + // We hit EOF; fake out zero padding at the end of short segments, so we + // don't increment cur too early and report the wrong segment as corrupt. + if r.off%pageSize != 0 { + i := 0 + for ; n+i < len(b) && (r.off+i)%pageSize != 0; i++ { + b[n+i] = 0 + } + + // Return early, even if we didn't fill b. 
+ r.off += i + return n + i, nil + } + + // There is no more data left in the current segment and there are no more + // segments left. Return EOF. + if r.cur+1 >= len(r.segs) { + return n, io.EOF + } + + // Move to next segment. + r.cur++ + r.off = 0 + r.buf.Reset(r.segs[r.cur]) return n, nil } diff --git a/wal/wal_test.go b/wal/wal_test.go index 737c44ca9..c767bb56c 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -24,6 +24,7 @@ import ( "math/rand" "os" "path" + "path/filepath" "sync" "testing" "time" @@ -225,7 +226,7 @@ func TestReader_Live(t *testing.T) { } func TestWAL_FuzzWriteRead_Live(t *testing.T) { - const count = 5000 + const count = 500 var input [][]byte lock := sync.RWMutex{} var recs [][]byte @@ -547,6 +548,129 @@ func TestWAL_Repair(t *testing.T) { } } +// TestCorruptAndCarryOn writes a multi-segment WAL; corrupts the first segment and +// ensures that an error during reading that segment is correctly repaired before +// moving to write more records to the WAL. +func TestCorruptAndCarryOn(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + var ( + logger = testutil.NewLogger(t) + segmentSize = pageSize * 3 + recordSize = (pageSize / 3) - recordHeaderSize + ) + + // Produce a WAL with two segments of 3 pages with 3 records each, + // so when we truncate the file we're guaranteed to split a record. + { + w, err := NewSize(logger, nil, dir, segmentSize) + testutil.Ok(t, err) + + for i := 0; i < 18; i++ { + buf := make([]byte, recordSize) + _, err := rand.Read(buf) + testutil.Ok(t, err) + + err = w.Log(buf) + testutil.Ok(t, err) + } + + err = w.Close() + testutil.Ok(t, err) + } + + // Check all the segments are the correct size. 
+ { + segments, err := listSegments(dir) + testutil.Ok(t, err) + for _, segment := range segments { + f, err := os.OpenFile(filepath.Join(dir, fmt.Sprintf("%08d", segment.index)), os.O_RDONLY, 0666) + testutil.Ok(t, err) + + fi, err := f.Stat() + testutil.Ok(t, err) + + t.Log("segment", segment.index, "size", fi.Size()) + testutil.Equals(t, int64(segmentSize), fi.Size()) + + err = f.Close() + testutil.Ok(t, err) + } + } + + // Truncate the first file, splitting the middle record in the second + // page in half, leaving 4 valid records. + { + f, err := os.OpenFile(filepath.Join(dir, fmt.Sprintf("%08d", 0)), os.O_RDWR, 0666) + testutil.Ok(t, err) + + fi, err := f.Stat() + testutil.Ok(t, err) + testutil.Equals(t, int64(segmentSize), fi.Size()) + + err = f.Truncate(int64(segmentSize / 2)) + testutil.Ok(t, err) + + err = f.Close() + testutil.Ok(t, err) + } + + // Now try and repair this WAL, and write 5 more records to it. + { + sr, err := NewSegmentsReader(dir) + testutil.Ok(t, err) + + reader := NewReader(sr) + i := 0 + for ; i < 4 && reader.Next(); i++ { + testutil.Equals(t, recordSize, len(reader.Record())) + } + testutil.Equals(t, 4, i, "not enough records") + testutil.Assert(t, !reader.Next(), "unexpected record") + + corruptionErr := reader.Err() + testutil.Assert(t, corruptionErr != nil, "expected error") + + err = sr.Close() + testutil.Ok(t, err) + + w, err := NewSize(logger, nil, dir, segmentSize) + testutil.Ok(t, err) + + err = w.Repair(corruptionErr) + testutil.Ok(t, err) + + for i := 0; i < 5; i++ { + buf := make([]byte, recordSize) + _, err := rand.Read(buf) + testutil.Ok(t, err) + + err = w.Log(buf) + testutil.Ok(t, err) + } + + err = w.Close() + testutil.Ok(t, err) + } + + // Replay the WAL. Should get 9 records. 
+ { + sr, err := NewSegmentsReader(dir) + testutil.Ok(t, err) + + reader := NewReader(sr) + i := 0 + for ; i < 9 && reader.Next(); i++ { + testutil.Equals(t, recordSize, len(reader.Record())) + } + testutil.Equals(t, 9, i, "wrong number of records") + testutil.Assert(t, !reader.Next(), "unexpected record") + testutil.Equals(t, nil, reader.Err()) + } +} + func BenchmarkWAL_LogBatched(b *testing.B) { dir, err := ioutil.TempDir("", "bench_logbatch") testutil.Ok(b, err)