return an error when the last wal segment record is torn. (#451)

* return an error when the  last wal segment record is torn.

this ensures that a repair will be run when the last record in a segment
is torn.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2018-11-28 15:15:11 +02:00 committed by GitHub
parent fb32ef6000
commit 24520727a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 51 deletions

View File

@ -231,7 +231,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
return nil, err return nil, err
} }
} else { } else {
if w.segment, err = OpenWriteSegment(w.logger, w.dir, j); err != nil { if w.segment, err = OpenWriteSegment(logger, w.dir, j); err != nil {
return nil, err return nil, err
} }
// Correctly initialize donePages. // Correctly initialize donePages.
@ -713,11 +713,12 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
// Reader reads WAL records from an io.Reader. // Reader reads WAL records from an io.Reader.
type Reader struct { type Reader struct {
rdr io.Reader rdr io.Reader
err error err error
rec []byte rec []byte
buf [pageSize]byte buf [pageSize]byte
total int64 // total bytes processed. total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
} }
// NewReader returns a new reader. // NewReader returns a new reader.
@ -730,6 +731,12 @@ func NewReader(r io.Reader) *Reader {
func (r *Reader) Next() bool { func (r *Reader) Next() bool {
err := r.next() err := r.next()
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
r.err = errors.New("last record is torn")
}
return false return false
} }
r.err = err r.err = err
@ -750,12 +757,13 @@ func (r *Reader) next() (err error) {
return errors.Wrap(err, "read first header byte") return errors.Wrap(err, "read first header byte")
} }
r.total++ r.total++
typ := recType(hdr[0]) r.curRecTyp = recType(hdr[0])
// Gobble up zero bytes. // Gobble up zero bytes.
if typ == recPageTerm { if r.curRecTyp == recPageTerm {
// recPageTerm is a single byte that indicates that the rest of the page is padded. // recPageTerm is a single byte that indicates the rest of the page is padded.
// If it's the first byte in a page, buf is too small and we have to resize buf to fit pageSize-1 bytes. // If it's the first byte in a page, buf is too small and
// needs to be resized to fit pageSize-1 bytes.
buf = r.buf[1:] buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up // We are pedantic and check whether the zeros are actually up
@ -806,7 +814,7 @@ func (r *Reader) next() (err error) {
} }
r.rec = append(r.rec, buf[:length]...) r.rec = append(r.rec, buf[:length]...)
switch typ { switch r.curRecTyp {
case recFull: case recFull:
if i != 0 { if i != 0 {
return errors.New("unexpected full record") return errors.New("unexpected full record")
@ -826,7 +834,7 @@ func (r *Reader) next() (err error) {
} }
return nil return nil
default: default:
return errors.Errorf("unexpected record type %d", typ) return errors.Errorf("unexpected record type %d", r.curRecTyp)
} }
// Only increment i for non-zero records since we use it // Only increment i for non-zero records since we use it
// to determine valid content record sequences. // to determine valid content record sequences.

View File

@ -216,44 +216,84 @@ func TestWAL_FuzzWriteRead(t *testing.T) {
} }
func TestWAL_Repair(t *testing.T) { func TestWAL_Repair(t *testing.T) {
for name, cf := range map[string]func(f *os.File){
// Ensures that the page buffer is big enough to fit and entyre page size without panicing. for name, test := range map[string]struct {
corrSgm int // Which segment to corrupt.
corrFunc func(f *os.File) // Func that applies the corruption.
intactRecs int // Total expected records left after the repair.
}{
"torn_last_record": {
2,
func(f *os.File) {
_, err := f.Seek(pageSize*2, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte{byte(recFirst)})
testutil.Ok(t, err)
},
8,
},
// Ensures that the page buffer is big enough to fit
// an entire page size without panicing.
// https://github.com/prometheus/tsdb/pull/414 // https://github.com/prometheus/tsdb/pull/414
"bad_header": func(f *os.File) { "bad_header": {
_, err := f.Seek(pageSize, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte{byte(recPageTerm)}) _, err := f.Seek(pageSize, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte{byte(recPageTerm)})
testutil.Ok(t, err)
},
4,
}, },
"bad_fragment_sequence": func(f *os.File) { "bad_fragment_sequence": {
_, err := f.Seek(pageSize, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte{byte(recLast)}) _, err := f.Seek(pageSize, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte{byte(recLast)})
testutil.Ok(t, err)
},
4,
}, },
"bad_fragment_flag": func(f *os.File) { "bad_fragment_flag": {
_, err := f.Seek(pageSize, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte{123}) _, err := f.Seek(pageSize, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte{123})
testutil.Ok(t, err)
},
4,
}, },
"bad_checksum": func(f *os.File) { "bad_checksum": {
_, err := f.Seek(pageSize+4, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte{0}) _, err := f.Seek(pageSize+4, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte{0})
testutil.Ok(t, err)
},
4,
}, },
"bad_length": func(f *os.File) { "bad_length": {
_, err := f.Seek(pageSize+2, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte{0}) _, err := f.Seek(pageSize+2, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte{0})
testutil.Ok(t, err)
},
4,
}, },
"bad_content": func(f *os.File) { "bad_content": {
_, err := f.Seek(pageSize+100, 0) 1,
testutil.Ok(t, err) func(f *os.File) {
_, err = f.Write([]byte("beef")) _, err := f.Seek(pageSize+100, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.Write([]byte("beef"))
testutil.Ok(t, err)
},
4,
}, },
} { } {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
@ -261,10 +301,9 @@ func TestWAL_Repair(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
// We create 3 segments with 3 records each and then corrupt the 2nd record // We create 3 segments with 3 records each and
// of the 2nd segment. // then corrupt a given record in a given segment.
// As a result we want a repaired WAL with the first 4 records intact. // As a result we want a repaired WAL with given intact records.
intactRecords := 4
w, err := NewSize(nil, nil, dir, 3*pageSize) w, err := NewSize(nil, nil, dir, 3*pageSize)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -278,11 +317,11 @@ func TestWAL_Repair(t *testing.T) {
} }
testutil.Ok(t, w.Close()) testutil.Ok(t, w.Close())
f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666) f, err := os.OpenFile(SegmentName(dir, test.corrSgm), os.O_RDWR, 0666)
testutil.Ok(t, err) testutil.Ok(t, err)
// Apply corruption function. // Apply corruption function.
cf(f) test.corrFunc(f)
testutil.Ok(t, f.Close()) testutil.Ok(t, f.Close())
@ -315,7 +354,7 @@ func TestWAL_Repair(t *testing.T) {
result = append(result, append(b, r.Record()...)) result = append(result, append(b, r.Record()...))
} }
testutil.Ok(t, r.Err()) testutil.Ok(t, r.Err())
testutil.Equals(t, intactRecords, len(result), "Wrong number of intact records") testutil.Equals(t, test.intactRecs, len(result), "Wrong number of intact records")
for i, r := range result { for i, r := range result {
if !bytes.Equal(records[i], r) { if !bytes.Equal(records[i], r) {