diff --git a/wal/wal.go b/wal/wal.go index 1aae430b4..f1f13c738 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -231,7 +231,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return nil, err } } else { - if w.segment, err = OpenWriteSegment(w.logger, w.dir, j); err != nil { + if w.segment, err = OpenWriteSegment(logger, w.dir, j); err != nil { return nil, err } // Correctly initialize donePages. @@ -713,11 +713,12 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { // Reader reads WAL records from an io.Reader. type Reader struct { - rdr io.Reader - err error - rec []byte - buf [pageSize]byte - total int64 // total bytes processed. + rdr io.Reader + err error + rec []byte + buf [pageSize]byte + total int64 // Total bytes processed. + curRecTyp recType // Used for checking that the last record is not torn. } // NewReader returns a new reader. @@ -730,6 +731,12 @@ func NewReader(r io.Reader) *Reader { func (r *Reader) Next() bool { err := r.next() if errors.Cause(err) == io.EOF { + // The last WAL segment record shouldn't be torn(should be full or last). + // The last record would be torn after a crash just before + // the last record part could be persisted to disk. + if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { + r.err = errors.New("last record is torn") + } return false } r.err = err @@ -750,12 +757,13 @@ func (r *Reader) next() (err error) { return errors.Wrap(err, "read first header byte") } r.total++ - typ := recType(hdr[0]) + r.curRecTyp = recType(hdr[0]) // Gobble up zero bytes. - if typ == recPageTerm { - // recPageTerm is a single byte that indicates that the rest of the page is padded. - // If it's the first byte in a page, buf is too small and we have to resize buf to fit pageSize-1 bytes. + if r.curRecTyp == recPageTerm { + // recPageTerm is a single byte that indicates the rest of the page is padded. 
+ // If it's the first byte in a page, buf is too small and + // needs to be resized to fit pageSize-1 bytes. buf = r.buf[1:] // We are pedantic and check whether the zeros are actually up @@ -806,7 +814,7 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch typ { + switch r.curRecTyp { case recFull: if i != 0 { return errors.New("unexpected full record") @@ -826,7 +834,7 @@ func (r *Reader) next() (err error) { } return nil default: - return errors.Errorf("unexpected record type %d", typ) + return errors.Errorf("unexpected record type %d", r.curRecTyp) } // Only increment i for non-zero records since we use it // to determine valid content record sequences. diff --git a/wal/wal_test.go b/wal/wal_test.go index 29b7fb0dc..922126dcf 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -216,44 +216,84 @@ func TestWAL_FuzzWriteRead(t *testing.T) { } func TestWAL_Repair(t *testing.T) { - for name, cf := range map[string]func(f *os.File){ - // Ensures that the page buffer is big enough to fit and entyre page size without panicing. + + for name, test := range map[string]struct { + corrSgm int // Which segment to corrupt. + corrFunc func(f *os.File) // Func that applies the corruption. + intactRecs int // Total expected records left after the repair. + }{ + "torn_last_record": { + 2, + func(f *os.File) { + _, err := f.Seek(pageSize*2, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recFirst)}) + testutil.Ok(t, err) + }, + 8, + }, + // Ensures that the page buffer is big enough to fit + // an entire page size without panicking. 
// https://github.com/prometheus/tsdb/pull/414 - "bad_header": func(f *os.File) { - _, err := f.Seek(pageSize, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte{byte(recPageTerm)}) - testutil.Ok(t, err) + "bad_header": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recPageTerm)}) + testutil.Ok(t, err) + }, + 4, }, - "bad_fragment_sequence": func(f *os.File) { - _, err := f.Seek(pageSize, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte{byte(recLast)}) - testutil.Ok(t, err) + "bad_fragment_sequence": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recLast)}) + testutil.Ok(t, err) + }, + 4, }, - "bad_fragment_flag": func(f *os.File) { - _, err := f.Seek(pageSize, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte{123}) - testutil.Ok(t, err) + "bad_fragment_flag": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{123}) + testutil.Ok(t, err) + }, + 4, }, - "bad_checksum": func(f *os.File) { - _, err := f.Seek(pageSize+4, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte{0}) - testutil.Ok(t, err) + "bad_checksum": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+4, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + 4, }, - "bad_length": func(f *os.File) { - _, err := f.Seek(pageSize+2, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte{0}) - testutil.Ok(t, err) + "bad_length": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+2, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + 4, }, - "bad_content": func(f *os.File) { - _, err := f.Seek(pageSize+100, 0) - testutil.Ok(t, err) - _, err = f.Write([]byte("beef")) - testutil.Ok(t, err) + "bad_content": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+100, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte("beef")) + testutil.Ok(t, err) + }, + 4, }, 
} { t.Run(name, func(t *testing.T) { @@ -261,10 +301,9 @@ func TestWAL_Repair(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - // We create 3 segments with 3 records each and then corrupt the 2nd record - // of the 2nd segment. - // As a result we want a repaired WAL with the first 4 records intact. - intactRecords := 4 + // We create 3 segments with 3 records each and + // then corrupt a given record in a given segment. + // As a result we want a repaired WAL with given intact records. w, err := NewSize(nil, nil, dir, 3*pageSize) testutil.Ok(t, err) @@ -278,11 +317,11 @@ func TestWAL_Repair(t *testing.T) { } testutil.Ok(t, w.Close()) - f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666) + f, err := os.OpenFile(SegmentName(dir, test.corrSgm), os.O_RDWR, 0666) testutil.Ok(t, err) // Apply corruption function. - cf(f) + test.corrFunc(f) testutil.Ok(t, f.Close()) @@ -315,7 +354,7 @@ func TestWAL_Repair(t *testing.T) { result = append(result, append(b, r.Record()...)) } testutil.Ok(t, r.Err()) - testutil.Equals(t, intactRecords, len(result), "Wrong number of intact records") + testutil.Equals(t, test.intactRecs, len(result), "Wrong number of intact records") for i, r := range result { if !bytes.Equal(records[i], r) {